Skip to content

Commit

Permalink
Add a test for #118
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed Apr 1, 2016
1 parent 0ef7adf commit 5d80f7a
Showing 1 changed file with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package com.softwaremill.react.kafka

import java.util
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}

import akka.actor._
import akka.stream.actor._
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import com.softwaremill.react.kafka.KafkaMessages._
import com.softwaremill.react.kafka.commit.CommitSink
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.concurrent.PatienceConfiguration.Interval
import org.scalatest.mock.MockitoSugar
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{Matchers, fixture}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps

class ReactiveKafkaIntegrationSpec extends TestKit(ActorSystem("ReactiveKafkaIntegrationSpec"))
with ImplicitSender with fixture.WordSpecLike with Matchers
with ReactiveKafkaIntegrationTestSupport with MockitoSugar {
with ImplicitSender with fixture.WordSpecLike with Matchers
with ReactiveKafkaIntegrationTestSupport with MockitoSugar {

implicit val timeout = Timeout(1 second)

Expand Down Expand Up @@ -81,6 +90,48 @@ class ReactiveKafkaIntegrationSpec extends TestKit(ActorSystem("ReactiveKafkaInt
verifyQueueHas(Seq("3", "4", "5"))
}

"not commit the same offsets twice" in { implicit f =>
// given
val consumerProps = consumerProperties(f)
.commitInterval(700 millis)
.noAutoCommit()
val mockConsumer = new CommitCountingMockConsumer[String, String]
val topicPartition1 = new TopicPartition(f.topic, 0)
val topicPartition2 = new TopicPartition(f.topic, 1)
mockConsumer.updateBeginningOffsets(Map(topicPartition1 -> java.lang.Long.valueOf(0L), topicPartition2 -> java.lang.Long.valueOf(0L)).asJava)
val mockProvider = (_: ConsumerProperties[String, String]) => mockConsumer
val reactiveConsumer: ReactiveKafkaConsumer[String, String] = new ReactiveKafkaConsumer(consumerProps, Set(topicPartition1, topicPartition2), Map(), mockProvider)
val propsWithConsumer = ConsumerWithActorProps(reactiveConsumer, Props(new KafkaActorPublisher(reactiveConsumer)))
val actor = system.actorOf(propsWithConsumer.actorProps)
val publisherWithSink = PublisherWithCommitSink[String, String](
ActorPublisher[ConsumerRecord[String, String]](actor), actor, CommitSink.create(actor, consumerProps)
)
val source = Source.fromPublisher(publisherWithSink.publisher)
for (i <- 0 to 9) {
mockConsumer.addRecord(new ConsumerRecord[String, String](f.topic, 0, i.toLong, s"k$i", i.toString))
}

// when
source
.filter(_.value().toInt < 3)
.to(publisherWithSink.offsetCommitSink)
.run()

// then
eventually(PatienceConfiguration.Timeout(Span(5, Seconds)), Interval(Span(150, Millis))) {
mockConsumer.committed(topicPartition1) should be(new OffsetAndMetadata(3))
}

// trigger one more commit by adding a new record to the second partition
// this should result in committing only the new metadata
mockConsumer.addRecord(new ConsumerRecord[String, String](f.topic, 1, 10, s"k10", "1"))

verifyNever {
val counter = mockConsumer.counter.getOrElse(topicPartition1 -> new OffsetAndMetadata(3), 0)
counter != 1
}
}

def shouldStartConsuming(fromEnd: Boolean)(implicit f: FixtureParam) = {
val producerProps = createProducerProperties(f)
val producer = new KafkaProducer(producerProps.rawProperties, producerProps.keySerializer, producerProps.valueSerializer)
Expand Down Expand Up @@ -131,4 +182,27 @@ class ReactiveTestSubscriber extends ActorSubscriber {
elements = elements :+ element.asInstanceOf[StringConsumerRecord]
case "get elements" => sender ! elements
}
}

class CommitCountingMockConsumer[K, V] extends MockConsumer[K, V](OffsetResetStrategy.EARLIEST) {
val counter = mutable.Map[(TopicPartition, OffsetAndMetadata), Int]()

override def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
for (o <- offsets.asScala) {
counter += o -> (counter.getOrElse(o, 0) + 1)
}
super.commitSync(offsets)
}

override def poll(timeout: Long): ConsumerRecords[K, V] = {
synchronized {
super.poll(timeout)
}
}

override def addRecord(record: ConsumerRecord[K, V]): Unit = {
synchronized {
super.addRecord(record)
}
}
}

0 comments on commit 5d80f7a

Please sign in to comment.