Sensor Logs Example
This sample can be run from the root directory with ./gradlew sensorData. This will generate the data necessary for this example.
This is a basic example that performs a "join" across two different collections. It also demonstrates MongoUpdateWritable, which lets you perform complex updates (rather than simple inserts) when writing output records.
Assume we have a collection called devices, in which each document describes a sensor that records a particular type of data, for example:
{
"_id": ObjectId("51b792d381c3e67b0a18d0ed"),
"name": "730LsNaN",
"type": "pressure",
"owner": "lswNxts07k",
"model": 18,
"created_at": ISODate("2003-12-02T11:15:09.555-0500")
}
A second collection called logs contains the data recorded by these sensors. Each document stores the _id of the device it came from in the d_id field, along with the recorded value (v), the timestamp when it was recorded, and the device's location at that time (loc). The logs collection will be much larger than the devices collection, since each device may record thousands of data points.
{
"_id": ObjectId("51b792d381c3e67b0a18d678"),
"d_id": ObjectId("51b792d381c3e67b0a18d4a1"),
"v": 3328.5895416489802,
"timestamp": ISODate("2013-05-18T13:11:38.709-0400"),
"loc": [
-175.13,
51.658
]
}
As an example, let's solve an aggregation problem involving both of these collections: calculate the number of log entries for each owner, for each type of sensor (heat, pressure, etc.).
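Before distributing the work, it can help to have a single-process answer to check the job against. The sketch below is a hypothetical plain-Java reference: the class ReferenceJoin, its Device holder, and the sample ids are invented for illustration. It joins each log's d_id to a device _id and counts entries per "owner type" key, assuming both collections fit in memory (the real logs collection likely won't) and that every d_id refers to a known device.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process reference for the aggregation: count log entries
// per "<owner> <type>" by joining logs.d_id to devices._id. Plain Java only;
// no MongoDB or Hadoop classes are used.
final class ReferenceJoin {

    /** Minimal stand-in for a devices document (only the fields needed here). */
    static final class Device {
        final String id, owner, type;

        Device(String id, String owner, String type) {
            this.id = id;
            this.owner = owner;
            this.type = type;
        }
    }

    /**
     * @param devices      the devices collection
     * @param logDeviceIds the d_id field of every document in the logs collection
     * @return log counts keyed by "<owner> <type>"
     */
    static Map<String, Integer> countLogsPerOwnerType(List<Device> devices, List<String> logDeviceIds) {
        // Index devices by _id so each log entry can be joined with one lookup.
        Map<String, String> keyById = new HashMap<>();
        for (Device d : devices) {
            keyById.put(d.id, d.owner + " " + d.type);
        }
        Map<String, Integer> counts = new HashMap<>();
        for (String dId : logDeviceIds) {
            String key = keyById.get(dId);
            if (key != null) { // logs from unknown devices are skipped in this sketch
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Device> devices = List.of(
            new Device("d1", "alice", "temp"),
            new Device("d2", "alice", "temp"),
            new Device("d3", "bob", "pressure"));
        List<String> logs = List.of("d1", "d2", "d2", "d3");
        System.out.println(countLogsPerOwnerType(devices, logs));
    }
}
```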
We will solve this with two passes of Map/Reduce. The first pass operates over the devices collection and seeds an output collection with one document per owner/type combination, listing all of the matching devices. The second pass runs over the logs collection and computes the totals by applying $inc to the pre-seeded output collection.
The Mapper code in phase one produces the pair <owner type, _id> for each device, where the key is the device's owner and sensor type joined by a space. The Reducer then takes the list of all _ids for each owner/type key and writes a document containing them.
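To make phase one's data flow concrete, here is a minimal plain-Java sketch, assuming devices are given as {_id, owner, type} string triples. PhaseOneSketch, compositeKey, and seed are hypothetical names, and grouping into a TreeMap only stands in for Hadoop's shuffle/sort; the result has the same key/array shape as the seeded documents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical walk-through of phase one: each device is "mapped" to
// (<owner> <type>, _id), pairs are grouped by key (standing in for the
// shuffle), and each group becomes one seeded output document.
final class PhaseOneSketch {

    /** The composite key DeviceMapper emits: owner and type joined by a space. */
    static String compositeKey(String owner, String type) {
        return owner + " " + type;
    }

    /**
     * @param devices rows of {_id, owner, type}
     * @return seeded documents: _id ("<owner> <type>") -> devices array
     */
    static Map<String, List<String>> seed(List<String[]> devices) {
        Map<String, List<String>> output = new TreeMap<>();
        for (String[] d : devices) {
            // Map step emits (key, _id); appending to the key's list mimics the reduce.
            output.computeIfAbsent(compositeKey(d[1], d[2]), k -> new ArrayList<>()).add(d[0]);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> devices = List.of(
            new String[] {"d1", "1UoTcvnCTz", "temp"},
            new String[] {"d2", "1UoTcvnCTz", "temp"},
            new String[] {"d3", "1UoTcvnCTz", "pressure"});
        // TreeMap keeps keys sorted, so this prints:
        // {1UoTcvnCTz pressure=[d3], 1UoTcvnCTz temp=[d1, d2]}
        System.out.println(seed(devices));
    }
}
```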
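import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class DeviceMapper extends Mapper<Object, BSONObject, Text, Text> {
    @Override
    public void map(Object key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
        // Key: "<owner> <type>"; value: the device's _id as a hex string.
        String keyOut = (String) val.get("owner") + " " + (String) val.get("type");
        context.write(new Text(keyOut), new Text(val.get("_id").toString()));
    }
}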
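import java.io.IOException;
import java.util.ArrayList;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;

public class DeviceReducer extends Reducer<Text, Text, NullWritable, MongoUpdateWritable> {
    @Override
    public void reduce(final Text pKey, final Iterable<Text> pValues, final Context pContext)
        throws IOException, InterruptedException {
        // One output document per "<owner> <type>" key.
        BasicBSONObject query = new BasicBSONObject("_id", pKey.toString());
        ArrayList<ObjectId> devices = new ArrayList<ObjectId>();
        for (Text val : pValues) {
            devices.add(new ObjectId(val.toString()));
        }
        // $pushAll was removed in MongoDB 3.6; $push with $each appends the same list.
        BasicBSONObject update = new BasicBSONObject("$push",
            new BasicBSONObject("devices", new BasicBSONObject("$each", devices)));
        // upsert=true creates the document on first write; multiUpdate=false.
        pContext.write(NullWritable.get(), new MongoUpdateWritable(query, update, true, false));
    }
}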
After phase one, each document in the output collection looks like this:
{
"_id": "1UoTcvnCTz temp",
"devices": [
ObjectId("51b792d381c3e67b0a18d475"),
ObjectId("51b792d381c3e67b0a18d16d"),
ObjectId("51b792d381c3e67b0a18d2bf"),
…
]
}
In phase two, we map over the large logs collection and compute the totals for each device owner/type. The mapper emits the device id (d_id) of each log entry together with a count of 1. The reducer sums these counts per device and uses MongoUpdateWritable to $inc the logs_count field of the output document whose devices array, populated in phase one, contains that device's _id.
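A rough in-memory model of phase two follows, assuming the seeded documents from phase one are available as objects. PhaseTwoSketch, OutputDoc, and run are hypothetical names, and the array-membership check and running sum only approximate how MongoDB evaluates the query {devices: id} and applies $inc. Unlike the real job, which passes upsert=true, a d_id with no matching document is simply skipped here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of phase two (not the mongo-hadoop API).
final class PhaseTwoSketch {

    /** Stand-in for one phase-one output document. */
    static final class OutputDoc {
        final String id;
        final List<String> devices;
        long logsCount; // starts at 0, like a field that $inc creates on first use

        OutputDoc(String id, List<String> devices) {
            this.id = id;
            this.devices = devices;
        }
    }

    static void run(List<OutputDoc> output, List<String> logDeviceIds) {
        // LogMapper emits (d_id, 1); LogReducer sums those ones per d_id.
        Map<String, Integer> perDevice = new HashMap<>();
        for (String dId : logDeviceIds) {
            perDevice.merge(dId, 1, Integer::sum);
        }
        // For each device total, update the first document whose devices array
        // contains the id (multiUpdate=false) by adding the total to logs_count.
        for (Map.Entry<String, Integer> e : perDevice.entrySet()) {
            for (OutputDoc doc : output) {
                if (doc.devices.contains(e.getKey())) {
                    doc.logsCount += e.getValue();
                    break;
                }
            }
            // No match: skipped in this sketch (upsert=true would insert a new document).
        }
    }

    public static void main(String[] args) {
        List<OutputDoc> output = List.of(
            new OutputDoc("1UoTcvnCTz temp", List.of("d1", "d2")),
            new OutputDoc("1UoTcvnCTz pressure", List.of("d3")));
        run(output, List.of("d1", "d2", "d2", "d3", "d1"));
        for (OutputDoc doc : output) {
            System.out.println(doc.id + " logs_count=" + doc.logsCount);
        }
        // prints "1UoTcvnCTz temp logs_count=4" then "1UoTcvnCTz pressure logs_count=1"
    }
}
```

Because each device _id lands in exactly one phase-one document, no log entry is counted twice, which is why the update can leave multiUpdate set to false.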
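import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;
import org.bson.types.ObjectId;

public class LogMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
    @Override
    public void map(Object key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
        // Emit (d_id, 1) for every log entry; the reducer sums these per device.
        context.write(new Text(((ObjectId) val.get("d_id")).toString()), new IntWritable(1));
    }
}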
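import java.io.IOException;
import com.mongodb.hadoop.io.MongoUpdateWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;

public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {
    @Override
    public void reduce(final Text pKey, final Iterable<IntWritable> pValues, final Context pContext)
        throws IOException, InterruptedException {
        // Total number of log entries recorded by this device.
        int count = 0;
        for (IntWritable val : pValues) {
            count += val.get();
        }
        // Match the phase-one document whose devices array contains this _id,
        // then add the device's total to its logs_count field.
        BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString()));
        BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count));
        pContext.write(NullWritable.get(), new MongoUpdateWritable(query, update, true, false));
    }
}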
After phase two is finished, the result documents look like this (the logs_count field is now populated with the result):
{
"_id": "1UoTcvnCTz temp",
"devices": [
ObjectId("51b792d381c3e67b0a18d475"),
ObjectId("51b792d381c3e67b0a18d16d"),
ObjectId("51b792d381c3e67b0a18d2bf"),
…
],
"logs_count": 1050616
}