[SPARK-4964] test refactoring
koeninger committed Feb 11, 2015
1 parent 6f8680b commit f7151d4
Showing 1 changed file with 28 additions and 27 deletions.
@@ -22,28 +22,29 @@ import scala.util.Random
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+  val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
   var sc: SparkContext = _
-  before {
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+
     setupKafka()
   }
 
-  after {
+  override def afterAll {
     if (sc != null) {
       sc.stop
       sc = null
     }
     tearDownKafka()
   }
 
-  test("Kafka RDD basic usage") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
+  test("basic usage") {
     val topic = "topicbasic"
     createTopic(topic)
     val messages = Set("the", "quick", "brown", "fox")
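
The first hunk moves SparkContext creation out of the individual tests. ScalaTest's BeforeAndAfter runs its before/after blocks around every test, while BeforeAndAfterAll runs beforeAll/afterAll once per suite, so the refactored suite builds a single shared context for all of its tests. A minimal sketch of that lifecycle (a hypothetical suite, assuming the ScalaTest 2.x API current at the time):

import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SharedResourceSuite extends FunSuite with BeforeAndAfterAll {
  // Stands in for the shared SparkContext in the real suite.
  var resource: String = _

  override def beforeAll(): Unit = {
    resource = "expensive-to-create"  // built once, before the first test
  }

  override def afterAll(): Unit = {
    resource = null                   // torn down once, after the last test
  }

  test("first test sees the shared resource") {
    assert(resource != null)
  }

  test("second test reuses it without rebuilding") {
    assert(resource === "expensive-to-create")
  }
}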
@@ -62,11 +63,8 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     assert(received === messages)
   }
 
-  test("Kafka RDD integration") {
+  test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
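
The renamed test hunts for off-by-one errors at the boundaries between the offsets Kafka holds and the ones the RDD reads. The convention those checks rest on is that an OffsetRange includes fromOffset and excludes untilOffset; a small sketch with hypothetical offsets (assuming the spark-streaming-kafka OffsetRange API of this era):

import org.apache.spark.streaming.kafka.OffsetRange

// fromOffset = 3 (inclusive) and untilOffset = 8 (exclusive)
// cover offsets 3, 4, 5, 6, 7 -- exactly five messages.
val range = OffsetRange("topic1", 0, 3, 8)
assert(range.untilOffset - range.fromOffset == 5)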
@@ -111,26 +109,29 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
   // get an rdd from the committed consumer offsets until the latest leader offsets,
   private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
     val groupId = kc.kafkaParams("group.id")
-    for {
-      topicPartitions <- kc.getPartitions(topics).right.toOption
-      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
         kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
           offs.map(kv => kv._1 -> kv._2.offset)
         }
       )
-      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
-    } yield {
-      val leaders = until.map { case (tp, lo) =>
-        tp -> Broker(lo.host, lo.port)
-      }.toMap
-      val offsetRanges = from.map { case (tp, f) =>
-        val u = until(tp)
-        OffsetRange(tp.topic, tp.partition, f, u.offset)
-      }.toArray
-
-      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
-        sc, kc.kafkaParams, offsetRanges, leaders,
-        (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
-    }
+    }
+    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+      consumerOffsets(topicPartitions).flatMap { from =>
+        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+            OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+          }.toArray
+
+          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+            tp -> Broker(lo.host, lo.port)
+          }.toMap
+
+          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+            sc, kc.kafkaParams, offsetRanges, leaders,
+            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+        }
+      }
+    }
   }
 }
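
The getRdd change is behavior-preserving: a Scala for-comprehension over Option is syntactic sugar for nested flatMap calls ending in a map, and the rewrite spells that desugaring out while factoring the consumer-offset fallback into the named helper consumerOffsets. A minimal sketch of the equivalence the refactor relies on:

// Both expressions evaluate to Some(5); the second is the explicit
// form the compiler generates for the first.
val oa: Option[Int] = Some(2)
val ob: Option[Int] = Some(3)

val viaFor: Option[Int] =
  for {
    a <- oa
    b <- ob
  } yield a + b

val viaFlatMap: Option[Int] =
  oa.flatMap { a =>
    ob.map { b =>
      a + b
    }
  }

assert(viaFor == viaFlatMap)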
