-
Notifications
You must be signed in to change notification settings - Fork 602
Enron Emails Example
This sample can be run from the root directory with ./gradlew enronEmails
. This will download the Enron email corpus and import them
automatically in to mongodb. You can manually download a copy of the data set
here.
Abbreviated code snippets shown below, but you may also check out the full source.
Each document in the data set contains a single e-mail, including headers for sender and recipients. In this example we will build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
The mapper class will get the headers
field from each document, parse out the sender from the From
field and the recipients from
the To
field, and construct a MailPair
object containing each pair which will act as the key. Then we emit the value 1
for each key.
MailPair
is just a simple "POJO" that contains Strings for the from
and to
values, and implements WritableComparable
so that it
can be serialized across Hadoop nodes and sorted.
@Override
public void map(NullWritable key, BSONObject val, final Context context)
throws IOException, InterruptedException{
if (val.containsKey("headers")) {
BSONObject headers = (BSONObject)val.get("headers");
if (headers.containsKey("From") && headers.containsKey("To")){
String from = (String)headers.get("From");
String to = (String)headers.get("To");
String[] recips = to.split(",");
for(int i=0;i<recips.length;i++){
String recip = recips[i].trim();
if (recip.length() > 0) {
context.write(new MailPair(from, recip), new IntWritable(1));
}
}
}
}
}
The reduce class will take the collected values for each key, sum them together, and record the output.
@Override
public void reduce( final MailPair pKey,
final Iterable<IntWritable> pValues,
final Context pContext )
throws IOException, InterruptedException{
int sum = 0;
for ( final IntWritable value : pValues ){
sum += value.get();
}
BSONObject outDoc = new BasicDBObjectBuilder().start().add( "f" , pKey.from).add( "t" , pKey.to ).get();
BSONWritable pkeyOut = new BSONWritable(outDoc);
pContext.write( pkeyOut, new IntWritable(sum) );
}
To accomplish the same with pig, but with much less work:
REGISTER ../mongo-2.10.1.jar;
REGISTER ../core/target/mongo-hadoop-core_cdh4.3.0-1.1.0.jar
REGISTER ../pig/target/mongo-hadoop-pig_cdh4.3.0-1.1.0.jar
raw = LOAD 'file:///Users/mike/dump/enron_mail/messages.bson' using com.mongodb.hadoop.pig.BSONLoader('','headers:[]') ;
send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
send_recip_split = FOREACH send_recip_filtered GENERATE from as from, FLATTEN(TOKENIZE(to)) as to;
send_recip_split_trimmed = FOREACH send_recip_split GENERATE from as from, TRIM(to) as to;
send_recip_grouped = GROUP send_recip_split_trimmed BY (from, to);
send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;
STORE send_recip_counted INTO 'file:///tmp/enron_emailcounts.bson' using com.mongodb.hadoop.pig.BSONStorage;
The MongoDB Hadoop Connector can also read data from S3 buckets. You can exercise this functionality through this example by doing the following:
- Download the enron data set and put
messages.bson
into an S3 bucket. - Set the environment variables
AWS_SECRET_ACCESS_KEY
andAWS_ACCESS_KEY_ID
per your AWS account. - Run
./examples/elastic-mapreduce/update_s3.sh
from the root of this project. This will placeemr-bootstrap.sh
and the necessary JAR files into the S3 bucket and sets their permissions so that they're readable. Note: This script requires thes3cp
utility, which you can install usingget install s3cp
or clone from Github.emr-bootstrap.sh
will download dependencies like the MongoDB Java Driver and the MongoDB Hadoop Connector core jar as well as set the classpath. - Run
./examples/elastic-mapreduce/run_emr_job.sh
to submit the job to Elastic MapReduce. Note: This requireselastic-mapreduce-ruby
.