From 3d61f824ae5907e2d9fe0f2732e90b963046c311 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 6 Jan 2016 20:03:58 -0800 Subject: [PATCH 01/13] Ported KafkaSource from different branch --- external/kafka/pom.xml | 12 ++ .../spark/streaming/kafka/KafkaCluster.scala | 11 +- .../streaming/kafka/KafkaInputDStream.scala | 2 +- .../spark/streaming/kafka/KafkaRDD.scala | 10 +- .../spark/streaming/kafka/KafkaSource.scala | 169 ++++++++++++++++++ .../streaming/kafka/KafkaTestUtils.scala | 33 ++-- .../spark/streaming/kafka/KafkaUtils.scala | 12 +- .../streaming/kafka/KafkaSourceSuite.scala | 62 +++++++ .../sql/execution/streaming/Source.scala | 3 - 9 files changed, 280 insertions(+), 34 deletions(-) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 5180ab6dbafbd..82581529b6075 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -41,6 +41,11 @@ ${project.version} provided + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + org.apache.spark spark-core_${scala.binary.version} @@ -48,6 +53,13 @@ test-jar test + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + org.apache.kafka kafka_${scala.binary.version} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index c4e18d92eefa9..8465432c5850f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -17,17 +17,14 @@ package org.apache.spark.streaming.kafka -import java.util.Properties - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.util.Random import scala.util.control.NonFatal - +import scala.util.Random +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ +import java.util.Properties import kafka.api._ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} import kafka.consumer.{ConsumerConfig, SimpleConsumer} - import org.apache.spark.SparkException /** diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 67f2360896b16..38730fecf332a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -22,7 +22,7 @@ import java.util.Properties import scala.collection.Map import scala.reflect.{classTag, ClassTag} -import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} +import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 603be22818206..ea5f842c6cafe 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -20,6 +20,11 @@ package org.apache.spark.streaming.kafka import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} +import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.partial.{PartialResult, BoundedDouble} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.NextIterator + import kafka.api.{FetchRequestBuilder, FetchResponse} import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.SimpleConsumer @@ -27,11 +32,6 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties -import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator - /** * A batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala new file mode 100644 index 0000000000000..d850462079a90 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import kafka.common.TopicAndPartition +import kafka.serializer._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{Batch, StreamingRelation, Offset, Source} +import org.apache.spark.sql.types.StructType + + + + + +private[kafka] +case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offset { + /** + * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, + * or greater than the specified object. 
+ */ + override def compareTo(other: Offset): Int = other match { + case KafkaSourceOffset(otherOffsets) => + val allTopicAndPartitions = (this.offsets.keySet ++ otherOffsets.keySet).toSeq + + val comparisons = allTopicAndPartitions.map { tp => + (this.offsets.get(tp), otherOffsets.get(tp)) match { + + case (Some(a), Some(b)) => + if (a < b) { + -1 + } else if (a > b) { + 1 + } else { + 0 + } + case (None, None) => 0 + case (None, _) => -1 + case (_, None) => 1 + } + } + val signs = comparisons.distinct + if (signs.size != 1) { + throw new IllegalArgumentException( + s"Invalid comparison between to sets of Kafka offets: $this <=> $other") + } + signs.head + case _ => + throw new IllegalArgumentException(s"Cannot compare $this <=> $other") + } + + override def toString(): String = offsets.toSeq.mkString("[", ", ", "]") +} + +private[kafka] object KafkaSourceOffset { + def fromOffset(offset: Offset): KafkaSourceOffset = { + offset match { + case o: KafkaSourceOffset => o + case _ => + throw new IllegalArgumentException( + s"Invalid conversion from offset of ${offset.getClass} to $getClass") + } + } +} + + +private[kafka] case class KafkaSource( + topics: Set[String], params: Map[String, String])(implicit sqlContext: SQLContext) extends Source { + + type OffsetMap = Map[TopicAndPartition, Long] + implicit private val encoder = ExpressionEncoder.tuple( + ExpressionEncoder[Array[Byte]](), ExpressionEncoder[Array[Byte]]()) + + @transient private val logicalPlan = StreamingRelation(this) + @transient private val kc = new KafkaCluster(params) + @transient private val topicAndPartitions = KafkaCluster.checkErrors(kc.getPartitions(topics)) + @transient private lazy val initialOffsets = getInitialOffsets() + + override def schema: StructType = encoder.schema + + /** + * Returns the next batch of data that is available after `start`, if any is available. + */ + override def getNextBatch(start: Option[Offset]): Option[Batch] = { + val latestOffset = getLatestOffsets() + val offsetRanges = getOffsetRanges( + start.map(KafkaSourceOffset.fromOffset(_).offsets), latestOffset) + + val kafkaParams = params + val encodingFunc = encoder.toRow _ + val sparkContext = sqlContext.sparkContext + + println("Creating DF with offset ranges: " + offsetRanges) + if (offsetRanges.nonEmpty) { + val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( + sparkContext, kafkaParams, offsetRanges.toArray) + Some(new Batch(KafkaSourceOffset(latestOffset), sqlContext.createDataset(rdd).toDF)) + } else { + None + } + } + + def toDS(): Dataset[(Array[Byte], Array[Byte])] = { + toDF.as[(Array[Byte], Array[Byte])] + } + + def toDF(): DataFrame = { + new DataFrame(sqlContext, logicalPlan) + } + + private def getOffsetRanges(start: Option[OffsetMap], end: OffsetMap): Seq[OffsetRange] = { + // Get the offsets from which to start reading. If its none, find initial offsets to start from. 
+ val fromOffsets = start.getOrElse { initialOffsets } + + // Get the latest offsets + val untilOffsets = getLatestOffsets() + + // Get all the partitions referenced in both sets of offsets + val allTopicAndPartitions = (fromOffsets.keySet ++ untilOffsets.keySet).toSeq + + // For each partition, figure out the non-empty ranges of offsets + allTopicAndPartitions.flatMap { tp => + (fromOffsets.get(tp), untilOffsets.get(tp)) match { + + // Data was read till fromOffset and needs to be read till untilOffset + case (Some(fromOffset), Some(untilOffset)) => + if (untilOffset > fromOffset) { + Some(OffsetRange(tp, fromOffset, untilOffset)) + } else None + + case _ => + None + } + } + } + + private def getLatestOffsets(): OffsetMap = { + val partitionLeaders = KafkaCluster.checkErrors(kc.findLeaders(topicAndPartitions)) + val leadersAndOffsets = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(topicAndPartitions)) + println("Getting offsets " + leadersAndOffsets) + leadersAndOffsets.map { x => (x._1, x._2.offset) } + } + + private def getInitialOffsets(): OffsetMap = { + if (params.get("auto.offset.reset").map(_.toLowerCase) == Some("smallest")) { + KafkaCluster.checkErrors(kc.getEarliestLeaderOffsets(topicAndPartitions)).mapValues(_.offset) + } else Map.empty + } + + override def toString(): String = s"KafkaSource[${topics.mkString(", ")}]" +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index a76fa6671a4b0..6a765e54f0af8 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -20,8 +20,8 @@ package org.apache.spark.streaming.kafka import java.io.File import java.lang.{Integer => JInt} import java.net.InetSocketAddress +import java.util.concurrent.{TimeUnit, TimeoutException} import java.util.{Map => JMap, Properties} -import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -30,16 +30,16 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} -import org.apache.spark.{Logging, SparkConf} import org.apache.spark.streaming.Time import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkConf} /** * This is a helper class for Kafka test suites. 
This has the functionality to set up @@ -170,11 +170,19 @@ private[kafka] class KafkaTestUtils extends Logging { } /** Send the array of messages to the Kafka broker */ - def sendMessages(topic: String, messages: Array[String]): Unit = { - producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) - producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) - producer.close() - producer = null + def sendMessages(topic: String, messages: Array[String]): Seq[RecordMetadata] = { + producer = new KafkaProducer[String, String](producerConfiguration) + val offsets = try { + messages.map { m => + producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + offsets } private def brokerConfiguration: Properties = { @@ -191,10 +199,11 @@ private[kafka] class KafkaTestUtils extends Logging { private def producerConfiguration: Properties = { val props = new Properties() - props.put("metadata.broker.list", brokerAddress) - props.put("serializer.class", classOf[StringEncoder].getName) + props.put("bootstrap.servers", brokerAddress) + props.put("value.serializer", classOf[StringSerializer].getName) + props.put("key.serializer", classOf[StringSerializer].getName) // wait for all in-sync replicas to ack sends - props.put("request.required.acks", "-1") + props.put("acks", "-1") props } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 0cb875c9758f9..2de1b3ca7ae09 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -27,19 +27,19 @@ import scala.reflect.ClassTag import com.google.common.base.Charsets.UTF_8 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} -import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} +import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} +import net.razorvine.pickle.{Opcodes, Pickler, IObjectPickler} -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.streaming.util.WriteAheadLogUtils +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} -import org.apache.spark.streaming.util.WriteAheadLogUtils object KafkaUtils { /** @@ -173,7 +173,7 @@ object KafkaUtils { } /** get leaders for the given offset ranges, or throw an exception */ - private def leadersForRanges( + private[spark] def leadersForRanges( kc: KafkaCluster, offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = { val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala new file mode 100644 index 0000000000000..546cd41855228 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -0,0 +1,62 @@ +package org.apache.spark.streaming.kafka + +import kafka.common.TopicAndPartition +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.StreamTest +import org.apache.spark.sql.execution.streaming.{Offset, Source} +import org.apache.spark.sql.test.SharedSQLContext + + +class KafkaSourceSuite extends StreamTest with SharedSQLContext { + + import testImplicits._ + + private var testUtils: KafkaTestUtils = _ + private var kafkaParams: Map[String, String] = _ + + override val streamingTimout = 30.seconds + + case class AddKafkaData(kafkaSource: KafkaSource, topic: String, data: Int*) extends AddData { + override def addData(): Offset = { + val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) + val lastMetadata = sentMetadata.maxBy(_.offset) + KafkaSourceOffset( + Map(TopicAndPartition(topic, lastMetadata.partition) -> lastMetadata.offset)) + } + + override def source: Source = kafkaSource + } + + override def beforeAll(): Unit = { + super.beforeAll() + testUtils = new KafkaTestUtils + testUtils.setup() + kafkaParams = Map( + "metadata.broker.list" -> testUtils.brokerAddress, + "auto.offset.reset" -> "smallest" + ) + } + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + super.afterAll() + } + } + + test("basic receiving") { + val topic = "topic1" + testUtils.createTopic(topic) + + val kafkaSource = KafkaSource(Set(topic), kafkaParams) + val mapped = kafkaSource.toDS().map[Int]((kv: (Array[Byte], Array[Byte])) => + new String(kv._2).toInt + 1) + + testStream(mapped)( + AddKafkaData(kafkaSource, topic, 1, 2, 3), + CheckAnswer(2, 3, 4) + ) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 456ed0a174aa8..25922979ac83e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType /** From 5fa1a20a12ff1c2c30fb56f6136c5b5954899d94 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 Jan 2016 01:43:14 -0800 Subject: [PATCH 02/13] Fixed race condition, and updated tests --- .../spark/streaming/kafka/KafkaSource.scala | 27 +++++----- .../streaming/kafka/KafkaSourceSuite.scala | 51 ++++++++++++++----- .../execution/streaming/StreamExecution.scala | 2 +- .../execution/streaming/StreamProgress.scala | 3 +- 4 files changed, 54 insertions(+), 29 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index d850462079a90..9e942580f2f5a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -20,15 +20,11 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition import kafka.serializer._ 
-import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.streaming.{Batch, StreamingRelation, Offset, Source} +import org.apache.spark.sql.execution.streaming.{Batch, Offset, Source, StreamingRelation} import org.apache.spark.sql.types.StructType - - - +import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} private[kafka] @@ -83,16 +79,18 @@ private[kafka] object KafkaSourceOffset { private[kafka] case class KafkaSource( - topics: Set[String], params: Map[String, String])(implicit sqlContext: SQLContext) extends Source { + topics: Set[String], + params: Map[String, String])(implicit sqlContext: SQLContext) extends Source with Logging { type OffsetMap = Map[TopicAndPartition, Long] + implicit private val encoder = ExpressionEncoder.tuple( ExpressionEncoder[Array[Byte]](), ExpressionEncoder[Array[Byte]]()) @transient private val logicalPlan = StreamingRelation(this) @transient private val kc = new KafkaCluster(params) @transient private val topicAndPartitions = KafkaCluster.checkErrors(kc.getPartitions(topics)) - @transient private lazy val initialOffsets = getInitialOffsets() + @transient private[kafka] val initialOffsets = getInitialOffsets() override def schema: StructType = encoder.schema @@ -101,6 +99,8 @@ private[kafka] case class KafkaSource( */ override def getNextBatch(start: Option[Offset]): Option[Batch] = { val latestOffset = getLatestOffsets() + logDebug(s"Latest offset: ${KafkaSourceOffset(latestOffset)}") + val offsetRanges = getOffsetRanges( start.map(KafkaSourceOffset.fromOffset(_).offsets), latestOffset) @@ -108,10 +108,10 @@ private[kafka] case class KafkaSource( val encodingFunc = encoder.toRow _ val sparkContext = sqlContext.sparkContext - println("Creating DF with offset ranges: " + offsetRanges) if (offsetRanges.nonEmpty) { val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( sparkContext, kafkaParams, offsetRanges.toArray) + logInfo("Creating DF with offset ranges: " + offsetRanges) Some(new Batch(KafkaSourceOffset(latestOffset), sqlContext.createDataset(rdd).toDF)) } else { None @@ -131,7 +131,7 @@ private[kafka] case class KafkaSource( val fromOffsets = start.getOrElse { initialOffsets } // Get the latest offsets - val untilOffsets = getLatestOffsets() + val untilOffsets = end // Get all the partitions referenced in both sets of offsets val allTopicAndPartitions = (fromOffsets.keySet ++ untilOffsets.keySet).toSeq @@ -155,14 +155,15 @@ private[kafka] case class KafkaSource( private def getLatestOffsets(): OffsetMap = { val partitionLeaders = KafkaCluster.checkErrors(kc.findLeaders(topicAndPartitions)) val leadersAndOffsets = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(topicAndPartitions)) - println("Getting offsets " + leadersAndOffsets) leadersAndOffsets.map { x => (x._1, x._2.offset) } } private def getInitialOffsets(): OffsetMap = { if (params.get("auto.offset.reset").map(_.toLowerCase) == Some("smallest")) { KafkaCluster.checkErrors(kc.getEarliestLeaderOffsets(topicAndPartitions)).mapValues(_.offset) - } else Map.empty + } else { + getLatestOffsets() + } } override def toString(): String = s"KafkaSource[${topics.mkString(", ")}]" diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index 546cd41855228..ac9e294803474 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -21,8 +21,12 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { override def addData(): Offset = { val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) val lastMetadata = sentMetadata.maxBy(_.offset) - KafkaSourceOffset( - Map(TopicAndPartition(topic, lastMetadata.partition) -> lastMetadata.offset)) + + // Expected offset to ensure this data is read is last offset of this data + 1 + val offset = KafkaSourceOffset( + Map(TopicAndPartition(topic, lastMetadata.partition) -> (lastMetadata.offset + 1))) + logInfo(s"Added data, expected offset $offset") + offset } override def source: Source = kafkaSource @@ -32,10 +36,7 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { super.beforeAll() testUtils = new KafkaTestUtils testUtils.setup() - kafkaParams = Map( - "metadata.broker.list" -> testUtils.brokerAddress, - "auto.offset.reset" -> "smallest" - ) + kafkaParams = Map("metadata.broker.list" -> testUtils.brokerAddress) } override def afterAll(): Unit = { @@ -46,17 +47,39 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { } } - test("basic receiving") { + test("basic receiving from latest offset with 1 topic and 1 partition") { val topic = "topic1" testUtils.createTopic(topic) - val kafkaSource = KafkaSource(Set(topic), kafkaParams) - val mapped = kafkaSource.toDS().map[Int]((kv: (Array[Byte], Array[Byte])) => - new String(kv._2).toInt + 1) - testStream(mapped)( - AddKafkaData(kafkaSource, topic, 1, 2, 3), - CheckAnswer(2, 3, 4) - ) + // Add data in multiple rounds to the same topic and test whether + for (i <- 0 until 5) { + logInfo(s"Round $i") + + // Create Kafka source that reads from latest offset + val kafkaSource = KafkaSource(Set(topic), kafkaParams) + + val mapped = + kafkaSource + .toDS() + .map[Int]((kv: (Array[Byte], Array[Byte])) => new String(kv._2).toInt + 1) + + logInfo(s"Initial offsets: ${kafkaSource.initialOffsets}") + + testStream(mapped)( + AddKafkaData(kafkaSource, topic, 1, 2, 3), + CheckAnswer(2, 3, 4), + StopStream , + DropBatches(1), // Lose last batch in the sink + StartStream, + CheckAnswer(2, 3, 4), // Should get the data back on recovery + StopStream, + AddKafkaData(kafkaSource, topic, 4, 5, 6), // Add data when stream is stopped + StartStream, + CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data + AddKafkaData(kafkaSource, topic, 7), + CheckAnswer(2, 3, 4, 5, 6, 7, 8) + ) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index acad1c1bf1b1e..e99bfa431ca9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -38,7 +38,7 @@ class StreamExecution( val sink: Sink) extends Logging { /** Minimum amount of time in between the start of each batch. */ - val minBatchTime = 10 + val minBatchTime = 100 /** Tracks how much data we have processed from each input source. 
*/ private[sql] val currentOffsets = new StreamProgress diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 08fdb53a5ed1a..01c4a262570db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -27,7 +27,8 @@ class StreamProgress extends Serializable { with mutable.SynchronizedMap[Source, Offset] private[streaming] def update(source: Source, newOffset: Offset): Unit = { - currentOffsets.get(source).foreach(old => assert(newOffset > old)) + currentOffsets.get(source).foreach(old => + assert(newOffset > old, s"New offset $newOffset not more than older offset $old ")) currentOffsets.put(source, newOffset) } From 426cd7a09fa2257b608a0f1c6e08c3cef30fa4cf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 Jan 2016 13:35:48 -0800 Subject: [PATCH 03/13] Added license --- .../streaming/kafka/KafkaSourceSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index ac9e294803474..538324ff6b067 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition @@ -82,4 +99,8 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { ) } } + + test("KafkaSourceOffset comparison") { + + } } From 4079217d63bdb21882ea74e6ecb04651e59fd038 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 Jan 2016 20:46:28 -0800 Subject: [PATCH 04/13] Fixed offset comparison bugs --- .../spark/streaming/kafka/KafkaSource.scala | 124 ++++++++++-------- .../streaming/kafka/KafkaTestUtils.scala | 16 ++- .../streaming/kafka/KafkaSourceSuite.scala | 71 ++++++++-- .../execution/streaming/CompositeOffset.scala | 23 +++- .../sql/execution/streaming/Offset.scala | 2 + ...ongOffsetSuite.scala => OffsetSuite.scala} | 56 +++++++- 6 files changed, 212 insertions(+), 80 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/streaming/{LongOffsetSuite.scala => OffsetSuite.scala} (52%) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index 9e942580f2f5a..685283b0771c3 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.kafka +import scala.util.Try + import kafka.common.TopicAndPartition import kafka.serializer._ @@ -27,6 +29,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, SQLContext} +/** An [[Offset]] for the [[KafkaSource]]. */ private[kafka] case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offset { /** @@ -39,7 +42,6 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs val comparisons = allTopicAndPartitions.map { tp => (this.offsets.get(tp), otherOffsets.get(tp)) match { - case (Some(a), Some(b)) => if (a < b) { -1 @@ -48,36 +50,77 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs } else { 0 } - case (None, None) => 0 - case (None, _) => -1 - case (_, None) => 1 + case (None, _) => + -1 + case (_, None) => + 1 + case _ => + throw new IllegalArgumentException( + s"Invalid comparison between two sets of Kafka offsets: $this <=> $other") } } - val signs = comparisons.distinct - if (signs.size != 1) { - throw new IllegalArgumentException( - s"Invalid comparison between to sets of Kafka offets: $this <=> $other") + val nonZeroSigns = comparisons.filter { _ != 0 }.toSet + nonZeroSigns.size match { + case 0 => 0 // if both empty or only 0s + case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) + case _ => // there are both 1s and -1s + throw new IllegalArgumentException( + s"Invalid comparison between non-linear histories: $this <=> $other") } - signs.head + case _ => throw new IllegalArgumentException(s"Cannot compare $this <=> $other") } - override def toString(): String = offsets.toSeq.mkString("[", ", ", "]") + override def ==(other: Offset): Boolean = Try(compareTo(other) == 0).getOrElse(false) + + /** Returns a set of offset ranges between `this` and `other` */ + def to(other: KafkaSourceOffset): Seq[OffsetRange] = { + // Get all the partitions referenced in both sets of offsets + val allTopicAndPartitions = (this.offsets.keySet ++ other.offsets.keySet).toSeq + + // For each partition, figure out the non-empty ranges of offsets + allTopicAndPartitions.flatMap { tp => + (this.offsets.get(tp), 
other.offsets.get(tp)) match { + + // Data was read till fromOffset and needs to be read till untilOffset + case (Some(fromOffset), Some(untilOffset)) => + if (untilOffset > fromOffset) { + Some(OffsetRange(tp, fromOffset, untilOffset)) + } else None + + case _ => + None + } + } + } + + override def toString(): String = { + offsets.toSeq.sortBy(_._1.topic).mkString("[", ", ", "]") + } } +/** Companion object of the [[KafkaSourceOffset]] */ private[kafka] object KafkaSourceOffset { - def fromOffset(offset: Offset): KafkaSourceOffset = { - offset match { - case o: KafkaSourceOffset => o - case _ => - throw new IllegalArgumentException( - s"Invalid conversion from offset of ${offset.getClass} to $getClass") + def from(offsetOption: Option[Offset]): Option[KafkaSourceOffset] = { + offsetOption.map { offset => + offset match { + case o: KafkaSourceOffset => o + case _ => + throw new IllegalArgumentException( + s"Invalid conversion from offset of ${offset.getClass} to $getClass") + } } } -} + def apply(data: (String, Int, Long)*): KafkaSourceOffset = { + val map = data.map { case (topic, partition, offset) => + TopicAndPartition(topic, partition) -> offset }.toMap + KafkaSourceOffset(map) + } +} +/** A [[Source]] that reads data from Kafka */ private[kafka] case class KafkaSource( topics: Set[String], params: Map[String, String])(implicit sqlContext: SQLContext) extends Source with Logging { @@ -98,12 +141,11 @@ private[kafka] case class KafkaSource( * Returns the next batch of data that is available after `start`, if any is available. */ override def getNextBatch(start: Option[Offset]): Option[Batch] = { + val beginOffset: KafkaSourceOffset = KafkaSourceOffset.from(start).getOrElse(initialOffsets) val latestOffset = getLatestOffsets() - logDebug(s"Latest offset: ${KafkaSourceOffset(latestOffset)}") - - val offsetRanges = getOffsetRanges( - start.map(KafkaSourceOffset.fromOffset(_).offsets), latestOffset) + logDebug(s"Latest offset: $latestOffset") + val offsetRanges = beginOffset to latestOffset val kafkaParams = params val encodingFunc = encoder.toRow _ val sparkContext = sqlContext.sparkContext @@ -112,7 +154,7 @@ private[kafka] case class KafkaSource( val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( sparkContext, kafkaParams, offsetRanges.toArray) logInfo("Creating DF with offset ranges: " + offsetRanges) - Some(new Batch(KafkaSourceOffset(latestOffset), sqlContext.createDataset(rdd).toDF)) + Some(new Batch(latestOffset, sqlContext.createDataset(rdd).toDF)) } else { None } @@ -126,41 +168,19 @@ private[kafka] case class KafkaSource( new DataFrame(sqlContext, logicalPlan) } - private def getOffsetRanges(start: Option[OffsetMap], end: OffsetMap): Seq[OffsetRange] = { - // Get the offsets from which to start reading. If its none, find initial offsets to start from. 
- val fromOffsets = start.getOrElse { initialOffsets } - - // Get the latest offsets - val untilOffsets = end - - // Get all the partitions referenced in both sets of offsets - val allTopicAndPartitions = (fromOffsets.keySet ++ untilOffsets.keySet).toSeq - - // For each partition, figure out the non-empty ranges of offsets - allTopicAndPartitions.flatMap { tp => - (fromOffsets.get(tp), untilOffsets.get(tp)) match { - - // Data was read till fromOffset and needs to be read till untilOffset - case (Some(fromOffset), Some(untilOffset)) => - if (untilOffset > fromOffset) { - Some(OffsetRange(tp, fromOffset, untilOffset)) - } else None - - case _ => - None - } - } - } - - private def getLatestOffsets(): OffsetMap = { + /** Get latest offsets from Kafka. */ + private def getLatestOffsets(): KafkaSourceOffset = { val partitionLeaders = KafkaCluster.checkErrors(kc.findLeaders(topicAndPartitions)) val leadersAndOffsets = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(topicAndPartitions)) - leadersAndOffsets.map { x => (x._1, x._2.offset) } + KafkaSourceOffset(leadersAndOffsets.map { x => (x._1, x._2.offset) }) } - private def getInitialOffsets(): OffsetMap = { + /** Get the initial offsets from Kafka for the source to start from. */ + private def getInitialOffsets(): KafkaSourceOffset = { if (params.get("auto.offset.reset").map(_.toLowerCase) == Some("smallest")) { - KafkaCluster.checkErrors(kc.getEarliestLeaderOffsets(topicAndPartitions)).mapValues(_.offset) + val offsetMap = KafkaCluster.checkErrors( + kc.getEarliestLeaderOffsets(topicAndPartitions)).mapValues(_.offset) + KafkaSourceOffset(offsetMap) } else { getLatestOffsets() } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 6a765e54f0af8..898b88977f87a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -153,9 +153,15 @@ private[kafka] class KafkaTestUtils extends Logging { /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String): Unit = { - AdminUtils.createTopic(zkClient, topic, 1, 1) + createTopic(topic, 1) + } + + def createTopic(topic: String, partitions: Int): Unit = { + AdminUtils.createTopic(zkClient, topic, partitions, 1) // wait until metadata is propagated - waitUntilMetadataIsPropagated(topic, 0) + for (p <- 0 until partitions) { + waitUntilMetadataIsPropagated(topic, p) + } } /** Java-friendly function for sending messages to the Kafka broker */ @@ -170,11 +176,13 @@ private[kafka] class KafkaTestUtils extends Logging { } /** Send the array of messages to the Kafka broker */ - def sendMessages(topic: String, messages: Array[String]): Seq[RecordMetadata] = { + def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = { producer = new KafkaProducer[String, String](producerConfiguration) val offsets = try { messages.map { m => - producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + val metadata = + producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + (m, metadata) } } finally { if (producer != null) { diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index 
538324ff6b067..cc7e14d85448d 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -17,31 +17,49 @@ package org.apache.spark.streaming.kafka +import scala.util.Try + import kafka.common.TopicAndPartition +import org.apache.kafka.clients.producer.RecordMetadata import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.{Offset, Source} +import org.apache.spark.sql.streaming.OffsetSuite import org.apache.spark.sql.test.SharedSQLContext -class KafkaSourceSuite extends StreamTest with SharedSQLContext { +class KafkaSourceSuite extends StreamTest with SharedSQLContext with OffsetSuite { import testImplicits._ private var testUtils: KafkaTestUtils = _ private var kafkaParams: Map[String, String] = _ - override val streamingTimout = 30.seconds + override val streamingTimout = 10.seconds + + case class AddKafkaData( + kafkaSource: KafkaSource, + topic: String, data: Int*)(implicit multiPartitionCheck: Boolean = true) extends AddData { - case class AddKafkaData(kafkaSource: KafkaSource, topic: String, data: Int*) extends AddData { override def addData(): Offset = { val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) - val lastMetadata = sentMetadata.maxBy(_.offset) + val latestOffsetMap = sentMetadata + .groupBy { _._2.partition } + .map { case (partition, msgAndMetadata) => + val maxOffsetInPartition = msgAndMetadata.map(_._2).maxBy(_.offset).offset() + (TopicAndPartition(topic, partition), maxOffsetInPartition + 1) + } + + def metadataToStr(m: (String, RecordMetadata)): String = { + s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" + } + + assert(latestOffsetMap.size > 1, + s"Added data does not test multiple partitions: " + sentMetadata.map(metadataToStr)) // Expected offset to ensure this data is read is last offset of this data + 1 - val offset = KafkaSourceOffset( - Map(TopicAndPartition(topic, lastMetadata.partition) -> (lastMetadata.offset + 1))) + val offset = KafkaSourceOffset(latestOffsetMap) logInfo(s"Added data, expected offset $offset") offset } @@ -64,10 +82,9 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { } } - test("basic receiving from latest offset with 1 topic and 1 partition") { + test("basic receiving from latest offset with 1 topic and 2 partitions") { val topic = "topic1" - testUtils.createTopic(topic) - + testUtils.createTopic(topic, partitions = 2) // Add data in multiple rounds to the same topic and test whether for (i <- 0 until 5) { @@ -94,13 +111,39 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { AddKafkaData(kafkaSource, topic, 4, 5, 6), // Add data when stream is stopped StartStream, CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data - AddKafkaData(kafkaSource, topic, 7), - CheckAnswer(2, 3, 4, 5, 6, 7, 8) + AddKafkaData(kafkaSource, topic, 7, 8), + CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9) ) } } - test("KafkaSourceOffset comparison") { - - } + compare( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("t", 0, 2L)) + ) + + compare( + one = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 0L)), + two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)) + ) + + compare( + one = KafkaSourceOffset(("t", 0, 1L), ("T", 0, 0L)), + two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L)) + ) + + compare( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("t", 
0, 2L), ("t", 1, 1L)) + ) + + compareInvalid( + one = KafkaSourceOffset(("t", 1, 1L)), + two = KafkaSourceOffset(("t", 0, 2L)) + ) + + compareInvalid( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("T", 0, 2L)) + ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 9babcff5adb1e..737909d94a1cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import scala.util.Try + /** * An ordered collection of offsets, used to track the progress of processing data from one or more * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance @@ -35,19 +37,30 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { case (None, _) => -1 case (_, None) => 1 } - val signs = comparisons.map(sign).distinct - if (signs.size != 1) { - throw new IllegalArgumentException( - s"Invalid comparison between non-linear histories: $this <=> $other") + val nonZeroSigns = comparisons.map(sign).filter { _ != 0}.toSet + nonZeroSigns.size match { + case 0 => 0 // if both empty or only 0s + case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) + case _ => // there are both 1s and -1s + throw new IllegalArgumentException( + s"Invalid comparison between non-linear histories: $this <=> $other") } - signs.head case _ => throw new IllegalArgumentException(s"Cannot compare $this <=> $other") } + override def ==(other: Offset): Boolean = Try(compareTo(other) == 0).getOrElse(false) + private def sign(num: Int): Int = num match { case i if i < 0 => -1 case i if i == 0 => 0 case i if i > 0 => 1 } } + +object CompositeOffset { + + def fill(offsets: Offset*): CompositeOffset = { + CompositeOffset(offsets.map(Option(_))) + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 704a027c12b8d..a1d927d780bd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import scala.util.Try + /** * A offset is a monotonically increasing metric used to track progress in the computation of a * stream. An [[Offset]] must be comparable. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/LongOffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala similarity index 52% rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/LongOffsetSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index 0e1a90ab56e33..e2ae5bdbf6c37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/LongOffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -33,22 +33,68 @@ trait OffsetSuite extends SparkFunSuite { assert(two >= one) assert(one >= one) assert(one == one) + assert(two == two) + assert(one != two) + assert(two != one) + } + } + + /** Creates test to check that non-equality comparisons throw exception. 
*/ + def compareInvalid(one: Offset, two: Offset): Unit = { + test(s"invalid comparison $one <=> $two") { + intercept[IllegalArgumentException] { + assert(one < two) + } + + intercept[IllegalArgumentException] { + assert(one <= two) + } + + intercept[IllegalArgumentException] { + assert(one > two) + } + + intercept[IllegalArgumentException] { + assert(one >= two) + } + + assert(!(one == two)) + assert(!(two == one)) + assert(one != two) + assert(two != one) } } } class LongOffsetSuite extends OffsetSuite { - val one = new LongOffset(1) - val two = new LongOffset(2) + val one = LongOffset(1) + val two = LongOffset(2) compare(one, two) } class CompositeOffsetSuite extends OffsetSuite { compare( - one = CompositeOffset(Some(new LongOffset(1)) :: Nil), - two = CompositeOffset(Some(new LongOffset(2)) :: Nil)) + one = CompositeOffset(Some(LongOffset(1)) :: Nil), + two = CompositeOffset(Some(LongOffset(2)) :: Nil)) compare( one = CompositeOffset(None :: Nil), - two = CompositeOffset(Some(new LongOffset(2)) :: Nil)) + two = CompositeOffset(Some(LongOffset(2)) :: Nil)) + + compareInvalid( + one = CompositeOffset(Nil), // sizes must be same + two = CompositeOffset(Some(LongOffset(2)) :: Nil)) + + compare( + one = CompositeOffset.fill(LongOffset(0), LongOffset(1)), + two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) + + compare( + one = CompositeOffset.fill(LongOffset(1), LongOffset(1)), + two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) + + compareInvalid( + one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), + two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) } + From 29900398cf892842bfc13f4d4fcc3b32f3d43e23 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 8 Jan 2016 20:50:58 -0800 Subject: [PATCH 05/13] formatting change --- .../streaming/kafka/KafkaSourceSuite.scala | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index cc7e14d85448d..fc4925e07f565 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -119,31 +119,25 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext with OffsetSuite compare( one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("t", 0, 2L)) - ) + two = KafkaSourceOffset(("t", 0, 2L))) compare( one = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 0L)), - two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)) - ) + two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) compare( one = KafkaSourceOffset(("t", 0, 1L), ("T", 0, 0L)), - two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L)) - ) + two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L))) compare( one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)) - ) + two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) compareInvalid( one = KafkaSourceOffset(("t", 1, 1L)), - two = KafkaSourceOffset(("t", 0, 2L)) - ) + two = KafkaSourceOffset(("t", 0, 2L))) compareInvalid( one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("T", 0, 2L)) - ) + two = KafkaSourceOffset(("T", 0, 2L))) } From c4e284993dcceea8e1066e3c235da9e70bb90b58 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 11:33:06 -0800 Subject: [PATCH 06/13] Fix style --- 
.../apache/spark/sql/execution/streaming/CompositeOffset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 737909d94a1cd..2d8acae34f922 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -63,4 +63,4 @@ object CompositeOffset { def fill(offsets: Offset*): CompositeOffset = { CompositeOffset(offsets.map(Option(_))) } -} \ No newline at end of file +} From 2767a0628f09886b28eadb61ee58e9e9746b64d5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 12:05:40 -0800 Subject: [PATCH 07/13] Added more docs --- .../spark/streaming/kafka/KafkaSource.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index 685283b0771c3..0890eb79981ab 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -76,6 +76,7 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs /** Returns a set of offset ranges between `this` and `other` */ def to(other: KafkaSourceOffset): Seq[OffsetRange] = { + // Get all the partitions referenced in both sets of offsets val allTopicAndPartitions = (this.offsets.keySet ++ other.offsets.keySet).toSeq @@ -89,6 +90,9 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs Some(OffsetRange(tp, fromOffset, untilOffset)) } else None + // TODO: Support cases where topic+partitions are missing from one. Can happen in case of + // repartitioning. + case _ => None } @@ -102,6 +106,8 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs /** Companion object of the [[KafkaSourceOffset]] */ private[kafka] object KafkaSourceOffset { + + /** Returns [[KafkaSourceOffset]] from a Option[Offset]. */ def from(offsetOption: Option[Offset]): Option[KafkaSourceOffset] = { offsetOption.map { offset => offset match { @@ -113,6 +119,10 @@ private[kafka] object KafkaSourceOffset { } } + /** + * Returns [[KafkaSourceOffset]] from a variable sequence of (topic, partitionId, offset) + * tuples. + */ def apply(data: (String, Int, Long)*): KafkaSourceOffset = { val map = data.map { case (topic, partition, offset) => TopicAndPartition(topic, partition) -> offset }.toMap @@ -120,6 +130,7 @@ private[kafka] object KafkaSourceOffset { } } + /** A [[Source]] that reads data from Kafka */ private[kafka] case class KafkaSource( topics: Set[String], @@ -137,9 +148,7 @@ private[kafka] case class KafkaSource( override def schema: StructType = encoder.schema - /** - * Returns the next batch of data that is available after `start`, if any is available. - */ + /** Returns the next batch of data that is available after `start`, if any is available. 
*/ override def getNextBatch(start: Option[Offset]): Option[Batch] = { val beginOffset: KafkaSourceOffset = KafkaSourceOffset.from(start).getOrElse(initialOffsets) val latestOffset = getLatestOffsets() From 97c4aa5287964e880092706de4a5b20f0b740029 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 12:11:42 -0800 Subject: [PATCH 08/13] More docs --- .../spark/sql/execution/streaming/CompositeOffset.scala | 4 +++- .../scala/org/apache/spark/sql/streaming/OffsetSuite.scala | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 2d8acae34f922..d09a2569e51b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -59,7 +59,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { } object CompositeOffset { - + /** + * Returns a [[CompositeOffset]] with a variable sequence of offsets. + * `nulls` in the sequence are converted to `None`s. */ def fill(offsets: Offset*): CompositeOffset = { CompositeOffset(offsets.map(Option(_))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index e2ae5bdbf6c37..faf9c70a6204f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -81,8 +81,8 @@ class CompositeOffsetSuite extends OffsetSuite { one = CompositeOffset(None :: Nil), two = CompositeOffset(Some(LongOffset(2)) :: Nil)) - compareInvalid( - one = CompositeOffset(Nil), // sizes must be same + compareInvalid( // sizes must be same + one = CompositeOffset(Nil), two = CompositeOffset(Some(LongOffset(2)) :: Nil)) compare( @@ -94,7 +94,7 @@ class CompositeOffsetSuite extends OffsetSuite { two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) compareInvalid( - one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), + one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) } From 35dea1bdf82638a004292726abcc94b31c4b568a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 13:45:13 -0800 Subject: [PATCH 09/13] Addressed comments form Michael --- .../spark/streaming/kafka/KafkaCluster.scala | 10 ++-- .../streaming/kafka/KafkaInputDStream.scala | 4 +- .../spark/streaming/kafka/KafkaRDD.scala | 12 ++--- .../spark/streaming/kafka/KafkaSource.scala | 13 ++--- .../kafka/KafkaSourceOffsetSuite.scala | 47 +++++++++++++++++++ .../streaming/kafka/KafkaSourceSuite.scala | 29 +----------- .../execution/streaming/CompositeOffset.scala | 9 ++-- .../sql/execution/streaming/Offset.scala | 2 - .../execution/streaming/StreamExecution.scala | 2 +- 9 files changed, 72 insertions(+), 56 deletions(-) create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceOffsetSuite.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index 8465432c5850f..21e971bc4d23b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -17,14 +17,16 @@ package org.apache.spark.streaming.kafka -import scala.util.control.NonFatal -import scala.util.Random -import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConverters._ import java.util.Properties + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random +import scala.util.control.NonFatal + import kafka.api._ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} import kafka.consumer.{ConsumerConfig, SimpleConsumer} + import org.apache.spark.SparkException /** diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 38730fecf332a..2274ccd34d1aa 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -20,9 +20,9 @@ package org.apache.spark.streaming.kafka import java.util.Properties import scala.collection.Map -import scala.reflect.{classTag, ClassTag} +import scala.reflect.{ClassTag, classTag} -import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index ea5f842c6cafe..fec638f70f34b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -18,12 +18,7 @@ package org.apache.spark.streaming.kafka import scala.collection.mutable.ArrayBuffer -import scala.reflect.{classTag, ClassTag} - -import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.partial.{PartialResult, BoundedDouble} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator +import scala.reflect.{ClassTag, classTag} import kafka.api.{FetchRequestBuilder, FetchResponse} import kafka.common.{ErrorMapping, TopicAndPartition} @@ -32,6 +27,11 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset} import kafka.serializer.Decoder import kafka.utils.VerifiableProperties +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.NextIterator +import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext} + /** * A batch-oriented interface for consuming from Kafka. 
* Starting and ending offsets are specified in advance, diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index 0890eb79981ab..5ce41a9722aa5 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -50,13 +50,8 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs } else { 0 } - case (None, _) => - -1 - case (_, None) => - 1 - case _ => - throw new IllegalArgumentException( - s"Invalid comparison between two sets of Kafka offsets: $this <=> $other") + case (None, _) => -1 + case (_, None) => 1 } } val nonZeroSigns = comparisons.filter { _ != 0 }.toSet @@ -114,7 +109,7 @@ private[kafka] object KafkaSourceOffset { case o: KafkaSourceOffset => o case _ => throw new IllegalArgumentException( - s"Invalid conversion from offset of ${offset.getClass} to $getClass") + s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset") } } } @@ -162,7 +157,7 @@ private[kafka] case class KafkaSource( if (offsetRanges.nonEmpty) { val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( sparkContext, kafkaParams, offsetRanges.toArray) - logInfo("Creating DF with offset ranges: " + offsetRanges) + logInfo(s"Creating DF with offset ranges: $offsetRanges") Some(new Batch(latestOffset, sqlContext.createDataset(rdd).toDF)) } else { None diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceOffsetSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceOffsetSuite.scala new file mode 100644 index 0000000000000..1a1faabf747a5 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceOffsetSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import org.apache.spark.sql.streaming.OffsetSuite + +class KafkaSourceOffsetSuite extends OffsetSuite { + + compare( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("t", 0, 2L))) + + compare( + one = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 0L)), + two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) + + compare( + one = KafkaSourceOffset(("t", 0, 1L), ("T", 0, 0L)), + two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L))) + + compare( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) + + compareInvalid( + one = KafkaSourceOffset(("t", 1, 1L)), + two = KafkaSourceOffset(("t", 0, 2L))) + + compareInvalid( + one = KafkaSourceOffset(("t", 0, 1L)), + two = KafkaSourceOffset(("T", 0, 2L))) +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index fc4925e07f565..859112604d395 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -17,19 +17,16 @@ package org.apache.spark.streaming.kafka -import scala.util.Try - import kafka.common.TopicAndPartition import org.apache.kafka.clients.producer.RecordMetadata import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.{Offset, Source} -import org.apache.spark.sql.streaming.OffsetSuite import org.apache.spark.sql.test.SharedSQLContext -class KafkaSourceSuite extends StreamTest with SharedSQLContext with OffsetSuite { +class KafkaSourceSuite extends StreamTest with SharedSQLContext { import testImplicits._ @@ -116,28 +113,4 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext with OffsetSuite ) } } - - compare( - one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("t", 0, 2L))) - - compare( - one = KafkaSourceOffset(("t", 0, 1L), ("t", 1, 0L)), - two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) - - compare( - one = KafkaSourceOffset(("t", 0, 1L), ("T", 0, 0L)), - two = KafkaSourceOffset(("t", 0, 2L), ("T", 0, 1L))) - - compare( - one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) - - compareInvalid( - one = KafkaSourceOffset(("t", 1, 1L)), - two = KafkaSourceOffset(("t", 0, 2L))) - - compareInvalid( - one = KafkaSourceOffset(("t", 0, 1L)), - two = KafkaSourceOffset(("T", 0, 2L))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index d09a2569e51b6..a7f48988fa58d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -36,12 +36,12 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { case (None, None) => 0 case (None, _) => -1 case (_, None) => 1 - } - val nonZeroSigns = comparisons.map(sign).filter { _ != 0}.toSet + } + val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet nonZeroSigns.size match { case 0 => 0 // if both empty or only 0s case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) - case _ => // there are both 1s and -1s + case _ => // there are both 1s and -1s throw new 
IllegalArgumentException( s"Invalid comparison between non-linear histories: $this <=> $other") } @@ -61,7 +61,8 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { object CompositeOffset { /** * Returns a [[CompositeOffset]] with a variable sequence of offsets. - * `nulls` in the sequence are converted to `None`s. */ + * `nulls` in the sequence are converted to `None`s. + */ def fill(offsets: Offset*): CompositeOffset = { CompositeOffset(offsets.map(Option(_))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index a1d927d780bd9..704a027c12b8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import scala.util.Try - /** * A offset is a monotonically increasing metric used to track progress in the computation of a * stream. An [[Offset]] must be comparable. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index fb065aefb1c03..b88c798917a18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -44,7 +44,7 @@ class StreamExecution( var batchRun = false /** Minimum amount of time in between the start of each batch. */ - val minBatchTime = 100 + val minBatchTime = 10 /** Tracks how much data we have processed from each input source. */ private[sql] val currentOffsets = new StreamProgress From b6a081ba73dec00b3aed1f321df2df4e563f0daf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 15:00:31 -0800 Subject: [PATCH 10/13] Override equals instead of == --- .../spark/streaming/kafka/KafkaSource.scala | 2 -- .../execution/streaming/CompositeOffset.scala | 2 -- .../spark/sql/execution/streaming/Offset.scala | 18 ++++++++++++++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index 5ce41a9722aa5..57bbfa32eb7f3 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -67,8 +67,6 @@ case class KafkaSourceOffset(offsets: Map[TopicAndPartition, Long]) extends Offs throw new IllegalArgumentException(s"Cannot compare $this <=> $other") } - override def ==(other: Offset): Boolean = Try(compareTo(other) == 0).getOrElse(false) - /** Returns a set of offset ranges between `this` and `other` */ def to(other: KafkaSourceOffset): Seq[OffsetRange] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index a7f48988fa58d..d2cb20ef8b819 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -49,8 +49,6 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { throw new 
IllegalArgumentException(s"Cannot compare $this <=> $other") } - override def ==(other: Offset): Boolean = Try(compareTo(other) == 0).getOrElse(false) - private def sign(num: Int): Int = num match { case i if i < 0 => -1 case i if i == 0 => 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 704a027c12b8d..54a9d4e7ee2f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -25,7 +25,9 @@ trait Offset extends Serializable { /** * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, - * or greater than the specified object. + * or greater than the specified object. Any error is comparison should be throw as + * `IllegalArgumentException`, so that operations like `equals` can catch it and handle it + * accordingly. */ def compareTo(other: Offset): Int @@ -33,5 +35,17 @@ trait Offset extends Serializable { def <(other: Offset): Boolean = compareTo(other) < 0 def <=(other: Offset): Boolean = compareTo(other) <= 0 def >=(other: Offset): Boolean = compareTo(other) >= 0 - def ==(other: Offset): Boolean = compareTo(other) == 0 + + override def equals(other: Any): Boolean = { + try { + other match { + case o: Offset => compareTo(o) == 0 + case _ => + false + } + } catch { + case e: IllegalArgumentException => + false + } + } } From d747866178ee3a0c86599a84eb2618976f27ba07 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 11 Jan 2016 18:00:33 -0800 Subject: [PATCH 11/13] Addressed one more comment --- .../apache/spark/streaming/kafka/KafkaSourceSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index 859112604d395..4e139ad1955ff 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -37,7 +37,7 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { case class AddKafkaData( kafkaSource: KafkaSource, - topic: String, data: Int*)(implicit multiPartitionCheck: Boolean = true) extends AddData { + topic: String, data: Int*) extends AddData { override def addData(): Offset = { val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) @@ -52,8 +52,9 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" } - assert(latestOffsetMap.size > 1, - s"Added data does not test multiple partitions: " + sentMetadata.map(metadataToStr)) + // Verify that the test data gets inserted into multiple partitions + require(latestOffsetMap.size > 1, + s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}") // Expected offset to ensure this data is read is last offset of this data + 1 val offset = KafkaSourceOffset(latestOffsetMap) From 81cda63e03a5921a14e9d32253f2599ffb1ffc67 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 12 Jan 2016 13:27:45 -0800 Subject: [PATCH 12/13] Added stress test --- .../spark/streaming/kafka/KafkaSource.scala | 2 - .../streaming/kafka/KafkaTestUtils.scala | 9 ++ .../streaming/kafka/KafkaSourceSuite.scala | 89 
+++++++++++-------- .../org/apache/spark/sql/StreamTest.scala | 32 ++++--- 4 files changed, 81 insertions(+), 51 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala index 57bbfa32eb7f3..05e2fb111c596 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSource.scala @@ -17,8 +17,6 @@ package org.apache.spark.streaming.kafka -import scala.util.Try - import kafka.common.TopicAndPartition import kafka.serializer._ diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 898b88977f87a..7cec53b43890d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request +import kafka.common.TopicAndPartition import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient @@ -193,6 +194,14 @@ private[kafka] class KafkaTestUtils extends Logging { offsets } + /** Get the latest offset of all the partitions in a topic */ + def getLatestOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = { + val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress)) + val topicPartitions = kc.getPartitions(topics).right.get + val offsets = kc.getLatestLeaderOffsets(topicPartitions).right.get + offsets.mapValues(_.offset) + } + private def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala index 4e139ad1955ff..832aaf38f92ba 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaSourceSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.streaming.kafka -import kafka.common.TopicAndPartition +import scala.util.Random + import org.apache.kafka.clients.producer.RecordMetadata import org.scalatest.time.SpanSugar._ @@ -35,36 +36,6 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { override val streamingTimout = 10.seconds - case class AddKafkaData( - kafkaSource: KafkaSource, - topic: String, data: Int*) extends AddData { - - override def addData(): Offset = { - val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) - val latestOffsetMap = sentMetadata - .groupBy { _._2.partition } - .map { case (partition, msgAndMetadata) => - val maxOffsetInPartition = msgAndMetadata.map(_._2).maxBy(_.offset).offset() - (TopicAndPartition(topic, partition), maxOffsetInPartition + 1) - } - - def metadataToStr(m: (String, RecordMetadata)): String = { - s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" - } - - // Verify that the test data gets inserted into multiple partitions - require(latestOffsetMap.size > 1, - s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}") - - // Expected offset to ensure this data is read 
is last offset of this data + 1 - val offset = KafkaSourceOffset(latestOffsetMap) - logInfo(s"Added data, expected offset $offset") - offset - } - - override def source: Source = kafkaSource - } - override def beforeAll(): Unit = { super.beforeAll() testUtils = new KafkaTestUtils @@ -94,24 +65,72 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext { val mapped = kafkaSource .toDS() - .map[Int]((kv: (Array[Byte], Array[Byte])) => new String(kv._2).toInt + 1) + .map((kv: (Array[Byte], Array[Byte])) => new String(kv._2).toInt + 1) logInfo(s"Initial offsets: ${kafkaSource.initialOffsets}") testStream(mapped)( - AddKafkaData(kafkaSource, topic, 1, 2, 3), + AddKafkaData(kafkaSource, Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), StopStream , DropBatches(1), // Lose last batch in the sink StartStream, CheckAnswer(2, 3, 4), // Should get the data back on recovery StopStream, - AddKafkaData(kafkaSource, topic, 4, 5, 6), // Add data when stream is stopped + AddKafkaData(kafkaSource, Set(topic), 4, 5, 6), // Add data when stream is stopped StartStream, CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data - AddKafkaData(kafkaSource, topic, 7, 8), + AddKafkaData(kafkaSource, Set(topic), 7, 8), CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9) ) } } + + test("stress test with multiple topics and partitions") { + val topics = (1 to 5).map (i => s"stress$i").toSet + topics.foreach(testUtils.createTopic(_, partitions = Random.nextInt(10))) + + for (i <- 1 to 5) { + // Create Kafka source that reads from latest offset + val kafkaSource = KafkaSource(topics, kafkaParams) + + val mapped = + kafkaSource + .toDS() + .map(r => new String(r._2).toInt + 1) + + createStressTest( + mapped, + d => AddKafkaData(kafkaSource, topics, d: _*)(ensureDataInMultiplePartition = false), + iterations = 30) + } + } + + + case class AddKafkaData( + kafkaSource: KafkaSource, + topics: Set[String], data: Int*)(implicit ensureDataInMultiplePartition: Boolean = true) + extends AddData { + + override def addData(): Offset = { + val topic = topics.toSeq(Random.nextInt(topics.size)) + val sentMetadata = testUtils.sendMessages(topic, data.map{ _.toString}.toArray) + + def metadataToStr(m: (String, RecordMetadata)): String = { + s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" + } + + // Verify that the test data gets inserted into multiple partitions + if (ensureDataInMultiplePartition) { + require(sentMetadata.groupBy(_._2.partition).size > 1, + s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}") + } + + val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics)) + logInfo(s"Added data, expected offset $offset") + offset + } + + override def source: Source = kafkaSource + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index dab54c7becfc7..c005514e7e993 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -20,18 +20,17 @@ package org.apache.spark.sql import java.lang.Thread.UncaughtExceptionHandler import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Random import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder, encoderFor} +import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - /** * A framework for implementing tests for streaming queries and sources. * @@ -297,7 +296,10 @@ trait StreamTest extends QueryTest with Timeouts { * @param addData and add data action that adds the given numbers to the stream, encoding them * as needed */ - def createStressTest(ds: Dataset[Int], addData: Seq[Int] => StreamAction): Unit = { + def createStressTest( + ds: Dataset[Int], + addData: Seq[Int] => StreamAction, + iterations: Int = 500): Unit = { implicit val intEncoder = ExpressionEncoder[Int] var dataPos = 0 var running = true @@ -305,15 +307,20 @@ trait StreamTest extends QueryTest with Timeouts { def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) } - (1 to 500).foreach { i => + def addRandomData() = { + val numItems = Random.nextInt(10) + val data = dataPos until (dataPos + numItems) + dataPos += numItems + actions += addData(data) + } + + (1 to iterations).foreach { i => val rand = Random.nextDouble() if(!running) { rand match { case r if r < 0.7 => // AddData - val numItems = Random.nextInt(10) - val data = dataPos until (dataPos + numItems) - dataPos += numItems - actions += addData(data) + addRandomData() + case _ => // StartStream actions += StartStream running = true @@ -324,10 +331,7 @@ trait StreamTest extends QueryTest with Timeouts { addCheck() case r if r < 0.7 => // AddData - val numItems = Random.nextInt(10) - val data = dataPos until (dataPos + numItems) - dataPos += numItems - actions += addData(data) + addRandomData() case _ => // StartStream actions += StopStream From bcfd6a961f0121226d03cd7b3c31b183eaac3407 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 12 Jan 2016 14:10:31 -0800 Subject: [PATCH 13/13] Removed equals --- .../sql/execution/streaming/Offset.scala | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 54a9d4e7ee2f0..0f5d6445b1e2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -19,15 +19,14 @@ package org.apache.spark.sql.execution.streaming /** * A offset is a monotonically increasing metric used to track progress in the computation of a - * stream. An [[Offset]] must be comparable. + * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent + * with `equals` and `hashcode`. */ trait Offset extends Serializable { /** * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, - * or greater than the specified object. Any error is comparison should be throw as - * `IllegalArgumentException`, so that operations like `equals` can catch it and handle it - * accordingly. + * or greater than the specified object. 
*/ def compareTo(other: Offset): Int @@ -35,17 +34,4 @@ trait Offset extends Serializable { def <(other: Offset): Boolean = compareTo(other) < 0 def <=(other: Offset): Boolean = compareTo(other) <= 0 def >=(other: Offset): Boolean = compareTo(other) >= 0 - - override def equals(other: Any): Boolean = { - try { - other match { - case o: Offset => compareTo(o) == 0 - case _ => - false - } - } catch { - case e: IllegalArgumentException => - false - } - } }
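
To make the comparison semantics running through patches 08-13 concrete: offsets are compared component by component (per Kafka partition for KafkaSourceOffset, per source for CompositeOffset), and a comparison in which some components moved forward while others moved backward is rejected with an IllegalArgumentException rather than given an arbitrary ordering. Below is a minimal, self-contained Scala sketch of that contract, offered purely as an illustration; SimpleOffset, PartitionOffsets and OffsetContractDemo are hypothetical stand-ins, not classes from the patches or from Spark.

// Illustrative sketch only -- not part of the patches above. It restates the
// offset comparison contract (per-partition comparison; mixed forward/backward
// movement rejected as a non-linear history) with simplified stand-in types.

/** Simplified stand-in for the patch's Offset trait. */
trait SimpleOffset extends Serializable {
  def compareTo(other: SimpleOffset): Int
  def <(other: SimpleOffset): Boolean = compareTo(other) < 0
  def >(other: SimpleOffset): Boolean = compareTo(other) > 0
}

/** Stand-in for KafkaSourceOffset: (topic, partition) -> next offset to read. */
case class PartitionOffsets(offsets: Map[(String, Int), Long]) extends SimpleOffset {
  override def compareTo(other: SimpleOffset): Int = other match {
    case PartitionOffsets(theirs) =>
      // Compare partition by partition. A partition known to only one side is
      // treated as that side being ahead, mirroring the (None, _) / (_, None) cases.
      val signs = (offsets.keySet ++ theirs.keySet).toSeq
        .map { tp =>
          (offsets.get(tp), theirs.get(tp)) match {
            case (Some(a), Some(b)) => if (a < b) -1 else if (a > b) 1 else 0
            case (None, _)          => -1
            case (_, None)          => 1
          }
        }
        .filter(_ != 0)
        .toSet
      signs.size match {
        case 0 => 0          // equal on every partition
        case 1 => signs.head // consistently behind (-1) or ahead (+1)
        case _ =>            // some partitions moved forward, others backward
          throw new IllegalArgumentException(
            s"Invalid comparison between non-linear histories: $this <=> $other")
      }
    case _ =>
      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
  }
}

object OffsetContractDemo {
  def main(args: Array[String]): Unit = {
    val before = PartitionOffsets(Map(("t", 0) -> 1L, ("t", 1) -> 0L))
    val after  = PartitionOffsets(Map(("t", 0) -> 2L, ("t", 1) -> 1L))
    println(before < after) // true: every partition advanced

    // "Non-linear": partition 0 went backwards while partition 1 moved forward.
    val skewed = PartitionOffsets(Map(("t", 0) -> 0L, ("t", 1) -> 5L))
    println(scala.util.Try(before.compareTo(skewed)).isFailure) // true
  }
}

This consistency requirement also appears to be the rationale for PATCH 13 dropping the equals override on Offset: the documented contract now asks each implementation's compareTo to agree with its own equals and hashCode, which case classes such as KafkaSourceOffset and CompositeOffset already provide structurally, instead of having the trait re-derive equality from compareTo.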