Enron Emails Example
This sample can be run from the root directory with ./gradlew enronEmails. This will download the Enron email corpus and import it into MongoDB automatically. You can also download a copy of the data set manually here. Each document in the data set contains a single e-mail, including the headers that hold the sender and recipient information. In this example we will build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
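To make the goal concrete before Hadoop enters the picture, here is a minimal plain-Java sketch of the same computation over a few in-memory header values. The class PairCountSketch, the "from -> to" string key and the example.com addresses are made up for illustration; the real job reads documents from the corpus and uses MailPair keys.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical, Hadoop-free statement of the job's goal: for each (From, To) header
// pair, split To on commas and count how often each sender/recipient pair appears.
class PairCountSketch {

    static Map<String, Long> countPairs(List<String[]> headers) {
        return headers.stream()
            .flatMap(h -> Arrays.stream(h[1].split(","))
                .map(String::trim)
                .filter(r -> !r.isEmpty())
                .map(r -> h[0] + " -> " + r))  // a String key stands in for MailPair
            .collect(Collectors.groupingBy(pair -> pair, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String[]> headers = List.of(
            new String[] {"alice@example.com", "bob@example.com, carol@example.com"},
            new String[] {"alice@example.com", "bob@example.com"});
        System.out.println(countPairs(headers));
    }
}
```

Running main prints {alice@example.com -> bob@example.com=2, alice@example.com -> carol@example.com=1}. The MapReduce version below computes the same thing, but splits the work into a map step that emits each pair with a 1 and a reduce step that adds those 1s up.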
Abbreviated code snippets are shown below; for the full source of this example, please see here.
The mapper class gets the headers field from each document, parses out the sender from the From field and the recipients from the comma-separated To field, and constructs a MailPair object for each sender/recipient pair, which acts as the key. It then emits the value 1 for each key.
MailPair is just a simple "POJO" that contains Strings for the from and to values and implements WritableComparable, so that it can be serialized across Hadoop nodes and sorted.
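A minimal sketch of what such a key has to provide, assuming a JDK-only setting: the real MailPair implements Hadoop's WritableComparable, while this hypothetical MailPairSketch keeps the same write, readFields and compareTo methods without the Hadoop dependency so it can run on its own. The writeUTF encoding is one possible choice, not necessarily the one the example project uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical JDK-only stand-in for MailPair. The real key implements Hadoop's
// WritableComparable<MailPair>; the three methods below mirror that contract.
class MailPairSketch implements Comparable<MailPairSketch> {
    String from;
    String to;

    // A no-arg constructor lets the framework create an empty key and call readFields().
    MailPairSketch() { }

    MailPairSketch(String from, String to) {
        this.from = from;
        this.to = to;
    }

    // Serialize the key (one possible encoding: two length-prefixed UTF strings).
    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    // Deserialize: read the fields back in the same order write() wrote them.
    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    // Order by sender, then recipient, so identical pairs sort next to each other.
    @Override
    public int compareTo(MailPairSketch other) {
        int c = from.compareTo(other.from);
        return c != 0 ? c : to.compareTo(other.to);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MailPairSketch && compareTo((MailPairSketch) o) == 0;
    }

    // Equal pairs must hash alike; the default HashPartitioner routes keys by hashCode().
    @Override
    public int hashCode() {
        return 31 * from.hashCode() + to.hashCode();
    }

    // Serialize and deserialize a key in memory, as would happen between nodes.
    static MailPairSketch roundTrip(MailPairSketch pair) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            pair.write(new DataOutputStream(bytes));
            MailPairSketch copy = new MailPairSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

equals and hashCode matter as much as compareTo: Hadoop's default HashPartitioner uses the key's hashCode to choose the reducer for each pair, so every occurrence of the same pair must hash identically to land in the same reduce call.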
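```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

// Class name is illustrative; see the full example source for the actual declaration.
public class EnronMailMapper extends Mapper<NullWritable, BSONObject, MailPair, IntWritable> {
    @Override
    public void map(final NullWritable key, final BSONObject val, final Context context)
        throws IOException, InterruptedException {
        // Skip documents that have no "headers" sub-document.
        if (val.containsKey("headers")) {
            BSONObject headers = (BSONObject) val.get("headers");
            if (headers.containsKey("From") && headers.containsKey("To")) {
                String from = (String) headers.get("From");
                String to = (String) headers.get("To");
                // "To" is a comma-separated list: emit (sender, recipient) -> 1 for each entry.
                for (String recipient : to.split(",")) {
                    String recip = recipient.trim();
                    if (recip.length() > 0) {
                        context.write(new MailPair(from, recip), new IntWritable(1));
                    }
                }
            }
        }
    }
}
```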
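The parsing rules in map() can be checked without a cluster. This hypothetical MapperLogicSketch restates the method body over a plain Map standing in for the headers BSONObject, and returns the pairs that would be written with the value 1; the addresses are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free restatement of the map() body: given a document's headers
// as a plain Map, return the (from, recipient) pairs the mapper would emit with value 1.
class MapperLogicSketch {

    static List<String[]> pairsToEmit(Map<String, Object> headers) {
        List<String[]> pairs = new ArrayList<>();
        if (headers == null || !headers.containsKey("From") || !headers.containsKey("To")) {
            return pairs;  // the mapper emits nothing for such documents
        }
        String from = (String) headers.get("From");
        for (String recipient : ((String) headers.get("To")).split(",")) {
            String recip = recipient.trim();
            if (recip.length() > 0) {  // skip empty entries such as the one after a trailing comma
                pairs.add(new String[] {from, recip});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(
            "From", "alice@example.com",
            "To", "bob@example.com,\n\tcarol@example.com, ");
        for (String[] p : pairsToEmit(headers)) {
            System.out.println(p[0] + " -> " + p[1] + " : 1");
        }
    }
}
```

Running main prints two pairs, alice@example.com -> bob@example.com and alice@example.com -> carol@example.com: trim() removes the newline and tab before the second address, and the blank entry after the trailing comma is skipped.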
The reducer class takes the collected values for each key, sums them, and writes the total, using a BSON document that holds the sender (f) and recipient (t) as the output key.
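```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

// Class name is illustrative; see the full example source for the actual declaration.
public class EnronMailReducer extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {
    @Override
    public void reduce(final MailPair pKey, final Iterable<IntWritable> pValues, final Context pContext)
        throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this sender/recipient pair.
        int sum = 0;
        for (final IntWritable value : pValues) {
            sum += value.get();
        }
        // Output key: {f: sender, t: recipient}; output value: the count.
        BSONObject outDoc = BasicDBObjectBuilder.start().add("f", pKey.from).add("t", pKey.to).get();
        pContext.write(new BSONWritable(outDoc), new IntWritable(sum));
    }
}
```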
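Between map() and reduce(), Hadoop sorts the emitted records by key and groups equal keys, so each reduce() call sees every value for one pair. The in-memory imitation below is hypothetical: ShuffleReduceSketch uses plain "from|to" strings in place of MailPair and ignores partitioning across multiple reducers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory imitation of Hadoop's shuffle-and-sort followed by reduce():
// sort the map output by key, then sum the values in each run of equal keys.
class ShuffleReduceSketch {

    static final class Emitted {
        final String key;
        final int value;

        Emitted(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    static List<String> run(List<Emitted> mapOutput) {
        List<Emitted> sorted = new ArrayList<>(mapOutput);
        sorted.sort((a, b) -> a.key.compareTo(b.key));  // sort phase: equal keys become adjacent
        List<String> results = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i).key;
            int sum = 0;  // what reduce() does with the Iterable of values for this key
            while (i < sorted.size() && sorted.get(i).key.equals(key)) {
                sum += sorted.get(i).value;
                i++;
            }
            results.add(key + "=" + sum);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Emitted> mapOutput = List.of(
            new Emitted("alice@example.com|bob@example.com", 1),
            new Emitted("alice@example.com|carol@example.com", 1),
            new Emitted("alice@example.com|bob@example.com", 1));
        System.out.println(run(mapOutput));
    }
}
```

Running main prints [alice@example.com|bob@example.com=2, alice@example.com|carol@example.com=1]. This is also why MailPair must be comparable: the sort is what brings the two identical pairs together before they are summed.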
To accomplish the same with Pig, but with much less work:
```pig
REGISTER ../mongo-2.10.1.jar;
REGISTER ../core/target/mongo-hadoop-core_cdh4.3.0-1.1.0.jar;
REGISTER ../pig/target/mongo-hadoop-pig_cdh4.3.0-1.1.0.jar;

-- Load only the headers map from the BSON dump.
raw = LOAD 'file:///Users/mike/dump/enron_mail/messages.bson' using com.mongodb.hadoop.pig.BSONLoader('', 'headers:[]');
-- Pull out sender and recipient list; drop messages without a To header.
send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
-- One row per recipient, with surrounding whitespace removed.
send_recip_split = FOREACH send_recip_filtered GENERATE from as from, FLATTEN(TOKENIZE(to)) as to;
send_recip_split_trimmed = FOREACH send_recip_split GENERATE from as from, TRIM(to) as to;
-- Count occurrences of each (from, to) pair.
send_recip_grouped = GROUP send_recip_split_trimmed BY (from, to);
send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;
STORE send_recip_counted INTO 'file:///tmp/enron_emailcounts.bson' using com.mongodb.hadoop.pig.BSONStorage;
```
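One difference between the two versions is how the To header is split. The Java mapper splits on commas only, whereas Pig's TOKENIZE, called without a delimiter argument, is documented to break on spaces, double quotes, commas, parentheses and asterisks. The sketch below imitates both rules with JDK classes; RecipientSplitSketch is hypothetical, and the delimiter set should be checked against the Pig version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical comparison of the two splitting rules used in this example.
class RecipientSplitSketch {

    // Java mapper: split on commas only, trim, drop empty entries.
    static List<String> javaMapperStyle(String to) {
        List<String> out = new ArrayList<>();
        for (String r : to.split(",")) {
            String recip = r.trim();
            if (!recip.isEmpty()) {
                out.add(recip);
            }
        }
        return out;
    }

    // Imitation of Pig's default TOKENIZE delimiters (space, double quote, comma,
    // parentheses, asterisk), followed by a trim like the script's TRIM(to).
    static List<String> pigTokenizeStyle(String to) {
        List<String> out = new ArrayList<>();
        StringTokenizer tok = new StringTokenizer(to, " \",()*");
        while (tok.hasMoreTokens()) {
            out.add(tok.nextToken().trim());
        }
        return out;
    }

    public static void main(String[] args) {
        String plain = "bob@example.com, carol@example.com";
        String named = "Bob Smith <bob@example.com>";
        System.out.println(javaMapperStyle(plain) + " vs " + pigTokenizeStyle(plain));
        System.out.println(javaMapperStyle(named) + " vs " + pigTokenizeStyle(named));
    }
}
```

For plain address lists both rules agree, but a display-name entry such as Bob Smith <bob@example.com> stays one recipient in the Java job and becomes three tokens in the Pig script, so the two outputs can differ on headers written that way.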