
Commit 6f0db0a

For OutputFormats that are Configurable, call setConf before sending data to them.

[SPARK-1108] This allows us to use, e.g. HBase's TableOutputFormat with PairRDDFunctions.saveAsNewAPIHadoopFile, which otherwise would throw NullPointerException because the output table name hasn't been configured.

Note this bug also affects branch-0.9
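
For context, a minimal sketch of the use case this fix unblocks, assuming HBase's TableOutputFormat; the table name, column family, and values below are hypothetical, and sc is an existing SparkContext:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes

// TableOutputFormat is Configurable and reads its target table from the
// Configuration passed to setConf; before this fix, setConf was never
// called, so writing failed with a NullPointerException.
val conf = HBaseConfiguration.create(sc.hadoopConfiguration)
conf.set(TableOutputFormat.OUTPUT_TABLE, "demo_table") // hypothetical table

val puts = sc.parallelize(Seq("row1", "row2")).map { key =>
  val put = new Put(Bytes.toBytes(key))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("v"))
  (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
}

puts.saveAsNewAPIHadoopFile(
  "ignored", // TableOutputFormat ignores the output path
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[TableOutputFormat[ImmutableBytesWritable]],
  conf)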

Author: Bryn Keller <bryn.keller@intel.com>

Closes apache#638 from xoltar/SPARK-1108 and squashes the following commits:

7e94e7d [Bryn Keller] Import, comment, and format cleanup per code review
7cbcaa1 [Bryn Keller] For OutputFormats that are Configurable, call setConf before sending data to them. This allows us to use, e.g. HBase TableOutputFormat, which otherwise would throw NullPointerException because the output table name hasn't been configured
(cherry picked from commit 4d88030)

Conflicts:

	core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
	core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
xoltar authored and pwendell committed Mar 10, 2014
1 parent 0f0d044 commit 6f0db0a
Showing 2 changed files with 81 additions and 1 deletion.
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.reflect.{ClassTag, classTag}
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -620,6 +620,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
         attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
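
As an aside on the design choice: Hadoop's own ReflectionUtils.newInstance calls setConf automatically when the object it instantiates is Configurable, so a hypothetical alternative to the explicit match above would be:

import org.apache.hadoop.util.ReflectionUtils

// ReflectionUtils.newInstance invokes setConf on any Configurable instance
// it creates, folding the match block above into the instantiation itself.
val format = ReflectionUtils.newInstance(outputFormatClass, wrappedConf.value)

This is only a sketch of an equivalent approach, not what the commit does; the commit keeps Class.newInstance and adds the explicit match.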
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -24,6 +24,9 @@ import scala.util.Random
 import org.scalatest.FunSuite
 
 import com.google.common.io.Files
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
 import org.apache.spark.SparkContext._
 import org.apache.spark.{Partitioner, SharedSparkContext}

@@ -331,4 +334,77 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       (1, ArrayBuffer(1)),
       (2, ArrayBuffer(1))))
   }
+
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error, non-configurable formats still work
+    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+
+    /*
+      Check that configurable formats get configured:
+      ConfigTestFormat throws an exception if we try to write
+      to it when setConf hasn't been called first.
+      Assertion is in ConfigTestFormat.getRecordWriter.
+    */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
 }
+
+/*
+  These classes are fakes for testing
+  "saveNewAPIHadoopFile should call setConf if format is configurable".
+  Unfortunately, they have to be top level classes, and not defined in
+  the test method, because otherwise Scala won't generate no-args constructors
+  and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
+  tries to instantiate them with Class.newInstance.
+*/
+class FakeWriter extends RecordWriter[Integer, Integer] {
+
+  def close(p1: TaskAttemptContext) = ()
+
+  def write(p1: Integer, p2: Integer) = ()
+
+}
+
+class FakeCommitter extends OutputCommitter {
+  def setupJob(p1: JobContext) = ()
+
+  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
+
+  def setupTask(p1: TaskAttemptContext) = ()
+
+  def commitTask(p1: TaskAttemptContext) = ()
+
+  def abortTask(p1: TaskAttemptContext) = ()
+}
+
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(p1: JobContext) = ()
+
+  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
+  }
+
+  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+    new FakeCommitter()
+  }
+}
+
+class ConfigTestFormat() extends FakeFormat() with Configurable {
+
+  var setConfCalled = false
+  def setConf(p1: Configuration) = {
+    setConfCalled = true
+    ()
+  }
+
+  def getConf: Configuration = null
+
+  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+    assert(setConfCalled, "setConf was never called")
+    super.getRecordWriter(p1)
+  }
+}
