Create a saveAsNewAPIHadoopDataset method
CodingCat committed Mar 18, 2014
1 parent e7423d4 commit a8583ee
Showing 2 changed files with 69 additions and 45 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}

/**
* Output the RDD to any Hadoop-supported storage system, using
* an org.apache.hadoop.mapreduce.Job object for that storage system.
*/
def saveAsNewAPIHadoopDataset(job: Job) {
rdd.saveAsNewAPIHadoopDataset(job)
}

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
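For readers of this commit, here is a minimal caller-side sketch of the new API, written against the Scala pair-RDD method that the Java wrapper above forwards to. The local master, output path, and sample data are assumptions for illustration only, not part of the commit.

// Hypothetical usage sketch: configure a new-API Job, then hand it to saveAsNewAPIHadoopDataset.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings in the pair-RDD implicits

object SaveAsNewAPIHadoopDatasetExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "saveAsNewAPIHadoopDataset-example")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2))).map {
      case (k, v) => (new Text(k), new IntWritable(v))
    }

    // Describe the storage system on a new-API Job: key/value classes, output format, path.
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/save-as-dataset-output"))  // hypothetical path

    // The Job carries everything the writer needs; JavaPairRDD.saveAsNewAPIHadoopDataset
    // simply forwards to the same method on the underlying RDD.
    pairs.saveAsNewAPIHadoopDataset(job)
    sc.stop()
  }
}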
104 changes: 60 additions & 44 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -29,13 +29,21 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
import org.apache.hadoop.mapred.SparkHadoopWriter
@@ -603,50 +611,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)

val wrappedConf = new SerializableWritable(job.getConfiguration)
val outpath = new Path(path)
NewFileOutputFormat.setOutputPath(job, outpath)
val jobFormat = outputFormatClass.newInstance
jobFormat.checkOutputSpecs(job)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outputFormatClass.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}

/* apparently we need a TaskAttemptID to construct an OutputCommitter;
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
*/
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
jobCommitter.commitJob(jobTaskContext)
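// saveAsNewAPIHadoopFile now only records the output format and target directory on the Job
// and delegates the actual write to saveAsNewAPIHadoopDataset below.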
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(job)
}

/**
@@ -692,6 +659,55 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
saveAsHadoopDataset(conf)
}

/**
* Output the RDD to any Hadoop-supported storage system with the new Hadoop API, using a Hadoop
* Job object for that storage system. The Job should set an OutputFormat and any output paths
* required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
def saveAsNewAPIHadoopDataset(job: NewAPIHadoopJob) {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val outputFormatInstance = outfmt.newInstance()

if (outputFormatInstance.isInstanceOf[NewFileOutputFormat[_, _]]) {
// For file-based output formats, fail fast if the output directory is missing or already exists.
outputFormatInstance.checkOutputSpecs(job)
}

def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}
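/* We still need a TaskAttemptID to construct an OutputCommitter, but this job-level committer
* is only used for setupJob/commitJob, so a dummy "map" task attempt is enough. */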
val jobFormat = outfmt.newInstance
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _).sum
jobCommitter.commitJob(jobTaskContext)
}

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
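The doc comment for saveAsNewAPIHadoopDataset mentions "a table name to write to" as an example of a non-file target. As an illustration only, the following sketch shows that shape of caller using HBase's new-API TableOutputFormat; the HBase dependency, table name, column family, and sample rows are assumptions, not something this commit provides.

// Hypothetical sketch: write to an HBase table via the Job configuration instead of a file path.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SaveToHBaseExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "saveAsNewAPIHadoopDataset-hbase")

    // The Job carries the table name; no output path is involved.
    val job = new Job(HBaseConfiguration.create())
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "example_table")  // hypothetical table
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val puts = sc.parallelize(Seq("row1" -> 1, "row2" -> 2)).map { case (row, v) =>
      val put = new Put(Bytes.toBytes(row))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(v))
      (new ImmutableBytesWritable(Bytes.toBytes(row)), put)
    }

    // This is the case the new method is meant to cover: a storage system that is configured
    // entirely through the Job rather than through a file path.
    puts.saveAsNewAPIHadoopDataset(job)
    sc.stop()
  }
}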
