[SPARK-4964] add test of the scala api for KafkaUtils.createRDD
koeninger committed Feb 10, 2015
1 parent f81e016 · commit 6f8680b
Showing 1 changed file with 55 additions and 18 deletions.
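
For orientation, the two `KafkaUtils.createRDD` overloads exercised below differ in arity: the simple one takes a SparkContext, Kafka params, and an array of offset ranges, and yields the raw (key, value) pairs; the second additionally takes a map of preferred leaders and a `messageHandler` that projects each record into a caller-chosen type. A minimal sketch of both call shapes, built only from calls visible in the diff below (broker address, topic name, and offsets are placeholders):

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

def sketch(sc: SparkContext): Unit = {
  // Placeholder connection info for a Kafka 0.8.x broker.
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  // Read offsets [0, 100) from partition 0 of a hypothetical topic.
  val ranges = Array(OffsetRange("sometopic", 0, 0, 100))

  // Simple overload: an RDD of the raw (key, value) pairs.
  val pairs = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, ranges)

  // Handler overload: also takes the preferred leader per partition and
  // maps each MessageAndMetadata to a caller-chosen type (here, String).
  val leaders = Map(TopicAndPartition("sometopic", 0) -> Broker("localhost", 9092))
  val values = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
    sc, kafkaParams, ranges, leaders,
    (mmd: MessageAndMetadata[String, String]) => mmd.message)
}
```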
```diff
@@ -21,6 +21,7 @@ import scala.util.Random
 
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
```
```diff
@@ -40,43 +41,70 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     tearDownKafka()
   }
 
-  test("Kafka RDD") {
+  test("Kafka RDD basic usage") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topicbasic"
+    createTopic(topic)
+    val messages = Set("the", "quick", "brown", "fox")
+    sendMessages(topic, messages.toArray)
+
+
+    val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+      sc, kafkaParams, offsetRanges)
+
+    val received = rdd.map(_._2).collect.toSet
+    assert(received === messages)
+  }
+
+  test("Kafka RDD integration") {
+    // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
+
     val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
     sc = new SparkContext(sparkConf)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
-    sendMessages(topic, sent)
 
     val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
 
     val kc = new KafkaCluster(kafkaParams)
 
-    val rdd = getRdd(kc, Set(topic))
     // this is the "lots of messages" case
     // make sure we get all of them
+    sendMessages(topic, sent)
+    // rdd defined from leaders after sending messages, should get the number sent
+    val rdd = getRdd(kc, Set(topic))
+
     assert(rdd.isDefined)
-    assert(rdd.get.count === sent.values.sum)
+    assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
 
-    kc.setConsumerOffsets(
-      kafkaParams("group.id"),
-      rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+    val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
+      .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+    kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
 
-    val rdd2 = getRdd(kc, Set(topic))
-    val sent2 = Map("d" -> 1)
-    sendMessages(topic, sent2)
     // this is the "0 messages" case
-    // make sure we dont get anything, since messages were sent after rdd was defined
+    val rdd2 = getRdd(kc, Set(topic))
+    // shouldn't get anything, since message is sent after rdd was defined
+    val sentOnlyOne = Map("d" -> 1)
+
+    sendMessages(topic, sentOnlyOne)
     assert(rdd2.isDefined)
-    assert(rdd2.get.count === 0)
+    assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
 
+    // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
     val rdd3 = getRdd(kc, Set(topic))
+    // send lots of messages after rdd was defined, they shouldn't show up
    sendMessages(topic, Map("extra" -> 22))
-    // this is the "exactly 1 message" case
-    // make sure we get exactly one message, despite there being lots more available
 
     assert(rdd3.isDefined)
-    assert(rdd3.get.count === sent2.values.sum)
+    assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
 
   }
```
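
The three cases above lean on `OffsetRange` being half-open: `fromOffset` is inclusive, `untilOffset` is exclusive, and the until offsets are captured from the leaders at the moment the RDD is defined. That is why rdd3 contains exactly the one message from sentOnlyOne and none of the "extra" messages sent afterwards. A small illustration (topic name and offsets are made up):

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// Suppose offsets 0..9 exist on partition 0. This range covers
// offsets 3, 4, 5, and 6 only; 7 itself is excluded.
val range = OffsetRange("sometopic", 0, 3, 7)
assert(range.untilOffset - range.fromOffset == 4)
```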

```diff
@@ -92,8 +120,17 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
       )
       until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
     } yield {
-      KafkaRDD[String, String, StringDecoder, StringDecoder, String](
-        sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+      val leaders = until.map { case (tp, lo) =>
+        tp -> Broker(lo.host, lo.port)
+      }.toMap
+      val offsetRanges = from.map { case (tp, f) =>
+        val u = until(tp)
+        OffsetRange(tp.topic, tp.partition, f, u.offset)
+      }.toArray
+
+      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+        sc, kc.kafkaParams, offsetRanges, leaders,
+        (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
     }
   }
 }
```
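
The helper now builds its RDD through the public `KafkaUtils.createRDD` entry point instead of constructing `KafkaRDD` directly, which is what the commit title means by testing the Scala API. Its `messageHandler` encodes each record's offset and payload into one string, but the handler can project into any type; a sketch of a structured alternative (the `Record` case class is invented for illustration, field accessors per the Kafka 0.8 `MessageAndMetadata` API):

```scala
import kafka.message.MessageAndMetadata

// Hypothetical result type: keep key, payload, and position per record
// instead of formatting them into a single string.
case class Record(key: String, value: String, partition: Int, offset: Long)

val handler: MessageAndMetadata[String, String] => Record =
  mmd => Record(mmd.key, mmd.message, mmd.partition, mmd.offset)
```

Passed as the last argument to `KafkaUtils.createRDD`, this would yield an `RDD[Record]` instead of an `RDD[String]`.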
