From f7151d4e80c9bfe72d0240902d448df4412dc172 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Tue, 10 Feb 2015 21:22:16 -0600
Subject: [PATCH] [SPARK-4964] test refactoring

---
 .../spark/streaming/kafka/KafkaRDDSuite.scala | 55 ++++++++++---------
 1 file changed, 28 insertions(+), 27 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 0597456669e71..a223da70b043f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -22,18 +22,21 @@ import scala.util.Random
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import org.scalatest.BeforeAndAfter
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+  val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
   var sc: SparkContext = _
-  before {
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+
     setupKafka()
   }
 
-  after {
+  override def afterAll {
     if (sc != null) {
       sc.stop
       sc = null
@@ -41,9 +44,7 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     tearDownKafka()
   }
 
-  test("Kafka RDD basic usage") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
+  test("basic usage") {
     val topic = "topicbasic"
     createTopic(topic)
     val messages = Set("the", "quick", "brown", "fox")
@@ -62,11 +63,8 @@
     assert(received === messages)
   }
 
-  test("Kafka RDD integration") {
+  test("iterator boundary conditions") {
     // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
@@ -111,26 +109,29 @@
   // get an rdd from the committed consumer offsets until the latest leader offsets,
   private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
     val groupId = kc.kafkaParams("group.id")
-    for {
-      topicPartitions <- kc.getPartitions(topics).right.toOption
-      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
         kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
           offs.map(kv => kv._1 -> kv._2.offset)
         }
       )
-      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
-    } yield {
-      val leaders = until.map { case (tp, lo) =>
-        tp -> Broker(lo.host, lo.port)
-      }.toMap
-      val offsetRanges = from.map { case (tp, f) =>
-        val u = until(tp)
-        OffsetRange(tp.topic, tp.partition, f, u.offset)
-      }.toArray
-
-      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
-        sc, kc.kafkaParams, offsetRanges, leaders,
-        (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+    }
+    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+      consumerOffsets(topicPartitions).flatMap { from =>
+        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+            OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+          }.toArray
+
+          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+            tp -> Broker(lo.host, lo.port)
+          }.toMap
+
+          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+            sc, kc.kafkaParams, offsetRanges, leaders,
+            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+        }
+      }
     }
   }
 }
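
The getRdd hunk above rewrites a for-comprehension over `Either(...).right.toOption` values into explicit `flatMap`/`map` nesting with an `orElse` fallback to the earliest offsets. Below is a minimal, self-contained sketch of that same chaining pattern, outside the patch; the names (`getPartitions`, `getConsumerOffsets`, etc.) are hypothetical stand-ins for the KafkaCluster calls, not the real API.

```scala
// Sketch of the Either -> Option chaining used in the patched getRdd.
// All functions here are hypothetical stand-ins; only the shape matches.
object OptionChainingSketch {
  type Err = Seq[String]

  def getPartitions: Either[Err, Set[Int]] = Right(Set(0, 1))
  def getConsumerOffsets(parts: Set[Int]): Either[Err, Map[Int, Long]] =
    Left(Seq("no committed offsets"))           // force the orElse fallback
  def getEarliestOffsets(parts: Set[Int]): Either[Err, Map[Int, Long]] =
    Right(parts.map(_ -> 0L).toMap)
  def getLatestOffsets(parts: Set[Int]): Either[Err, Map[Int, Long]] =
    Right(parts.map(_ -> 100L).toMap)

  // Each Either collapses to an Option; committed offsets fall back to
  // the earliest offsets, mirroring consumerOffsets in the patch.
  def offsetRanges: Option[Map[Int, (Long, Long)]] =
    getPartitions.right.toOption.flatMap { parts =>
      val from = getConsumerOffsets(parts).right.toOption
        .orElse(getEarliestOffsets(parts).right.toOption)
      from.flatMap { f =>
        getLatestOffsets(parts).right.toOption.map { until =>
          f.map { case (p, lo) => p -> (lo, until(p)) }
        }
      }
    }

  def main(args: Array[String]): Unit =
    println(offsetRanges) // Some(Map(0 -> (0,100), 1 -> (0,100)))
}
```

The explicit nesting is behaviorally equivalent to the old for-comprehension; the refactor mainly pulls the offset-lookup fallback out into a named helper so each step can be read (and reused) on its own.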