This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Sensor Logs Example

Lucka edited this page Apr 17, 2019 · 3 revisions

Sensor Logs

This sample can be run from the root directory with ./gradlew sensorData. This will generate the data necessary for this example.

Goal

This example performs a basic "join" across two collections and demonstrates MongoUpdateWritable, which lets you apply complex updates when writing output records (instead of simple inserts).

Assume we have a collection called devices. Each document describes a sensor that records a particular type of data, for example:

{
  "_id": ObjectId("51b792d381c3e67b0a18d0ed"),
  "name": "730LsNaN",
  "type": "pressure",
  "owner": "lswNxts07k",
  "model": 18,
  "created_at": ISODate("2003-12-02T11:15:09.555-0500")
}

A second collection called logs contains data recorded by these sensors. Each document records the _id of the device it came from in the d_id field, the value, the timestamp when it was recorded, and the device's location at the time. The logs collection will be much larger than the devices collection, since each device will record potentially thousands of data points.

{
  "_id": ObjectId("51b792d381c3e67b0a18d678"),
  "d_id": ObjectId("51b792d381c3e67b0a18d4a1"),
  "v": 3328.5895416489802,
  "timestamp": ISODate("2013-05-18T13:11:38.709-0400"),
  "loc": [
    -175.13,
    51.658
  ]
}

As an example, let's solve an aggregation problem involving both of these collections: calculate the number of log entries for each owner and each type of sensor (heat, pressure, etc.).

We will solve this with two passes of Map/Reduce. The first operates over the devices collection and seeds an output collection by building a list of all the devices belonging to each owner and sensor type. The second pass runs over the logs collection and computes the totals by using $inc on the pre-seeded output collection.
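The data flow of the two passes can be emulated in memory to make it concrete. The following is a minimal sketch in plain Java with no Hadoop or MongoDB dependency. The class name TwoPassJoinSketch, its helper methods, and the sample values are hypothetical; maps and lists stand in for the devices, logs, and output collections.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the two Map/Reduce passes; all names are illustrative.
final class TwoPassJoinSketch {

    // One entry of the hypothetical output collection.
    static final class OwnerTypeDoc {
        final List<String> devices = new ArrayList<>();
        int logsCount = 0;
    }

    // Pass one: seed one document per "owner type" key, listing its device ids.
    // Each device is given as {id, owner, type}.
    static Map<String, OwnerTypeDoc> seed(List<String[]> devices) {
        Map<String, OwnerTypeDoc> out = new LinkedHashMap<>();
        for (String[] d : devices) {
            String key = d[1] + " " + d[2];
            out.computeIfAbsent(key, k -> new OwnerTypeDoc()).devices.add(d[0]);
        }
        return out;
    }

    // Pass two: count log entries per device id, then add each count to the
    // document whose device list contains that id (the role $inc plays).
    // Ids that match no seeded document are skipped in this sketch.
    static void countLogs(Map<String, OwnerTypeDoc> out, List<String> logDeviceIds) {
        Map<String, Integer> perDevice = new LinkedHashMap<>();
        for (String id : logDeviceIds) {
            perDevice.merge(id, 1, Integer::sum);
        }
        for (Map.Entry<String, Integer> e : perDevice.entrySet()) {
            for (OwnerTypeDoc doc : out.values()) {
                if (doc.devices.contains(e.getKey())) {
                    doc.logsCount += e.getValue();
                    break; // each device id is seeded into exactly one document
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String[]> devices = new ArrayList<>();
        devices.add(new String[] {"d1", "alice", "pressure"});
        devices.add(new String[] {"d2", "alice", "pressure"});
        devices.add(new String[] {"d3", "bob", "temp"});
        Map<String, OwnerTypeDoc> out = seed(devices);

        List<String> logs = new ArrayList<>();
        logs.add("d1");
        logs.add("d2");
        logs.add("d1");
        logs.add("d3");
        countLogs(out, logs);

        for (Map.Entry<String, OwnerTypeDoc> e : out.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().logsCount);
        }
    }
}
```

With the sample values above, main prints "alice pressure -> 3" followed by "bob temp -> 1". The real job differs in that the output lives in MongoDB and the per-device sums are computed by reducers in parallel.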

Phase One

The Mapper code in phase one produces the pair <owner + " " + type, _id> for each device. The Reducer then takes the list of all _ids for each owner/type key and creates a new document containing them.

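import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class DeviceMapper extends Mapper<Object, BSONObject, Text, Text> {

    @Override
    public void map(Object key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
        // Key: "<owner> <type>"; value: the device _id as a hex string.
        String keyOut = (String) val.get("owner") + " " + (String) val.get("type");
        context.write(new Text(keyOut), new Text(val.get("_id").toString()));
    }

}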

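import com.mongodb.hadoop.io.MongoUpdateWritable;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;

public class DeviceReducer extends Reducer<Text, Text, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(final Text pKey, final Iterable<Text> pValues,
                       final Context pContext)
            throws IOException, InterruptedException {

        // Match (or create) the output document whose _id is the "<owner> <type>" key.
        BasicBSONObject query = new BasicBSONObject("_id", pKey.toString());
        ArrayList<ObjectId> devices = new ArrayList<ObjectId>();
        for (Text val : pValues) {
            devices.add(new ObjectId(val.toString()));
        }

        // $pushAll was removed in MongoDB 3.6; $push with $each appends the same values.
        BasicBSONObject devicesList = new BasicBSONObject("devices", new BasicBSONObject("$each", devices));
        BasicBSONObject update = new BasicBSONObject("$push", devicesList);

        // Third argument: upsert = true; fourth: multi-document update = false.
        pContext.write(null, new MongoUpdateWritable(query, update, true, false));
    }
}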

After phase one, each document in the output collection looks like this:

{
  "_id": "1UoTcvnCTz temp",
  "devices": [
    ObjectId("51b792d381c3e67b0a18d475"),
    ObjectId("51b792d381c3e67b0a18d16d"),
    ObjectId("51b792d381c3e67b0a18d2bf"),
    
  ]
}
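Because the phase one update is an upsert that appends to the devices array, running phase one twice against the same output collection would append every device id a second time, so the output collection should start out empty. The following minimal sketch models only that upsert-and-append behavior in plain Java; the class UpsertPushSketch and its upsertPush helper are hypothetical and are not part of the MongoDB driver or the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models only the upsert-and-append behavior of the phase one update;
// this is not the MongoDB driver API.
final class UpsertPushSketch {

    // Hypothetical stand-in for the output collection: _id -> devices array.
    final Map<String, List<String>> collection = new HashMap<>();

    // Upsert: create the document if the _id is missing, then append all ids.
    void upsertPush(String id, List<String> deviceIds) {
        collection.computeIfAbsent(id, k -> new ArrayList<>()).addAll(deviceIds);
    }

    public static void main(String[] args) {
        UpsertPushSketch out = new UpsertPushSketch();
        out.upsertPush("owner1 temp", Arrays.asList("a", "b"));
        System.out.println(out.collection.get("owner1 temp").size());

        // Applying the same update again appends the ids a second time.
        out.upsertPush("owner1 temp", Arrays.asList("a", "b"));
        System.out.println(out.collection.get("owner1 temp").size());
    }
}
```

The first call creates the document with two ids; the second call leaves it with four, which would make the array-membership lookup in phase two scan duplicate entries.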

Phase Two

In phase two, we map over the much larger logs collection and compute the totals for each owner/type pair. The mapper emits the device id from each log entry along with a count of 1. The reducer sums these counts per device and uses MongoUpdateWritable to increment the logs_count field in the output collection, querying for the document whose devices array (populated in phase one) contains that device's _id.

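import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;
import org.bson.types.ObjectId;

public class LogMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
    @Override
    public void map(Object key, BSONObject val,
                    final Context context)
                    throws IOException, InterruptedException {
        // Emit <device id, 1> for every log entry.
        context.write(new Text(((ObjectId) val.get("d_id")).toString()),
                      new IntWritable(1));
    }
}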

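import com.mongodb.hadoop.io.MongoUpdateWritable;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;

public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(final Text pKey,
                       final Iterable<IntWritable> pValues,
                       final Context pContext)
            throws IOException, InterruptedException {

        // Sum the 1s emitted by LogMapper for this device id.
        int count = 0;
        for (IntWritable val : pValues) {
            count += val.get();
        }

        // Match the document whose devices array contains this device's _id.
        BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString()));
        BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count));
        // Third argument: upsert = true; fourth: multi-document update = false.
        pContext.write(null, new MongoUpdateWritable(query, update, true, false));
    }

}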
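The reducer's query matches on array membership, and the update is written with upsert set to true. Assuming standard MongoDB upsert behavior, a log entry whose d_id was never seeded in phase one would therefore create a new document instead of being dropped. The sketch below models that in plain Java; the class IncByDeviceSketch and its incLogsCount helper are hypothetical, and the inserted "orphan" document is simplified (a real upsert would generate an _id and store devices as a single value taken from the query).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Models the phase two update: match on array membership, then increment.
// Illustrative only; not the MongoDB driver API.
final class IncByDeviceSketch {

    static final class Doc {
        final String id;
        final List<String> devices;
        int logsCount;

        Doc(String id, List<String> devices) {
            this.id = id;
            this.devices = devices;
        }
    }

    // Hypothetical stand-in for the output collection.
    final List<Doc> collection = new ArrayList<>();

    // Returns true if a seeded document matched, false if the upsert inserted one.
    boolean incLogsCount(String deviceId, int count) {
        for (Doc doc : collection) {
            if (doc.devices.contains(deviceId)) {
                doc.logsCount += count; // $inc on the first matching document only
                return true;
            }
        }
        // No match: the upsert inserts a new document built from the query.
        Doc orphan = new Doc(null, new ArrayList<>(Arrays.asList(deviceId)));
        orphan.logsCount = count;
        collection.add(orphan);
        return false;
    }

    public static void main(String[] args) {
        IncByDeviceSketch out = new IncByDeviceSketch();
        out.collection.add(new Doc("owner1 temp", Arrays.asList("a", "b")));

        out.incLogsCount("a", 5); // matches the seeded document
        out.incLogsCount("b", 2); // same document, so its total becomes 7
        out.incLogsCount("z", 1); // unknown device: a new document is inserted

        for (Doc doc : out.collection) {
            System.out.println(doc.id + " " + doc.devices + " " + doc.logsCount);
        }
    }
}
```

If every d_id in logs refers to a document in devices, the insert branch is never taken and each owner/type document simply accumulates the counts of its devices.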

After phase two is finished, the result documents look like this (the logs_count field is now populated with the result):

{
  "_id": "1UoTcvnCTz temp",
  "devices": [
    ObjectId("51b792d381c3e67b0a18d475"),
    ObjectId("51b792d381c3e67b0a18d16d"),
    ObjectId("51b792d381c3e67b0a18d2bf"),
    
  ],
  "logs_count": 1050616
}