diff --git a/core/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala b/core/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala index 625a065a4..8fcc082a0 100644 --- a/core/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala +++ b/core/src/test/scala/com/softwaremill/react/kafka/ReactiveKafkaIntegrationSpec.scala @@ -1,25 +1,34 @@ package com.softwaremill.react.kafka +import java.util import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} import akka.actor._ import akka.stream.actor._ -import akka.stream.scaladsl.{Keep, Sink} +import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.{TestSink, TestSource} import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout import com.softwaremill.react.kafka.KafkaMessages._ +import com.softwaremill.react.kafka.commit.CommitSink +import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.concurrent.PatienceConfiguration.Interval import org.scalatest.mock.MockitoSugar +import org.scalatest.time.{Millis, Seconds, Span} import org.scalatest.{Matchers, fixture} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ import scala.language.postfixOps class ReactiveKafkaIntegrationSpec extends TestKit(ActorSystem("ReactiveKafkaIntegrationSpec")) - with ImplicitSender with fixture.WordSpecLike with Matchers - with ReactiveKafkaIntegrationTestSupport with MockitoSugar { + with ImplicitSender with fixture.WordSpecLike with Matchers + with ReactiveKafkaIntegrationTestSupport with MockitoSugar { implicit val timeout = Timeout(1 second) @@ -81,6 +90,48 @@ class ReactiveKafkaIntegrationSpec extends TestKit(ActorSystem("ReactiveKafkaInt verifyQueueHas(Seq("3", "4", "5")) } + "not commit the same offsets twice" in { implicit f => + // given + val consumerProps = consumerProperties(f) + .commitInterval(700 millis) + .noAutoCommit() + val mockConsumer = new CommitCountingMockConsumer[String, String] + val topicPartition1 = new TopicPartition(f.topic, 0) + val topicPartition2 = new TopicPartition(f.topic, 1) + mockConsumer.updateBeginningOffsets(Map(topicPartition1 -> java.lang.Long.valueOf(0L), topicPartition2 -> java.lang.Long.valueOf(0L)).asJava) + val mockProvider = (_: ConsumerProperties[String, String]) => mockConsumer + val reactiveConsumer: ReactiveKafkaConsumer[String, String] = new ReactiveKafkaConsumer(consumerProps, Set(topicPartition1, topicPartition2), Map(), mockProvider) + val propsWithConsumer = ConsumerWithActorProps(reactiveConsumer, Props(new KafkaActorPublisher(reactiveConsumer))) + val actor = system.actorOf(propsWithConsumer.actorProps) + val publisherWithSink = PublisherWithCommitSink[String, String]( + ActorPublisher[ConsumerRecord[String, String]](actor), actor, CommitSink.create(actor, consumerProps) + ) + val source = Source.fromPublisher(publisherWithSink.publisher) + for (i <- 0 to 9) { + mockConsumer.addRecord(new ConsumerRecord[String, String](f.topic, 0, i.toLong, s"k$i", i.toString)) + } + + // when + source + .filter(_.value().toInt < 3) + .to(publisherWithSink.offsetCommitSink) + .run() + + // then + eventually(PatienceConfiguration.Timeout(Span(5, Seconds)), Interval(Span(150, Millis))) { + mockConsumer.committed(topicPartition1) should be(new OffsetAndMetadata(3)) + } + + // trigger one more commit by adding a new record to the second partition + // this should result in committing only the new metadata + mockConsumer.addRecord(new ConsumerRecord[String, String](f.topic, 1, 10, s"k10", "1")) + + verifyNever { + val counter = mockConsumer.counter.getOrElse(topicPartition1 -> new OffsetAndMetadata(3), 0) + counter != 1 + } + } + def shouldStartConsuming(fromEnd: Boolean)(implicit f: FixtureParam) = { val producerProps = createProducerProperties(f) val producer = new KafkaProducer(producerProps.rawProperties, producerProps.keySerializer, producerProps.valueSerializer) @@ -131,4 +182,27 @@ class ReactiveTestSubscriber extends ActorSubscriber { elements = elements :+ element.asInstanceOf[StringConsumerRecord] case "get elements" => sender ! elements } +} + +class CommitCountingMockConsumer[K, V] extends MockConsumer[K, V](OffsetResetStrategy.EARLIEST) { + val counter = mutable.Map[(TopicPartition, OffsetAndMetadata), Int]() + + override def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = { + for (o <- offsets.asScala) { + counter += o -> (counter.getOrElse(o, 0) + 1) + } + super.commitSync(offsets) + } + + override def poll(timeout: Long): ConsumerRecords[K, V] = { + synchronized { + super.poll(timeout) + } + } + + override def addRecord(record: ConsumerRecord[K, V]): Unit = { + synchronized { + super.addRecord(record) + } + } } \ No newline at end of file