Removing redundant accumulator in unit test
mccheah committed Jan 22, 2015
1 parent c9decc6 commit 6b543ba
Showing 1 changed file with 5 additions and 5 deletions.
@@ -100,7 +100,7 @@ class OutputCommitCoordinatorSuite
/**
* Function that constructs a SparkHadoopWriter with a mock committer and runs its commit
*/
-private class OutputCommittingFunctionWithAccumulator(var accum: Accumulator[Int])
+private class OutputCommittingFunctionWithAccumulator
extends ((TaskContext, Iterator[Int]) => Int) with Serializable {

def apply(ctxt: TaskContext, it: Iterator[Int]): Int = {
@@ -150,8 +150,8 @@ class OutputCommitCoordinatorSuite
/**
* Function that will explicitly fail to commit on the first attempt
*/
-private class FailFirstTimeCommittingFunctionWithAccumulator(accum: Accumulator[Int])
-  extends OutputCommittingFunctionWithAccumulator(accum) {
+private class FailFirstTimeCommittingFunctionWithAccumulator
+  extends OutputCommittingFunctionWithAccumulator {
override def apply(ctxt: TaskContext, it: Iterator[Int]): Int = {
if (ctxt.attemptNumber == 0) {
val outputCommitter = new FakeOutputCommitter {
@@ -168,13 +168,13 @@ class OutputCommitCoordinatorSuite

test("Only one of two duplicate commit tasks should commit") {
val rdd = sc.parallelize(1 to 10, 10)
-  sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator(accum))
+  sc.runJob(rdd, new OutputCommittingFunctionWithAccumulator)
assert(accum.value === 10)
}

test("If commit fails, if task is retried it should not be locked, and will succeed.") {
val rdd = sc.parallelize(Seq(1), 1)
-  sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator(accum))
+  sc.runJob(rdd, new FailFirstTimeCommittingFunctionWithAccumulator)
assert(accum.value == 1)
}
}
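
For context, a minimal sketch of how a parameterless, Serializable test function could still drive the suite's accumulator after this change: the helper reads a shared Accumulator[Int] through the suite's companion object instead of receiving it as a constructor argument. The companion-object wiring, the accum field, and the body of apply below are illustrative assumptions, not the code from this commit.

import org.apache.spark.{Accumulator, TaskContext}

// Sketch only: assumes the suite publishes its accumulator through a companion
// object so the Serializable function does not have to capture the suite instance.
object OutputCommitCoordinatorSuite {
  var accum: Accumulator[Int] = _   // assumed to be set by the suite before each test
}

private class OutputCommittingFunctionWithAccumulator
  extends ((TaskContext, Iterator[Int]) => Int) with Serializable {

  def apply(ctxt: TaskContext, it: Iterator[Int]): Int = {
    // ... build the SparkHadoopWriter with a mock committer and run its commit ...
    OutputCommitCoordinatorSuite.accum += 1   // record one committed partition
    0
  }
}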
