Extended to make saveAsHadoopFile an async operation to leverage th... #42

Closed
wants to merge 1 commit into from
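
For context, here is a minimal usage sketch of the API this diff adds. It is a hedged illustration, not part of the PR: the SparkContext setup, output path, and timeout are made up, and only the saveAsHadoopFileAsync method introduced below is assumed.

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AsyncSaveDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("async-save-demo").setMaster("local[2]"))
    val pairs = sc.parallelize(1 to 100).map(i => (i.toString, (i * i).toString))

    // Returns a FutureAction[Unit] right away instead of blocking until the write is committed.
    val save = pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("/tmp/async-save-demo")

    // The driver can keep doing other work while the save job runs.
    save.onComplete {
      case Success(_) => println("save committed")
      case Failure(e) => println("save failed: " + e)
    }

    Await.ready(save, 10.minutes)  // block before tearing down the context
    sc.stop()
  }
}
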
78 changes: 78 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -24,6 +24,7 @@ import scala.util.Try
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
import java.util.concurrent.atomic.AtomicBoolean

/**
* :: Experimental ::
@@ -151,6 +152,83 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}

/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job, together with a
* post-complete function that is invoked once the job finishes, for resource cleanup and/or
* job accounting.
*/
@Experimental
class SimpleFutureWithPostCompleteAction[T] private[spark](
jobWaiter: JobWaiter[_], resultFunc: => T, postCompleteFunc: () => Unit)
extends FutureAction[T] {

override def cancel() {
jobWaiter.cancel()
}

override def ready(atMost: Duration)(
implicit permit: CanAwait): SimpleFutureWithPostCompleteAction.this.type = {
if (!atMost.isFinite()) {
awaitResult()
} else jobWaiter.synchronized {
val finishTime = System.currentTimeMillis() + atMost.toMillis
while (!isCompleted) {
val time = System.currentTimeMillis()
if (time >= finishTime) {
executePostCompleteFunc()
throw new TimeoutException
} else {
jobWaiter.wait(finishTime - time)
}
}
}
this
}

@throws(classOf[Exception])
override def result(atMost: Duration)(implicit permit: CanAwait): T = {
ready(atMost)(permit)
awaitResult() match {
case scala.util.Success(res) => res
case scala.util.Failure(e) => throw e
}
}

override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
executor.execute(new Runnable {
override def run() {
func(awaitResult())
}
})
}

override def isCompleted: Boolean = jobWaiter.jobFinished

override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
Some(awaitResult())
} else {
None
}
}

private def awaitResult(): Try[T] = {
jobWaiter.awaitResult() match {
case JobSucceeded =>
executePostCompleteFunc()
scala.util.Success(resultFunc)
case JobFailed(e: Exception) =>
executePostCompleteFunc()
scala.util.Failure(e)
}
}

private[this] def executePostCompleteFunc() {
if (!postCompleteExecuted.getAndSet(true)) postCompleteFunc()
}

private[this] val postCompleteExecuted: AtomicBoolean = new AtomicBoolean(false)
}
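
The AtomicBoolean guard above is what lets executePostCompleteFunc be reached from several paths (success, failure, or a timed-out wait) while running the hook only once: getAndSet(true) returns false for exactly one caller. A standalone sketch of the same run-once pattern, with hypothetical names and independent of Spark:

import java.util.concurrent.atomic.AtomicBoolean

class RunOnce(body: () => Unit) {
  private[this] val executed = new AtomicBoolean(false)

  // Only the caller that flips the flag from false to true runs the body; later calls are no-ops.
  def apply(): Unit = {
    if (!executed.getAndSet(true)) body()
  }
}

object RunOnceDemo extends App {
  val commit = new RunOnce(() => println("commitJob called"))
  commit()  // prints once
  commit()  // does nothing
}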

/**
* :: Experimental ::
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1119,6 +1119,29 @@ class SparkContext(config: SparkConf) extends Logging {
rdd.doCheckpoint()
}

/**
 * Submit a save-as-Hadoop-file style job and return a [[FutureAction]] for its completion.
 * `postCompleteFunc` is invoked at most once by the returned future, when the job's outcome
 * is observed.
 */
def submitSaveAsHadoopFileJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
postCompleteFunc: () => Unit): FutureAction[Unit] = {
if (dagScheduler == null) {
throw new SparkException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
val waiter = dagScheduler.submitJob(
rdd,
cleanedFunc,
partitions,
callSite,
allowLocal,
resultHandler,
localProperties.get)
new SimpleFutureWithPostCompleteAction(waiter, (), postCompleteFunc)
}
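
A hedged sketch of how a caller might drive this entry point directly; the helper, the per-partition function, and the post-complete hook are made up for illustration and simply mirror the way PairRDDFunctions uses it later in this diff:

import org.apache.spark.{FutureAction, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical caller: runs a trivial per-partition function and a single
// post-complete hook once the whole job has finished.
def submitDemoJob(sc: SparkContext, rdd: RDD[(String, String)]): FutureAction[Unit] = {
  val perPartition = (ctx: TaskContext, iter: Iterator[(String, String)]) => iter.size
  sc.submitSaveAsHadoopFileJob(
    rdd,
    perPartition,
    0 until rdd.partitions.size,
    allowLocal = false,
    (_: Int, _: Int) => (),
    () => println("all partitions done"))
}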

/**
* Run a function on a given set of partitions in an RDD and return the results as an array. The
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
239 changes: 239 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1034,6 +1034,245 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.commitJob()
}

//////////////////////////////////////
// Async saveAsHadoopFile extension //
//////////////////////////////////////

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](path: String)(
[Author comment: Most of this section is just the change of the return type, with a FutureAction instead of Unit.]
implicit fm: ClassTag[F]): FutureAction[Unit] = {
saveAsHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
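
As the author comment notes, the visible change is the return type. A hedged side-by-side sketch (helper name, paths, and the RDD parameter are illustrative):

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.FutureAction
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def contrast(pairs: RDD[(String, String)]): FutureAction[Unit] = {
  // Existing call: blocks until every partition is written and the job is committed.
  pairs.saveAsHadoopFile[TextOutputFormat[String, String]]("/tmp/out-sync")
  // Async variant added here: returns a handle at once; the caller decides when to block.
  pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("/tmp/out-async")
}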

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress the result with the
* supplied codec.
*/
def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
path: String, codec: Class[_ <: CompressionCodec]) (
implicit fm: ClassTag[F]): FutureAction[Unit] = {
val runtimeClass = fm.runtimeClass
saveAsHadoopFileAsync(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
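
A hedged sketch of the codec-taking overload above; the codec, path, and helper name are illustrative choices:

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.FutureAction
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Writes gzip-compressed output; completion is still observed through the returned FutureAction.
def saveCompressed(pairs: RDD[(String, String)], path: String): FutureAction[Unit] =
  pairs.saveAsHadoopFileAsync[TextOutputFormat[String, String]](path, classOf[GzipCodec])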

/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFileAsync[F <: NewOutputFormat[K, V]](
path: String)(
implicit fm: ClassTag[F]): FutureAction[Unit] = {
saveAsNewAPIHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}

/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): FutureAction[Unit] =
{
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDatasetAsync(job.getConfiguration)
}
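
A hedged sketch of the new-API variant above with explicit key, value, and output-format classes (the Writable types, path, and helper name are illustrative):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.FutureAction
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def saveWithNewApi(pairs: RDD[(Text, IntWritable)], path: String): FutureAction[Unit] =
  pairs.saveAsNewAPIHadoopFileAsync(
    path,
    classOf[Text],
    classOf[IntWritable],
    classOf[NewTextOutputFormat[Text, IntWritable]])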

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress with the supplied codec.
*/
def saveAsHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
codec: Class[_ <: CompressionCodec]): FutureAction[Unit] = {
saveAsHadoopFileAsync(path, keyClass, valueClass, outputFormatClass,
new JobConf(self.context.hadoopConfiguration), Some(codec))
}

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFileAsync(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDatasetAsync(hadoopConf)
}

/**
* Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*/
def saveAsNewAPIHadoopDatasetAsync(conf: Configuration): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}

val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
}
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
1
} : Int

val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)

val postCompleteFunc = () => { jobCommitter.commitJob(jobTaskContext) }

self.context.submitSaveAsHadoopFileJob(
self,
writeShard,
0 until self.partitions.size,
false,
(_, _: Int) => (),
postCompleteFunc
)

}
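
Because each call now returns a handle instead of blocking, independent outputs can be written concurrently and joined at the end, which is presumably the point of making the save asynchronous. A hedged sketch (paths, timeout, and helper name are illustrative):

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Submits both save jobs up front, then waits for the pair of them together.
def saveBoth(a: RDD[(String, String)], b: RDD[(String, String)]): Unit = {
  val fa = a.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("/tmp/out-a")
  val fb = b.saveAsHadoopFileAsync[TextOutputFormat[String, String]]("/tmp/out-b")
  Await.result(Future.sequence(Seq(fa, fb)), 30.minutes)
}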

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
def saveAsHadoopDatasetAsync(conf: JobConf): FutureAction[Unit] = {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
if (outputFormatInstance == null) {
throw new SparkException("Output format class not set")
}
if (keyClass == null) {
throw new SparkException("Output key class not set")
}
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
SparkHadoopUtil.get.addCredentials(hadoopConf)

logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
}

val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
try {
var count = 0
while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
} finally {
writer.close()
}
writer.commit()
}

val postCompleteFunc = () => { writer.commitJob() }

self.context.submitSaveAsHadoopFileJob(
self,
writeToFile,
0 until self.partitions.size,
false,
(_, _: Unit) => (),
postCompleteFunc
)
}
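
A hedged sketch of driving the JobConf-based entry point above directly, configuring the output format the same way this diff does via the mapred.output.format.class property (classes, path, and helper name are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.spark.FutureAction
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def saveViaJobConf(pairs: RDD[(String, String)], path: String): FutureAction[Unit] = {
  val conf = new JobConf(pairs.context.hadoopConfiguration)
  conf.setOutputKeyClass(classOf[String])
  conf.setOutputValueClass(classOf[String])
  conf.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
  FileOutputFormat.setOutputPath(conf, new Path(path))
  pairs.saveAsHadoopDatasetAsync(conf)
}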



/**
* Return an RDD with the keys of each tuple.
*/