8 min read

[box type=”note” align=”” class=”” width=””]The following excerpt is taken from the book Mastering MongoDB 3.x authored by Alex Giamas. This book demonstrates the power of MongoDB to build high performance database solutions with ease.[/box]

MongoDB is one of the most popular NoSQL databases in the world and can be combined with various Big Data tools for efficient data processing. In this article we explore interesting features of MongoDB, which has been underappreciated and not widely supported throughout the industry as yet – the ability to write MapReduce natively using shell.

MapReduce is a data processing method for getting aggregate results from a large set of data. The main advantage is that it is inherently parallelizable as evidenced by frameworks such as Hadoop.

A simple example of MapReduce would be as follows, given that our input books collection is as follows:

> db.books.find()

{ "_id" : ObjectId("592149c4aabac953a3a1e31e"), "isbn" : "101", "name" :

"Mastering MongoDB", "price" : 30 }

{ "_id" : ObjectId("59214bc1aabac954263b24e0"), "isbn" : "102", "name" :

"MongoDB in 7 years", "price" : 50 }

{ "_id" : ObjectId("59214bc1aabac954263b24e1"), "isbn" : "103", "name" :

"MongoDB for experts", "price" : 40 }

And our map and reduce functions are defined as follows:

> var mapper = function() {

emit(this.id, 1);

};

In this mapper, we simply output a key of the id of each document with a value of 1:

> var reducer = function(id, count) {

return Array.sum(count);

};

In the reducer, we sum across all values (where each one has a value of 1):

> db.books.mapReduce(mapper, reducer, { out:"books_count" });

{

"result" : "books_count",

"timeMillis" : 16613,

"counts" : {

"input" : 3,

"emit" : 3,

"reduce" : 1,

"output" : 1

},

"ok" : 1

}

> db.books_count.find()

{ "_id" : null, "value" : 3 }

>

Our final output is a document with no ID, since we didn’t output any value for id, and a value of 6, since there are six documents in the input dataset.

Using MapReduce, MongoDB will apply map to each input document, emitting key-value pairs at the end of the map phase. Then each reducer will get key-value pairs with the same key as input, processing all multiple values. The reducer’s output will be a single key-value pair for each key.

Optionally, we can use a finalize function to further process the results of the mapper and reducer. MapReduce functions use JavaScript and run within the mongod process. MapReduce can output inline as a single document, subject to the 16 MB document size limit, or as multiple documents in an output collection. Input and output collections can be sharded.

MapReduce concurrency

MapReduce operations will place several short-lived locks that should not affect operations. However, at the end of the reduce phase, if we are outputting data to an existing collection, then output actions such as merge, reduce, and replace will take an exclusive global write lock for the whole server, blocking all other writes in the db instance. If we want to avoid that we should invoke MapReduce in the following way:

> db.collection.mapReduce(

Mapper,

Reducer,

{

out: { merge/reduce: bookOrders, nonAtomic: true

}

})

We can apply nonAtomic only to merge or reduce actions. replace will just replace the contents of documents in bookOrders, which would not take much time anyway.

With the merge action, the new result is merged with the existing result if the output collection already exists. If an existing document has the same key as the new result, then it will overwrite that existing document.

With the reduce action, the new result is processed together with the existing result if the output collection already exists. If an existing document has the same key as the new result, it will apply the reduce function to both the new and the existing documents and overwrite the existing document with the result.

Although MapReduce has been present since the early versions of MongoDB, it hasn’t evolved as much as the rest of the database, resulting in its usage being less than that of specialized MapReduce frameworks such as Hadoop.

Incremental MapReduce

Incremental MapReduce is a pattern where we use MapReduce to aggregate to previously calculated values. An example would be counting non-distinct users in a collection for different reporting periods (that is, hour, day, month) without the need to recalculate the result every hour.

To set up our data for incremental MapReduce we need to do the following:

  • Output our reduce data to a different collection
  • At the end of every hour, query only for the data that got into the collection in the last hour
  • With the output of our reduce data, merge our results with the calculated results from the previous hour

Following up on the previous example, let’s assume that we have a published field in each of the documents, with our input dataset being:

> db.books.find()

{ "_id" : ObjectId("592149c4aabac953a3a1e31e"), "isbn" : "101", "name" :

"Mastering MongoDB", "price" : 30, "published" :

ISODate("2017-06-25T00:00:00Z") }

{ "_id" : ObjectId("59214bc1aabac954263b24e0"), "isbn" : "102", "name" :

"MongoDB in 7 years", "price" : 50, "published" :

ISODate("2017-06-26T00:00:00Z") }

Using our previous example of counting books we would get the following:

var mapper = function() {

emit(this.id, 1);

};

var reducer = function(id, count) {

return Array.sum(count);

};

> db.books.mapReduce(mapper, reducer, { out: "books_count" })

{

"result" : "books_count",

"timeMillis" : 16700,

"counts" : {

"input" : 2,

"emit" : 2,

"reduce" : 1,

"output" : 1

},

"ok" : 1

}

> db.books_count.find()

{ "_id" : null, "value" : 2 }

Now we get a third book in our mongo_books collection with a document:

{ "_id" : ObjectId("59214bc1aabac954263b24e1"), "isbn" : "103", "name" :

"MongoDB for experts", "price" : 40, "published" :

ISODate("2017-07-01T00:00:00Z") }

> db.books.mapReduce( mapper, reducer, { query: { published: { $gte: ISODate('2017-07-01 00:00:00') } }, out: { reduce: "books_count" } } )

> db.books_count.find()

{ "_id" : null, "value" : 3 }

What happened here, is that by querying for documents in July 2017 we only got the new document out of the query and then used its value to reduce the value with the already calculated value of 2 in our books_count document, adding 1 to the final sum of three documents. This example, as contrived as it is, shows a powerful attribute of MapReduce: the ability to re-reduce results to incrementally calculate aggregations over time.

Troubleshooting MapReduce

Throughout the years, one of the major shortcomings of MapReduce frameworks has been the inherent difficulty in troubleshooting as opposed to simpler non-distributed patterns. Most of the time, the most effective tool is debugging using log statements to verify that output values match our expected values. In the mongo shell, this being a JavaScript shell, this is as simple as outputting using the console.log()function.

Diving deeper into MapReduce in MongoDB we can debug both in the map and the reduce phase by overloading the output values.

Debugging the mapper phase, we can overload the emit() function to test what the output key values are:

> var emit = function(key, value) {

print("debugging mapper's emit");

print("key: " + key + " value: " + tojson(value));

}

We can then call it manually on a single document to verify that we get back the key-value pair that we would expect:

> var myDoc = db.orders.findOne( { _id:

ObjectId("50a8240b927d5d8b5891743c") } );

> mapper.apply(myDoc);

The reducer function is somewhat more complicated. A MapReduce reducer function must meet the following criteria:

  • It must be idempotent
  • The order of values coming from the mapper function should not matter for the reducer’s result
  • The reduce function must return the same type of result as the mapper function

We will dissect these following requirements to understand what they really mean:

  • It must be idempotent: MapReduce by design may call the reducer multiple times for the same key with multiple values from the mapper phase. It also doesn’t need to reduce single instances of a key as it’s just added to the set. The final value should be the same no matter the order of execution. This can be verified by writing our own “verifier” function forcing the reducer to re-reduce or by executing the reducer many, many times:
reduce( key, [ reduce(key, valuesArray) ] ) == reduce( key, valuesArray )
  • It must be commutative: Again, because multiple invocations of the reducer may happen for the same key, if it has multiple values, the following should hold:
reduce(key, [ C, reduce(key, [ A, B ]) ] ) == reduce( key, [C, A, B ] )
  • The order of values coming from the mapper function should not matter for the reducer’s result: We can test that the order of values from the mapper doesn’t change the output for the reducer by passing in documents to the mapper in a different order and verifying that we get the same results out:
reduce( key, [ A, B ] ) == reduce( key, [ B, A ] )
  • The reduce function must return the same type of result as the mapper function: Hand-in-hand with the first requirement, the type of object that the reduce function returns should be the same as the output of the mapper function.

We saw how MapReduce is useful when implemented on a data pipeline. Multiple MapReduce commands can be chained to produce different results. An example would be aggregating data by different reporting periods (hour, day, week, month, year) where we use the output of each more granular reporting period to produce a less granular report.

If you found this article useful, make sure to check our book Mastering MongoDB 3.x to get more insights and information about MongoDB’s vast data storage, management and administration capabilities.Mastering MongoDB 3.x

Data Science Enthusiast. A massive science fiction and Manchester United fan. Loves to read, write and listen to music.

LEAVE A REPLY

Please enter your comment!
Please enter your name here