Add Scalafmt 1.5.1 (#489)
* Replace scalariform with Scalafmt
* Run Scalafmt check on Travis
* Scalafmt reformat
ennru authored Jun 7, 2018
1 parent ecf11dd commit 557927a
Showing 57 changed files with 1,773 additions and 1,165 deletions.
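The diff adds a .scalafmt.conf, wires a formatting check into Travis, and reformats the sources. The change to project/plugins.sbt is not part of this excerpt; assuming the sbt-scalafmt plugin, which provides the scalafmtCheck, scalafmtSbtCheck, scalafmtSbt and scalafmtOnCompile keys used in this commit, the plugin addition would look roughly like the sketch below. The coordinates and version are an assumption based on the commit title, not taken from this diff.

```scala
// project/plugins.sbt — hypothetical sketch, not shown in this diff.
// Coordinates and version are assumed; 1.5.1 matches the commit title.
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")
```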
9 changes: 9 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,9 @@
style = defaultWithAlign

align.tokens = [off]
danglingParentheses = true
docstrings = JavaDoc
indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
unindentTopLevelOperators = true
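As a rough illustration (not part of the commit), these settings drive the reformatting seen throughout the diff: maxColumn = 120 wraps long argument lists one argument per line, RedundantBraces drops braces around single-expression method bodies, and SortImports orders import selectors alphabetically. A minimal sketch of the resulting style, modelled on changes shown later in this diff:

```scala
// Illustrative only — the kind of rewrites the configuration above produces.

// SortImports: selectors inside the braces end up in alphabetical order.
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}

// RedundantBraces: braces around a single-expression method body are removed,
// so a tail-recursive helper like the benchmarks' pollInLoop becomes:
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
  if (readSoFar >= readLimit) readSoFar
  else pollInLoop(readLimit, readSoFar + 1)
```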
10 changes: 10 additions & 0 deletions .travis.yml
@@ -6,6 +6,12 @@ services:

jobs:
include:
- stage: check
script: sbt scalafmtCheck || { echo "[error] Unformatted code found. Please run 'test:compile' and commit the reformatted code."; false; }
env: SCALAFMT_CHECK
- script: sbt scalafmtSbtCheck || { echo "[error] Unformatted sbt code found. Please run 'scalafmtSbt' and commit the reformatted code."; false; }
env: SCALAFMT_SBT_CHECK

- stage: test
script: sbt -jvm-opts .jvmopts-travis +test
jdk: oraclejdk8
@@ -29,6 +35,10 @@ jobs:
jdk: oraclejdk8

stages:
# runs on master commits and PRs
- name: check
if: NOT tag =~ ^v

# runs on master commits and PRs
- name: test
if: NOT tag =~ ^v
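The check stage runs alongside the tests and fails fast on unformatted code. As a hedged convenience, not part of this commit, the two Travis commands above and the fix-up tasks mentioned in their error messages could be bundled into local sbt aliases, so contributors can run the same checks before pushing:

```scala
// build.sbt sketch (hypothetical, not in this commit).
// checkFormat mirrors the Travis check stage; fmt reformats everything it verifies:
// test:compile reformats sources because scalafmtOnCompile is enabled in this commit,
// and scalafmtSbt reformats the *.sbt files.
addCommandAlias("checkFormat", ";scalafmtCheck ;scalafmtSbtCheck")
addCommandAlias("fmt", ";test:compile ;scalafmtSbt")
```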
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -113,8 +113,8 @@ Example:
## How To Enforce These Guidelines?

1. [Travis CI](https://travis-ci.org/akka/reactive-kafka) automatically merges the code, builds it, runs the tests and sets Pull Request status accordingly of results in GitHub.
1. [Scalafmt](http://scalameta.org/scalafmt/) enforces some of the code style rules.
1. [sbt-header plugin](https://github.com/sbt/sbt-header) manages consistent copyright headers in every source file.
1. The [sbt-whitesourece plugin](https://github.com/lightbend/sbt-whitesource) checks licensing models of all (transitive) dependencies.
1. A GitHub bot checks whether you've signed the Lightbend CLA.
1. [Scalariform](https://github.com/daniel-trinh/scalariform) enforces some of the code style rules.

30 changes: 21 additions & 9 deletions benchmarks/src/main/scala/akka.kafka.benchmarks/Benchmarks.scala
@@ -14,35 +14,47 @@ import scala.concurrent.duration._

object Benchmarks {

def run(cmd: RunTestCommand)(implicit actorSystem: ActorSystem, mat: Materializer): Unit = {

def run(cmd: RunTestCommand)(implicit actorSystem: ActorSystem, mat: Materializer): Unit =
cmd.testName match {
case "plain-consumer-nokafka" =>
runPerfTest(cmd, KafkaConsumerFixtures.noopFixtureGen(cmd), KafkaConsumerBenchmarks.consumePlainNoKafka)
case "akka-plain-consumer-nokafka" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd), ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd),
ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
case "plain-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumePlain)
case "akka-plain-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
case "batched-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
case "akka-batched-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.commitableSources(cmd), ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.commitableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
case "at-most-once-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumeCommitAtMostOnce)
case "akka-at-most-once-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.commitableSources(cmd), ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.commitableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
case "plain-producer" =>
runPerfTest(cmd, KafkaProducerFixtures.initializedProducer(cmd), KafkaProducerBenchmarks.plainFlow)
case "akka-plain-producer" =>
runPerfTest(cmd, ReactiveKafkaProducerFixtures.flowFixture(cmd), ReactiveKafkaProducerBenchmarks.plainFlow)
case "transactions" =>
runPerfTest(cmd, KafkaTransactionFixtures.initialize(cmd), KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
runPerfTest(cmd,
KafkaTransactionFixtures.initialize(cmd),
KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
case "akka-transactions" =>
runPerfTest(cmd, ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds), ReactiveKafkaTransactionBenchmarks.consumeTransformProduceTransaction)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
ReactiveKafkaTransactionBenchmarks.consumeTransformProduceTransaction
)
case _ => Future.failed(new IllegalArgumentException(s"Unrecognized test name: ${cmd.testName}"))
}
}

}
@@ -8,7 +8,7 @@ package akka.kafka.benchmarks
import java.util
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
@@ -23,15 +23,14 @@ object KafkaConsumerBenchmarks extends LazyLogging {
def consumePlainNoKafka(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
logger.debug(s"Polling")
meter.mark()
pollInLoop(readLimit, readSoFar + 1)
}
}
meter.mark()
pollInLoop(readLimit = fixture.msgCount)
}
@@ -43,7 +42,7 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val consumer = fixture.consumer

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -55,7 +54,6 @@ object KafkaConsumerBenchmarks extends LazyLogging {
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}
meter.mark()
pollInLoop(readLimit = fixture.msgCount)
fixture.close()
@@ -77,15 +75,17 @@ object KafkaConsumerBenchmarks extends LazyLogging {
accumulatedMsgCount = 0
val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(lastProcessedOffset))
logger.debug("Committing offset " + offsetMap.head._2.offset())
consumer.commitAsync(offsetMap.asJava, new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
commitInProgress = false
consumer.commitAsync(
offsetMap.asJava,
new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
commitInProgress = false
}
})
)
}

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -101,16 +101,14 @@ object KafkaConsumerBenchmarks extends LazyLogging {
if (!commitInProgress) {
commitInProgress = true
doCommit()
}
else // previous commit still in progress
} else // previous commit still in progress
consumer.pause(assignment)
}
}
val recordCount = records.count()
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

pollInLoop(readLimit = fixture.msgCount)
fixture.close()
@@ -123,7 +121,7 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val consumer = fixture.consumer
val assignment = consumer.assignment()
@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -136,18 +134,19 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(record.offset()))
consumer.pause(assignment)

consumer.commitAsync(offsetMap.asJava, new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
consumer.resume(assignment)
consumer.commitAsync(
offsetMap.asJava,
new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
consumer.resume(assignment)
}
})
)
}

val recordCount = records.count()
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

pollInLoop(readLimit = fixture.msgCount)
fixture.close()
@@ -18,24 +18,27 @@ case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer: Kafk
object KafkaConsumerFixtures extends PerfFixtureHelpers {

def noopFixtureGen(c: RunTestCommand) = FixtureGen[KafkaConsumerTestFixture](
c, msgCount => {
KafkaConsumerTestFixture("topic", msgCount, null)
}
c,
msgCount => {
KafkaConsumerTestFixture("topic", msgCount, null)
}
)

def filledTopics(c: RunTestCommand) = FixtureGen[KafkaConsumerTestFixture](
c, msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps, new ByteArrayDeserializer, new StringDeserializer)
consumer.subscribe(Set(topic).asJava)
KafkaConsumerTestFixture(topic, msgCount, consumer)
}
c,
msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val consumer =
new KafkaConsumer[Array[Byte], String](consumerJavaProps, new ByteArrayDeserializer, new StringDeserializer)
consumer.subscribe(Set(topic).asJava)
KafkaConsumerTestFixture(topic, msgCount, consumer)
}
)
}
@@ -13,6 +13,7 @@ import scala.concurrent.duration._
object KafkaProducerBenchmarks extends LazyLogging {

val logStep = 100000

/**
* Streams generated numbers to a Kafka producer. Does not commit.
*/
@@ -15,16 +15,18 @@ case class KafkaProducerTestFixture(topic: String, msgCount: Int, producer: Kafk
object KafkaProducerFixtures extends PerfFixtureHelpers {

def noopFixtureGen(c: RunTestCommand) = FixtureGen[KafkaProducerTestFixture](
c, msgCount => {
KafkaProducerTestFixture("topic", msgCount, null)
}
c,
msgCount => {
KafkaProducerTestFixture("topic", msgCount, null)
}
)

def initializedProducer(c: RunTestCommand) = FixtureGen[KafkaProducerTestFixture](
c, msgCount => {
val topic = randomId()
val rawProducer = initTopicAndProducer(c.kafkaHost, topic)
KafkaProducerTestFixture(topic, msgCount, rawProducer)
}
c,
msgCount => {
val topic = randomId()
val rawProducer = initTopicAndProducer(c.kafkaHost, topic)
KafkaProducerTestFixture(topic, msgCount, rawProducer)
}
)
}
@@ -21,7 +21,8 @@ object KafkaTransactionBenchmarks extends LazyLogging {
/**
* Process records in a consume-transform-produce transacational workflow and commit every interval.
*/
def consumeTransformProduceTransaction(commitInterval: FiniteDuration)(fixture: KafkaTransactionTestFixture, meter: Meter): Unit = {
def consumeTransformProduceTransaction(commitInterval: FiniteDuration)(fixture: KafkaTransactionTestFixture,
meter: Meter): Unit = {
val consumer = fixture.consumer
val producer = fixture.producer
val msgCount = fixture.msgCount
@@ -49,12 +50,11 @@ object KafkaTransactionBenchmarks extends LazyLogging {
}

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit) {
doCommit()
readSoFar
}
else {
} else {
logger.debug("Polling")
val records = consumer.poll(pollTimeoutMs)
for (record <- records.iterator().asScala) {
@@ -64,7 +64,9 @@ object KafkaTransactionBenchmarks extends LazyLogging {
val producerRecord = new ProducerRecord(fixture.sinkTopic, record.partition(), record.key(), record.value())
producer.send(producerRecord)
if (lastProcessedOffset % loggedStep == 0)
logger.info(s"Transformed $lastProcessedOffset elements to Kafka (${100 * lastProcessedOffset / msgCount}%)")
logger.info(
s"Transformed $lastProcessedOffset elements to Kafka (${100 * lastProcessedOffset / msgCount}%)"
)

if (System.nanoTime() >= lastCommit + commitInterval.toNanos) {
doCommit()
@@ -75,7 +77,6 @@ object KafkaTransactionBenchmarks extends LazyLogging {
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

meter.mark()
logger.debug("Initializing transactions")
@@ -11,13 +11,21 @@ import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.serialization.{
ByteArrayDeserializer,
ByteArraySerializer,
StringDeserializer,
StringSerializer
}

import scala.collection.JavaConverters._

case class KafkaTransactionTestFixture(sourceTopic: String, sinkTopic: String, msgCount: Int, groupId: String,
consumer: KafkaConsumer[Array[Byte], String],
producer: KafkaProducer[Array[Byte], String]) {
case class KafkaTransactionTestFixture(sourceTopic: String,
sinkTopic: String,
msgCount: Int,
groupId: String,
consumer: KafkaConsumer[Array[Byte], String],
producer: KafkaProducer[Array[Byte], String]) {
def close(): Unit = {
consumer.close()
producer.close()
@@ -31,31 +39,36 @@ object KafkaTransactionFixtures extends PerfFixtureHelpers {
KafkaTransactionTestFixture("sourceTopic", "sinkTopic", msgCount, "groupId", consumer = null, producer = null)
})

def initialize(c: RunTestCommand) = FixtureGen[KafkaTransactionTestFixture](c, msgCount => {
val sourceTopic = randomId()
fillTopic(c.kafkaHost, sourceTopic, msgCount)
val groupId = randomId()
val sinkTopic = randomId()

val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
consumerJavaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerJavaProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH))
val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps)
consumer.subscribe(Set(sourceTopic).asJava)

val producerJavaProps = new java.util.Properties
producerJavaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerJavaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
producerJavaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
producerJavaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true.toString)
producerJavaProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, randomId())
val producer = new KafkaProducer[Array[Byte], String](producerJavaProps)

KafkaTransactionTestFixture(sourceTopic, sinkTopic, msgCount, groupId, consumer, producer)
})
def initialize(c: RunTestCommand) =
FixtureGen[KafkaTransactionTestFixture](
c,
msgCount => {
val sourceTopic = randomId()
fillTopic(c.kafkaHost, sourceTopic, msgCount)
val groupId = randomId()
val sinkTopic = randomId()

val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
consumerJavaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerJavaProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH))
val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps)
consumer.subscribe(Set(sourceTopic).asJava)

val producerJavaProps = new java.util.Properties
producerJavaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerJavaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
producerJavaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
producerJavaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true.toString)
producerJavaProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, randomId())
val producer = new KafkaProducer[Array[Byte], String](producerJavaProps)

KafkaTransactionTestFixture(sourceTopic, sinkTopic, msgCount, groupId, consumer, producer)
}
)
}
@@ -28,27 +28,29 @@ private[benchmarks] trait PerfFixtureHelpers extends LazyLogging {
def initTopicAndProducer(kafkaHost: String, topic: String, msgCount: Int = 1): KafkaProducer[Array[Byte], String] = {
val producerJavaProps = new java.util.Properties
producerJavaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost)
val producer = new KafkaProducer[Array[Byte], String](producerJavaProps, new ByteArraySerializer, new StringSerializer)
val producer =
new KafkaProducer[Array[Byte], String](producerJavaProps, new ByteArraySerializer, new StringSerializer)
val lastElementStoredPromise = Promise[Unit]
val loggedStep = if (msgCount > logPercentStep) msgCount / (100 / logPercentStep) else 1
for (i <- 0L to msgCount.toLong) {
if (!lastElementStoredPromise.isCompleted) {
producer.send(new ProducerRecord[Array[Byte], String](topic, i.toString), new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e == null) {
if (i % loggedStep == 0)
logger.info(s"Written $i elements to Kafka (${100 * i / msgCount}%)")
if (recordMetadata.offset() == msgCount - 1 && !lastElementStoredPromise.isCompleted)
lastElementStoredPromise.success(())
}
else {
if (!lastElementStoredPromise.isCompleted) {
e.printStackTrace()
lastElementStoredPromise.failure(e)
producer.send(
new ProducerRecord[Array[Byte], String](topic, i.toString),
new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit =
if (e == null) {
if (i % loggedStep == 0)
logger.info(s"Written $i elements to Kafka (${100 * i / msgCount}%)")
if (recordMetadata.offset() == msgCount - 1 && !lastElementStoredPromise.isCompleted)
lastElementStoredPromise.success(())
} else {
if (!lastElementStoredPromise.isCompleted) {
e.printStackTrace()
lastElementStoredPromise.failure(e)
}
}
}
}
})
)
}
}
val lastElementStoredFuture = lastElementStoredPromise.future
@@ -29,10 +29,11 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging {
def consumePlainNoKafka(fixture: NonCommitableFixture, meter: Meter)(implicit mat: Materializer): Unit = {
logger.debug("Creating and starting a stream")
meter.mark()
val future = Source.repeat("dummy")
val future = Source
.repeat("dummy")
.take(fixture.msgCount.toLong)
.map {
msg => meter.mark(); msg
.map { msg =>
meter.mark(); msg
}
.runWith(Sink.ignore)
Await.result(future, atMost = streamingTimeout)
@@ -46,8 +47,8 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging {
logger.debug("Creating and starting a stream")
val future = fixture.source
.take(fixture.msgCount.toLong)
.map {
msg => meter.mark(); msg
.map { msg =>
meter.mark(); msg
}
.runWith(Sink.ignore)
Await.result(future, atMost = streamingTimeout)
@@ -58,12 +59,13 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging {
* Reads elements from Kafka source and commits a batch as soon as it's possible. Backpressures when batch max of
* size is accumulated.
*/
def consumerAtLeastOnceBatched(batchSize: Int)(fixture: CommitableFixture, meter: Meter)(implicit mat: Materializer): Unit = {
def consumerAtLeastOnceBatched(batchSize: Int)(fixture: CommitableFixture,
meter: Meter)(implicit mat: Materializer): Unit = {
logger.debug("Creating and starting a stream")
val promise = Promise[Unit]
val control = fixture.source
.map {
msg => msg.committableOffset
.map { msg =>
msg.committableOffset
}
.batch(batchSize.toLong, first => CommittableOffsetBatch(first)) { (batch, elem) =>
meter.mark()
@@ -25,26 +25,36 @@ object ReactiveKafkaConsumerFixtures extends PerfFixtureHelpers {
.withClientId(randomId())
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

def plainSources(c: RunTestCommand)(implicit actorSystem: ActorSystem) = FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](c, msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val settings = createConsumerSettings(c.kafkaHost)
val source = Consumer.plainSource(settings, Subscriptions.topics(topic))
ReactiveKafkaConsumerTestFixture(topic, msgCount, source)
})

def commitableSources(c: RunTestCommand)(implicit actorSystem: ActorSystem) = FixtureGen[ReactiveKafkaConsumerTestFixture[CommittableMessage[Array[Byte], String]]](c, msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val settings = createConsumerSettings(c.kafkaHost)
val source = Consumer.committableSource(settings, Subscriptions.topics(topic))
ReactiveKafkaConsumerTestFixture(topic, msgCount, source)
})

def noopFixtureGen(c: RunTestCommand) = FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](
c, msgCount => {
ReactiveKafkaConsumerTestFixture("topic", msgCount, null)
}
)
def plainSources(c: RunTestCommand)(implicit actorSystem: ActorSystem) =
FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](
c,
msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val settings = createConsumerSettings(c.kafkaHost)
val source = Consumer.plainSource(settings, Subscriptions.topics(topic))
ReactiveKafkaConsumerTestFixture(topic, msgCount, source)
}
)

def commitableSources(c: RunTestCommand)(implicit actorSystem: ActorSystem) =
FixtureGen[ReactiveKafkaConsumerTestFixture[CommittableMessage[Array[Byte], String]]](
c,
msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val settings = createConsumerSettings(c.kafkaHost)
val source = Consumer.committableSource(settings, Subscriptions.topics(topic))
ReactiveKafkaConsumerTestFixture(topic, msgCount, source)
}
)

def noopFixtureGen(c: RunTestCommand) =
FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](
c,
msgCount => {
ReactiveKafkaConsumerTestFixture("topic", msgCount, null)
}
)

}
@@ -32,17 +32,23 @@ object ReactiveKafkaProducerFixtures extends PerfFixtureHelpers {
.withBootstrapServers(kafkaHost)
.withParallelism(Parallelism)

def flowFixture(c: RunTestCommand)(implicit actorSystem: ActorSystem) = FixtureGen[ReactiveKafkaProducerTestFixture[Int]](c, msgCount => {
val flow: FlowType[Int] = Producer.flexiFlow(createProducerSettings(c.kafkaHost))
val topic = randomId()
initTopicAndProducer(c.kafkaHost, topic)
ReactiveKafkaProducerTestFixture(topic, msgCount, flow)
})

def noopFixtureGen(c: RunTestCommand) = FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](
c, msgCount => {
ReactiveKafkaConsumerTestFixture("topic", msgCount, null)
}
)
def flowFixture(c: RunTestCommand)(implicit actorSystem: ActorSystem) =
FixtureGen[ReactiveKafkaProducerTestFixture[Int]](
c,
msgCount => {
val flow: FlowType[Int] = Producer.flexiFlow(createProducerSettings(c.kafkaHost))
val topic = randomId()
initTopicAndProducer(c.kafkaHost, topic)
ReactiveKafkaProducerTestFixture(topic, msgCount, flow)
}
)

def noopFixtureGen(c: RunTestCommand) =
FixtureGen[ReactiveKafkaConsumerTestFixture[ConsumerRecord[Array[Byte], String]]](
c,
msgCount => {
ReactiveKafkaConsumerTestFixture("topic", msgCount, null)
}
)

}
@@ -26,7 +26,8 @@ object ReactiveKafkaTransactionBenchmarks extends LazyLogging {
/**
* Process records in a consume-transform-produce transactional workflow and commit every interval.
*/
def consumeTransformProduceTransaction(fixture: TransactionFixture, meter: Meter)(implicit mat: Materializer): Unit = {
def consumeTransformProduceTransaction(fixture: TransactionFixture,
meter: Meter)(implicit mat: Materializer): Unit = {
logger.debug("Creating and starting a stream")
val msgCount = fixture.msgCount
val sinkTopic = fixture.sinkTopic
@@ -38,21 +39,20 @@ object ReactiveKafkaTransactionBenchmarks extends LazyLogging {

val control = source
.map { msg =>
ProducerMessage.Message(
new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()), msg.partitionOffset)
ProducerMessage.Message(new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()),
msg.partitionOffset)
}
.via(fixture.flow)
.toMat(
Sink.foreach {
case result: Result[Key, Val, PassThrough] =>
val offset = result.offset
if (result.offset % loggedStep == 0)
logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
if (result.offset >= fixture.msgCount - 1)
promise.complete(Success(()))
case other: Results[Key, Val, PassThrough] =>
.toMat(Sink.foreach {
case result: Result[Key, Val, PassThrough] =>
val offset = result.offset
if (result.offset % loggedStep == 0)
logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
if (result.offset >= fixture.msgCount - 1)
promise.complete(Success(()))
})(Keep.left)
case other: Results[Key, Val, PassThrough] =>
promise.complete(Success(()))
})(Keep.left)
.run()

Await.result(promise.future, streamingTimeout)
@@ -15,13 +15,20 @@ import akka.kafka.scaladsl.Transactional
import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.scaladsl.{Flow, Source}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.serialization.{
ByteArrayDeserializer,
ByteArraySerializer,
StringDeserializer,
StringSerializer
}

import scala.concurrent.duration.FiniteDuration

case class ReactiveKafkaTransactionTestFixture[SOut, FIn, FOut](sourceTopic: String, sinkTopic: String, msgCount: Int,
source: Source[SOut, Control],
flow: Flow[FIn, FOut, NotUsed])
case class ReactiveKafkaTransactionTestFixture[SOut, FIn, FOut](sourceTopic: String,
sinkTopic: String,
msgCount: Int,
source: Source[SOut, Control],
flow: Flow[FIn, FOut, NotUsed])

object ReactiveKafkaTransactionFixtures extends PerfFixtureHelpers {
type Key = Array[Byte]
@@ -38,24 +45,34 @@ object ReactiveKafkaTransactionFixtures extends PerfFixtureHelpers {
.withClientId(randomId())
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

private def createProducerSettings(kafkaHost: String)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] =
private def createProducerSettings(
kafkaHost: String
)(implicit actorSystem: ActorSystem): ProducerSettings[Array[Byte], String] =
ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(kafkaHost)

def transactionalSourceAndSink(c: RunTestCommand, commitInterval: FiniteDuration)(implicit actorSystem: ActorSystem) =
FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]](c, msgCount => {
val sourceTopic = randomId()
fillTopic(c.kafkaHost, sourceTopic, msgCount)
val sinkTopic = randomId()
FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]](
c,
msgCount => {
val sourceTopic = randomId()
fillTopic(c.kafkaHost, sourceTopic, msgCount)
val sinkTopic = randomId()

val consumerSettings = createConsumerSettings(c.kafkaHost)
val source: Source[KTransactionMessage, Control] = Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))
val consumerSettings = createConsumerSettings(c.kafkaHost)
val source: Source[KTransactionMessage, Control] =
Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))

val producerSettings = createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval)
val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings, randomId())
val producerSettings = createProducerSettings(c.kafkaHost).withEosCommitInterval(commitInterval)
val flow: Flow[KProducerMessage, KResult, NotUsed] = Transactional.flow(producerSettings, randomId())

ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](sourceTopic, sinkTopic, msgCount, source, flow)
})
ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult](sourceTopic,
sinkTopic,
msgCount,
source,
flow)
}
)

def noopFixtureGen(c: RunTestCommand) =
FixtureGen[ReactiveKafkaTransactionTestFixture[KTransactionMessage, KProducerMessage, KResult]](c, msgCount => {
3 changes: 1 addition & 2 deletions benchmarks/src/main/scala/akka.kafka.benchmarks/Timed.scala
@@ -18,13 +18,12 @@ object Timed extends LazyLogging {

implicit val ec = ExecutionContext.fromExecutor(new ForkJoinPool)

def reporter(metricRegistry: MetricRegistry): ScheduledReporter = {
def reporter(metricRegistry: MetricRegistry): ScheduledReporter =
Slf4jReporter
.forRegistry(metricRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
}

def runPerfTest[F](command: RunTestCommand, fixtureGen: FixtureGen[F], testBody: (F, Meter) => Unit): Unit = {
val name = command.testName
70 changes: 33 additions & 37 deletions build.sbt
@@ -1,7 +1,5 @@
enablePlugins(AutomateHeaderPlugin)

import scalariform.formatter.preferences._

name := "akka-stream-kafka"

val akkaVersion = "2.5.13"
@@ -27,8 +25,8 @@ val coreDependencies = Seq(
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.slf4j" % "log4j-over-slf4j" % "1.7.25" % Test,
"org.mockito" % "mockito-core" % "2.15.0" % Test,
"net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % Test exclude("log4j", "log4j"),
"org.apache.kafka" %% "kafka" % kafkaVersion % Test exclude("org.slf4j", "slf4j-log4j12")
"net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % Test exclude ("log4j", "log4j"),
"org.apache.kafka" %% "kafka" % kafkaVersion % Test exclude ("org.slf4j", "slf4j-log4j12")
)

resolvers in ThisBuild ++= Seq(Resolver.bintrayRepo("manub", "maven"))
@@ -45,21 +43,21 @@ val commonSettings = Seq(
homepage := Some(url("https://github.com/akka/reactive-kafka")),
scmInfo := Some(ScmInfo(url("https://github.com/akka/reactive-kafka"), "git@github.com:akka/reactive-kafka.git")),
developers += Developer("contributors",
"Contributors",
"https://gitter.im/akka/dev",
url("https://github.com/akka/reactive-kafka/graphs/contributors")),
"Contributors",
"https://gitter.im/akka/dev",
url("https://github.com/akka/reactive-kafka/graphs/contributors")),
startYear := Some(2014),
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")),
crossScalaVersions := Seq("2.12.6", "2.11.12"),
scalaVersion := crossScalaVersions.value.head,
crossVersion := CrossVersion.binary,
scalariformAutoformat := true,
javacOptions ++= Seq(
"-Xlint:deprecation"
),
scalacOptions ++= Seq(
"-deprecation",
"-encoding", "UTF-8", // yes, this is 2 args
"-encoding",
"UTF-8", // yes, this is 2 args
"-feature",
"-unchecked",
"-Xlint",
@@ -70,31 +68,28 @@ val commonSettings = Seq(
),
testOptions += Tests.Argument("-oD"),
testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v"),
scalariformPreferences := scalariformPreferences.value
.setPreference(DoubleIndentConstructorArguments, true)
.setPreference(PreserveSpaceBeforeArguments, true)
.setPreference(CompactControlReadability, true)
.setPreference(DanglingCloseParenthesis, Preserve)
.setPreference(NewlineAtEndOfFile, true)
.setPreference(SpacesAroundMultiImports, false),
headerLicense := Some(HeaderLicense.Custom(
"""|Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
|Copyright (C) 2016 - 2018 Lightbend Inc. <http://www.lightbend.com>
|""".stripMargin
)),
scalafmtOnCompile := true,
headerLicense := Some(
HeaderLicense.Custom(
"""|Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
|Copyright (C) 2016 - 2018 Lightbend Inc. <http://www.lightbend.com>
|""".stripMargin
)
),
bintrayOrganization := Some("akka"),
bintrayPackage := "alpakka-kafka",
bintrayRepository := (if (isSnapshot.value) "snapshots" else "maven"),
)

lazy val `alpakka-kafka` =
project.in(file("."))
project
.in(file("."))
.settings(commonSettings)
.settings(
skip in publish := true,
dockerComposeIgnore := true,
onLoadMessage :=
"""
"""
|** Welcome to the Alpakka Kafka connector! **
|
|The build has three modules
@@ -118,8 +113,8 @@ lazy val core = project
.settings(
name := "akka-stream-kafka",
AutomaticModuleName.settings("akka.stream.alpakka.kafka"),
Test/fork := true,
Test/parallelExecution := false,
Test / fork := true,
Test / parallelExecution := false,
libraryDependencies ++= commonDependencies ++ coreDependencies ++ Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "it",
"org.scalatest" %% "scalatest" % scalatestVersion % "it",
@@ -130,7 +125,8 @@ lazy val core = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)

lazy val docs = project.in(file("docs"))
lazy val docs = project
.in(file("docs"))
.enablePlugins(ParadoxPlugin)
.dependsOn(core)
.settings(commonSettings)
@@ -141,18 +137,18 @@ lazy val docs = project.in(file("docs"))
paradoxNavigationDepth := 3,
paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
paradoxProperties ++= Map(
"version" -> version.value,
"akkaVersion" -> akkaVersion,
"kafkaVersion" -> kafkaVersion,
"scalaVersion" -> scalaVersion.value,
"scalaBinaryVersion" -> scalaBinaryVersion.value,
"extref.akka-docs.base_url" -> s"https://doc.akka.io/docs/akka/$akkaVersion/%s",
"extref.kafka-docs.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/documentation/%s",
"scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/current/",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$akkaVersion",
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/akka-stream-kafka/${version.value}/",
"version" -> version.value,
"akkaVersion" -> akkaVersion,
"kafkaVersion" -> kafkaVersion,
"scalaVersion" -> scalaVersion.value,
"scalaBinaryVersion" -> scalaBinaryVersion.value,
"extref.akka-docs.base_url" -> s"https://doc.akka.io/docs/akka/$akkaVersion/%s",
"extref.kafka-docs.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/documentation/%s",
"scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/current/",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$akkaVersion",
"scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/akka-stream-kafka/${version.value}/",
"scaladoc.com.typesafe.config.base_url" -> s"https://lightbend.github.io/config/latest/api/",
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/javadoc/"
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/javadoc/"
),
paradoxLocalApiKey := "scaladoc.akka.kafka.base_url",
paradoxLocalApiDir := (core / Compile / doc).value,
6 changes: 5 additions & 1 deletion core/src/main/scala/akka/kafka/ConsumerMessage.scala
@@ -19,6 +19,7 @@ import scala.concurrent.Future
* [[scaladsl.Consumer]].
*/
object ConsumerMessage {

/**
* Output element of `committableSource`.
* The offset can be committed via the included [[CommittableOffset]].
@@ -92,7 +93,9 @@ object ConsumerMessage {
* Create an offset batch out of a list of offsets.
*/
def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
offsets.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) }
offsets.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) =>
batch.updated(elem)
}
}

val emptyCommittableOffsetBatch: CommittableOffsetBatch = CommittableOffsetBatch.empty
@@ -118,6 +121,7 @@ object ConsumerMessage {
* the [[CommittableOffsetBatch#empty empty]] batch.
*/
trait CommittableOffsetBatch extends Committable {

/**
* Add/overwrite an offset position for the given groupId, topic, partition.
*/
128 changes: 77 additions & 51 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
@@ -26,9 +26,9 @@ object ConsumerSettings {
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
system: ActorSystem,
keyDeserializer: Option[Deserializer[K]],
valueDeserializer: Option[Deserializer[V]]
system: ActorSystem,
keyDeserializer: Option[Deserializer[K]],
valueDeserializer: Option[Deserializer[V]]
): ConsumerSettings[K, V] = {
val config = system.settings.config.getConfig("akka.kafka.consumer")
apply(config, keyDeserializer, valueDeserializer)
@@ -40,19 +40,19 @@ object ConsumerSettings {
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
config: Config,
keyDeserializer: Option[Deserializer[K]],
valueDeserializer: Option[Deserializer[V]]
config: Config,
keyDeserializer: Option[Deserializer[K]],
valueDeserializer: Option[Deserializer[V]]
): ConsumerSettings[K, V] = {
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
require(
keyDeserializer != null &&
(keyDeserializer.isDefined || properties.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)),
(keyDeserializer.isDefined || properties.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)),
"Key deserializer should be defined or declared in configuration"
)
require(
valueDeserializer != null &&
(valueDeserializer.isDefined || properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
(valueDeserializer.isDefined || properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
"Value deserializer should be defined or declared in configuration"
)
val pollInterval = config.getDuration("poll-interval", TimeUnit.MILLISECONDS).millis
@@ -67,9 +67,23 @@ object ConsumerSettings {
val dispatcher = config.getString("use-dispatcher")
val wakeupDebug = config.getBoolean("wakeup-debug")
val waitClosePartition = config.getDuration("wait-close-partition", TimeUnit.MILLISECONDS).millis
new ConsumerSettings[K, V](properties, keyDeserializer, valueDeserializer,
pollInterval, pollTimeout, stopTimeout, closeTimeout, commitTimeout, wakeupTimeout, maxWakeups,
commitRefreshInterval, dispatcher, commitTimeWarning, wakeupDebug, waitClosePartition)
new ConsumerSettings[K, V](
properties,
keyDeserializer,
valueDeserializer,
pollInterval,
pollTimeout,
stopTimeout,
closeTimeout,
commitTimeout,
wakeupTimeout,
maxWakeups,
commitRefreshInterval,
dispatcher,
commitTimeWarning,
wakeupDebug,
waitClosePartition
)
}

/**
@@ -78,35 +92,33 @@ object ConsumerSettings {
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
system: ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] = {
system: ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] =
apply(system, Option(keyDeserializer), Option(valueDeserializer))
}

/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] = {
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] =
apply(config, Option(keyDeserializer), Option(valueDeserializer))
}

/**
* Java API: Create settings from the default configuration
* `akka.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
system: ActorSystem,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
system: ActorSystem,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
): ConsumerSettings[K, V] =
apply(system, keyDeserializer.asScala, valueDeserializer.asScala)

@@ -116,9 +128,9 @@ object ConsumerSettings {
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
config: Config,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
config: Config,
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
): ConsumerSettings[K, V] =
apply(config, keyDeserializer.asScala, valueDeserializer.asScala)

@@ -128,9 +140,9 @@ object ConsumerSettings {
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
system: ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
system: ActorSystem,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] =
apply(system, keyDeserializer, valueDeserializer)

@@ -140,9 +152,9 @@ object ConsumerSettings {
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
config: Config,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
): ConsumerSettings[K, V] =
apply(config, keyDeserializer, valueDeserializer)
}
@@ -250,25 +262,39 @@ class ConsumerSettings[K, V](
copy(waitClosePartition = waitClosePartition)

private def copy(
properties: Map[String, String] = properties,
keyDeserializer: Option[Deserializer[K]] = keyDeserializerOpt,
valueDeserializer: Option[Deserializer[V]] = valueDeserializerOpt,
pollInterval: FiniteDuration = pollInterval,
pollTimeout: FiniteDuration = pollTimeout,
stopTimeout: FiniteDuration = stopTimeout,
closeTimeout: FiniteDuration = closeTimeout,
commitTimeout: FiniteDuration = commitTimeout,
commitTimeWarning: FiniteDuration = commitTimeWarning,
wakeupTimeout: FiniteDuration = wakeupTimeout,
maxWakeups: Int = maxWakeups,
commitRefreshInterval: Duration = commitRefreshInterval,
dispatcher: String = dispatcher,
wakeupDebug: Boolean = wakeupDebug,
waitClosePartition: FiniteDuration = waitClosePartition
properties: Map[String, String] = properties,
keyDeserializer: Option[Deserializer[K]] = keyDeserializerOpt,
valueDeserializer: Option[Deserializer[V]] = valueDeserializerOpt,
pollInterval: FiniteDuration = pollInterval,
pollTimeout: FiniteDuration = pollTimeout,
stopTimeout: FiniteDuration = stopTimeout,
closeTimeout: FiniteDuration = closeTimeout,
commitTimeout: FiniteDuration = commitTimeout,
commitTimeWarning: FiniteDuration = commitTimeWarning,
wakeupTimeout: FiniteDuration = wakeupTimeout,
maxWakeups: Int = maxWakeups,
commitRefreshInterval: Duration = commitRefreshInterval,
dispatcher: String = dispatcher,
wakeupDebug: Boolean = wakeupDebug,
waitClosePartition: FiniteDuration = waitClosePartition
): ConsumerSettings[K, V] =
new ConsumerSettings[K, V](properties, keyDeserializer, valueDeserializer,
pollInterval, pollTimeout, stopTimeout, closeTimeout, commitTimeout, wakeupTimeout,
maxWakeups, commitRefreshInterval, dispatcher, commitTimeWarning, wakeupDebug, waitClosePartition)
new ConsumerSettings[K, V](
properties,
keyDeserializer,
valueDeserializer,
pollInterval,
pollTimeout,
stopTimeout,
closeTimeout,
commitTimeout,
wakeupTimeout,
maxWakeups,
commitRefreshInterval,
dispatcher,
commitTimeWarning,
wakeupDebug,
waitClosePartition
)

/**
* Create a `KafkaConsumer` instance from the settings.
71 changes: 55 additions & 16 deletions core/src/main/scala/akka/kafka/Metadata.scala
@@ -30,12 +30,19 @@ object Metadata {
* [[org.apache.kafka.clients.consumer.KafkaConsumer#listTopics()]]
*/
case object ListTopics extends Request with NoSerializationVerificationNeeded
final case class Topics(response: Try[Map[String, List[PartitionInfo]]]) extends Response with NoSerializationVerificationNeeded {
final case class Topics(response: Try[Map[String, List[PartitionInfo]]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[String, java.util.List[PartitionInfo]]] =
response.map { m => Optional.of(m.mapValues(_.asJava).asJava) }.getOrElse(Optional.empty())
response
.map { m =>
Optional.of(m.mapValues(_.asJava).asJava)
}
.getOrElse(Optional.empty())
}

/**
@@ -48,13 +55,15 @@ object Metadata {
* [[org.apache.kafka.clients.consumer.KafkaConsumer#partitionsFor()]]
*/
final case class GetPartitionsFor(topic: String) extends Request with NoSerializationVerificationNeeded
final case class PartitionsFor(response: Try[List[PartitionInfo]]) extends Response with NoSerializationVerificationNeeded {
final case class PartitionsFor(response: Try[List[PartitionInfo]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.List[PartitionInfo]] = {
def getResponse: Optional[java.util.List[PartitionInfo]] =
response.map(i => Optional.of(i.asJava)).getOrElse(Optional.empty())
}
}

/**
@@ -68,13 +77,22 @@ object Metadata {
*
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
final case class GetBeginningOffsets(partitions: Set[TopicPartition]) extends Request with NoSerializationVerificationNeeded
final case class BeginningOffsets(response: Try[Map[TopicPartition, Long]]) extends Response with NoSerializationVerificationNeeded {
final case class GetBeginningOffsets(partitions: Set[TopicPartition])
extends Request
with NoSerializationVerificationNeeded
final case class BeginningOffsets(response: Try[Map[TopicPartition, Long]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response.map { m => Optional.of(m.mapValues(Long.box).asJava) }.getOrElse(Optional.empty())
response
.map { m =>
Optional.of(m.mapValues(Long.box).asJava)
}
.getOrElse(Optional.empty())
}

/**
@@ -83,20 +101,28 @@ object Metadata {
*
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
def createGetBeginningOffsets(partitions: java.util.Set[TopicPartition]): GetBeginningOffsets = GetBeginningOffsets(partitions.asScala.toSet)
def createGetBeginningOffsets(partitions: java.util.Set[TopicPartition]): GetBeginningOffsets =
GetBeginningOffsets(partitions.asScala.toSet)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#endOffsets()]]
*
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
final case class GetEndOffsets(partitions: Set[TopicPartition]) extends Request with NoSerializationVerificationNeeded
final case class EndOffsets(response: Try[Map[TopicPartition, Long]]) extends Response with NoSerializationVerificationNeeded {
final case class EndOffsets(response: Try[Map[TopicPartition, Long]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response.map { m => Optional.of(m.mapValues(Long.box).asJava) }.getOrElse(Optional.empty())
response
.map { m =>
Optional.of(m.mapValues(Long.box).asJava)
}
.getOrElse(Optional.empty())
}

/**
@@ -105,20 +131,30 @@ object Metadata {
*
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
def createGetEndOffsets(paritions: java.util.Set[TopicPartition]): GetEndOffsets = GetEndOffsets(paritions.asScala.toSet)
def createGetEndOffsets(paritions: java.util.Set[TopicPartition]): GetEndOffsets =
GetEndOffsets(paritions.asScala.toSet)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes()]]
*
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
final case class GetOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long]) extends Request with NoSerializationVerificationNeeded
final case class OffsetsForTimes(response: Try[Map[TopicPartition, OffsetAndTimestamp]]) extends Response with NoSerializationVerificationNeeded {
final case class GetOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long])
extends Request
with NoSerializationVerificationNeeded
final case class OffsetsForTimes(response: Try[Map[TopicPartition, OffsetAndTimestamp]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[TopicPartition, OffsetAndTimestamp]] =
response.map { m => Optional.of(m.asJava) }.getOrElse(Optional.empty())
response
.map { m =>
Optional.of(m.asJava)
}
.getOrElse(Optional.empty())
}

/**
@@ -134,7 +170,10 @@ object Metadata {
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded
final case class CommittedOffset(response: Try[OffsetAndMetadata]) extends Response with NoSerializationVerificationNeeded {
final case class CommittedOffset(response: Try[OffsetAndMetadata])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ProducerMessage.scala
@@ -141,6 +141,6 @@ object ProducerMessage {
* through the flow.
*/
final case class PassThroughResult[K, V, PassThrough] private (passThrough: PassThrough)
extends Results[K, V, PassThrough]
extends Results[K, V, PassThrough]

}
87 changes: 48 additions & 39 deletions core/src/main/scala/akka/kafka/ProducerSettings.scala
@@ -26,9 +26,9 @@ object ProducerSettings {
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
system: ActorSystem,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]
system: ActorSystem,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]
): ProducerSettings[K, V] =
apply(system.settings.config.getConfig("akka.kafka.producer"), keySerializer, valueSerializer)

@@ -38,27 +38,32 @@ object ProducerSettings {
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
config: Config,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]
config: Config,
keySerializer: Option[Serializer[K]],
valueSerializer: Option[Serializer[V]]
): ProducerSettings[K, V] = {
val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
require(
keySerializer != null &&
(keySerializer.isDefined || properties.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)),
(keySerializer.isDefined || properties.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)),
"Key serializer should be defined or declared in configuration"
)
require(
valueSerializer != null &&
(valueSerializer.isDefined || properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
(valueSerializer.isDefined || properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
"Value serializer should be defined or declared in configuration"
)
val closeTimeout = config.getDuration("close-timeout", TimeUnit.MILLISECONDS).millis
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval", TimeUnit.MILLISECONDS).millis
new ProducerSettings[K, V](properties, keySerializer, valueSerializer, closeTimeout, parallelism, dispatcher,
eosCommitInterval)
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
parallelism,
dispatcher,
eosCommitInterval)
}

/**
@@ -67,9 +72,9 @@ object ProducerSettings {
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
system: ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
system: ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
): ProducerSettings[K, V] =
apply(system, Option(keySerializer), Option(valueSerializer))

@@ -79,22 +84,21 @@ object ProducerSettings {
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
): ProducerSettings[K, V] = {
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
): ProducerSettings[K, V] =
apply(config, Option(keySerializer), Option(valueSerializer))
}

/**
* Java API: Create settings from the default configuration
* `akka.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
system: ActorSystem,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]
system: ActorSystem,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]
): ProducerSettings[K, V] =
apply(system, keySerializer.asScala, valueSerializer.asScala)

@@ -104,9 +108,9 @@ object ProducerSettings {
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
config: Config,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]
config: Config,
keySerializer: Optional[Serializer[K]],
valueSerializer: Optional[Serializer[V]]
): ProducerSettings[K, V] =
apply(config, keySerializer.asScala, valueSerializer.asScala)

@@ -116,9 +120,9 @@ object ProducerSettings {
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
system: ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
system: ActorSystem,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
): ProducerSettings[K, V] =
apply(system, keySerializer, valueSerializer)

@@ -128,9 +132,9 @@ object ProducerSettings {
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
config: Config,
keySerializer: Serializer[K],
valueSerializer: Serializer[V]
): ProducerSettings[K, V] =
apply(config, keySerializer, valueSerializer)

@@ -196,16 +200,21 @@ class ProducerSettings[K, V](
copy(eosCommitInterval = eosCommitInterval)

private def copy(
properties: Map[String, String] = properties,
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval
properties: Map[String, String] = properties,
keySerializer: Option[Serializer[K]] = keySerializerOpt,
valueSerializer: Option[Serializer[V]] = valueSerializerOpt,
closeTimeout: FiniteDuration = closeTimeout,
parallelism: Int = parallelism,
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties, keySerializer, valueSerializer, closeTimeout, parallelism, dispatcher,
eosCommitInterval)
new ProducerSettings[K, V](properties,
keySerializer,
valueSerializer,
closeTimeout,
parallelism,
dispatcher,
eosCommitInterval)

/**
* Create a `KafkaProducer` instance from the settings.
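For orientation, a minimal sketch of how the ProducerSettings factories above are typically used; the actor system name, broker address and parallelism value are illustrative placeholders, not part of this commit.

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

object ProducerSettingsExample extends App {
  implicit val system: ActorSystem = ActorSystem("producer-settings-example") // placeholder name

  // Defaults come from `akka.kafka.producer`; serializers are passed explicitly,
  // matching the apply(system, keySerializer, valueSerializer) overload shown above.
  val settings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092") // placeholder broker address
      .withParallelism(10)

  // Creates the underlying KafkaProducer from these settings.
  val kafkaProducer = settings.createKafkaProducer()
}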
48 changes: 35 additions & 13 deletions core/src/main/scala/akka/kafka/Subscriptions.scala
@@ -12,8 +12,10 @@ import scala.annotation.varargs
import scala.collection.JavaConverters._

sealed trait Subscription {

/** ActorRef which is to receive [[akka.kafka.ConsumerRebalanceEvent]] signals when rebalancing happens */
def rebalanceListener: Option[ActorRef]

/** Configure this actor ref to receive [[akka.kafka.ConsumerRebalanceEvent]] signals */
def withRebalanceListener(ref: ActorRef): Subscription
}
@@ -25,37 +27,51 @@ sealed trait AutoSubscription extends Subscription {
}

sealed trait ConsumerRebalanceEvent
final case class TopicPartitionsAssigned(sub: Subscription, topicPartitions: Set[TopicPartition]) extends ConsumerRebalanceEvent
final case class TopicPartitionsRevoked(sub: Subscription, topicPartitions: Set[TopicPartition]) extends ConsumerRebalanceEvent
final case class TopicPartitionsAssigned(sub: Subscription, topicPartitions: Set[TopicPartition])
extends ConsumerRebalanceEvent
final case class TopicPartitionsRevoked(sub: Subscription, topicPartitions: Set[TopicPartition])
extends ConsumerRebalanceEvent

object Subscriptions {

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class TopicSubscription(tps: Set[String], rebalanceListener: Option[ActorRef]) extends AutoSubscription {
private[kafka] final case class TopicSubscription(tps: Set[String], rebalanceListener: Option[ActorRef])
extends AutoSubscription {
def withRebalanceListener(ref: ActorRef): TopicSubscription =
TopicSubscription(tps, Some(ref))
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class TopicSubscriptionPattern(pattern: String, rebalanceListener: Option[ActorRef]) extends AutoSubscription {
private[kafka] final case class TopicSubscriptionPattern(pattern: String, rebalanceListener: Option[ActorRef])
extends AutoSubscription {
def withRebalanceListener(ref: ActorRef): TopicSubscriptionPattern =
TopicSubscriptionPattern(pattern, Some(ref))
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class Assignment(tps: Set[TopicPartition], rebalanceListener: Option[ActorRef]) extends ManualSubscription {
private[kafka] final case class Assignment(tps: Set[TopicPartition], rebalanceListener: Option[ActorRef])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): Assignment =
Assignment(tps, Some(ref))
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class AssignmentWithOffset(tps: Map[TopicPartition, Long], rebalanceListener: Option[ActorRef]) extends ManualSubscription {
private[kafka] final case class AssignmentWithOffset(tps: Map[TopicPartition, Long],
rebalanceListener: Option[ActorRef])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): AssignmentWithOffset =
AssignmentWithOffset(tps, Some(ref))
}

/** INTERNAL API */
@akka.annotation.InternalApi
private[kafka] final case class AssignmentOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long], rebalanceListener: Option[ActorRef]) extends ManualSubscription {
private[kafka] final case class AssignmentOffsetsForTimes(timestampsToSearch: Map[TopicPartition, Long],
rebalanceListener: Option[ActorRef])
extends ManualSubscription {
def withRebalanceListener(ref: ActorRef): AssignmentOffsetsForTimes =
AssignmentOffsetsForTimes(timestampsToSearch, Some(ref))
}
@@ -113,32 +129,38 @@ object Subscriptions {
* Manually assign given topics and partitions with offsets
* JAVA API
*/
def assignmentWithOffset(tps: java.util.Map[TopicPartition, java.lang.Long]): ManualSubscription = assignmentWithOffset(tps.asScala.toMap.asInstanceOf[Map[TopicPartition, Long]])
def assignmentWithOffset(tps: java.util.Map[TopicPartition, java.lang.Long]): ManualSubscription =
assignmentWithOffset(tps.asScala.toMap.asInstanceOf[Map[TopicPartition, Long]])

/**
* Manually assign given topics and partitions with offsets
*/
def assignmentWithOffset(tp: TopicPartition, offset: Long): ManualSubscription = assignmentWithOffset(Map(tp -> offset))
def assignmentWithOffset(tp: TopicPartition, offset: Long): ManualSubscription =
assignmentWithOffset(Map(tp -> offset))

/**
* Manually assign given topics and partitions with timestamps
*/
def assignmentOffsetsForTimes(tps: Map[TopicPartition, Long]): ManualSubscription = AssignmentOffsetsForTimes(tps, None)
def assignmentOffsetsForTimes(tps: Map[TopicPartition, Long]): ManualSubscription =
AssignmentOffsetsForTimes(tps, None)

/**
* Manually assign given topics and partitions with timestamps
*/
def assignmentOffsetsForTimes(tps: (TopicPartition, Long)*): ManualSubscription = AssignmentOffsetsForTimes(tps.toMap, None)
def assignmentOffsetsForTimes(tps: (TopicPartition, Long)*): ManualSubscription =
AssignmentOffsetsForTimes(tps.toMap, None)

/**
* Manually assign given topics and partitions with timestamps
* JAVA API
*/
def assignmentOffsetsForTimes(tps: java.util.Map[TopicPartition, java.lang.Long]): ManualSubscription = assignmentOffsetsForTimes(tps.asScala.toMap.asInstanceOf[Map[TopicPartition, Long]])
def assignmentOffsetsForTimes(tps: java.util.Map[TopicPartition, java.lang.Long]): ManualSubscription =
assignmentOffsetsForTimes(tps.asScala.toMap.asInstanceOf[Map[TopicPartition, Long]])

/**
* Manually assign given topics and partitions with timestamps
*/
def assignmentOffsetsForTimes(tp: TopicPartition, timestamp: Long): ManualSubscription = assignmentOffsetsForTimes(Map(tp -> timestamp))
def assignmentOffsetsForTimes(tp: TopicPartition, timestamp: Long): ManualSubscription =
assignmentOffsetsForTimes(Map(tp -> timestamp))

}
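As a quick illustration of the subscription factories above, a sketch with made-up topic, partition and offset values (the listener actor exists only for demonstration):

import akka.actor.{Actor, ActorSystem, Props}
import akka.kafka.{Subscriptions, TopicPartitionsAssigned, TopicPartitionsRevoked}
import org.apache.kafka.common.TopicPartition

// Receives the ConsumerRebalanceEvent signals defined above and just logs them.
class LoggingRebalanceListener extends Actor {
  override def receive: Receive = {
    case TopicPartitionsAssigned(_, partitions) => println(s"Assigned: $partitions")
    case TopicPartitionsRevoked(_, partitions)  => println(s"Revoked: $partitions")
  }
}

object SubscriptionsExample extends App {
  implicit val system: ActorSystem = ActorSystem("subscriptions-example") // placeholder name
  val listener = system.actorOf(Props[LoggingRebalanceListener], "rebalance-listener")

  // Group-managed assignment with rebalance signals sent to the listener.
  val auto = Subscriptions.topics("events").withRebalanceListener(listener)

  // Manual assignment starting at an explicit offset, via assignmentWithOffset above.
  val manual = Subscriptions.assignmentWithOffset(new TopicPartition("events", 0), 42L)
}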
7 changes: 4 additions & 3 deletions core/src/main/scala/akka/kafka/internal/ConfigSettings.scala
@@ -19,17 +19,18 @@ private[kafka] object ConfigSettings {

def parseKafkaClientsProperties(config: Config): Map[String, String] = {
@tailrec
def collectKeys(c: ConfigObject, processedKeys: Set[String], unprocessedKeys: List[String]): Set[String] = {
def collectKeys(c: ConfigObject, processedKeys: Set[String], unprocessedKeys: List[String]): Set[String] =
if (unprocessedKeys.isEmpty) processedKeys
else {
c.toConfig.getAnyRef(unprocessedKeys.head) match {
case o: util.Map[_, _] =>
collectKeys(c, processedKeys, unprocessedKeys.tail ::: o.keySet().asScala.toList.map(unprocessedKeys.head + "." + _))
collectKeys(c,
processedKeys,
unprocessedKeys.tail ::: o.keySet().asScala.toList.map(unprocessedKeys.head + "." + _))
case _ =>
collectKeys(c, processedKeys + unprocessedKeys.head, unprocessedKeys.tail)
}
}
}

val keys = collectKeys(config.root, Set.empty[String], config.root().keySet().asScala.toList)
keys.map(key => key -> config.getString(key)).toMap
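To make the flattening in collectKeys concrete, a small sketch with an illustrative config snippet (placed in the akka.kafka.internal package because ConfigSettings is private[kafka]):

package akka.kafka.internal

import com.typesafe.config.ConfigFactory

object ConfigSettingsExample extends App {
  // HOCON turns dotted keys into nested objects; parseKafkaClientsProperties
  // walks them recursively and flattens them back into dotted Kafka property names.
  val clientConfig = ConfigFactory.parseString(
    """
      |bootstrap.servers = "localhost:9092"
      |max.poll {
      |  records = 500
      |}
    """.stripMargin
  )

  // Prints, in some order: Map(bootstrap.servers -> localhost:9092, max.poll.records -> 500)
  println(ConfigSettings.parseKafkaClientsProperties(clientConfig))
}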
101 changes: 58 additions & 43 deletions core/src/main/scala/akka/kafka/internal/ConsumerStage.scala
@@ -34,17 +34,21 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
* INTERNAL API
*/
private[kafka] object ConsumerStage {
def plainSubSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription, getOffsetsOnAssign: Option[Set[TopicPartition] => Future[Map[TopicPartition, Long]]] = None, onRevoke: Set[TopicPartition] => Unit = _ => ()) = {
def plainSubSource[K, V](settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: Option[Set[TopicPartition] => Future[Map[TopicPartition, Long]]] = None,
onRevoke: Set[TopicPartition] => Unit = _ => ()) =
new KafkaSourceStage[K, V, (TopicPartition, Source[ConsumerRecord[K, V], NotUsed])] {
override protected def logic(shape: SourceShape[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed])]) =
new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke) with PlainMessageBuilder[K, V] with MetricsControl
new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke)
with PlainMessageBuilder[K, V] with MetricsControl
}
}

def committableSubSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription) = {
def committableSubSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription) =
new KafkaSourceStage[K, V, (TopicPartition, Source[CommittableMessage[K, V], NotUsed])] {
override protected def logic(shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]) =
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription) with CommittableMessageBuilder[K, V] with MetricsControl {
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription)
with CommittableMessageBuilder[K, V] with MetricsControl {

override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: Committer = {
@@ -53,50 +57,53 @@ private[kafka] object ConsumerStage {
}
}
}
}

def plainSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription) = {
def plainSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription) =
new KafkaSourceStage[K, V, ConsumerRecord[K, V]] {
override protected def logic(shape: SourceShape[ConsumerRecord[K, V]]) =
new SingleSourceLogic[K, V, ConsumerRecord[K, V]](shape, settings, subscription) with PlainMessageBuilder[K, V]
}
}

def externalPlainSource[K, V](consumer: ActorRef, subscription: ManualSubscription) = {
def externalPlainSource[K, V](consumer: ActorRef, subscription: ManualSubscription) =
new KafkaSourceStage[K, V, ConsumerRecord[K, V]] {
override protected def logic(shape: SourceShape[ConsumerRecord[K, V]]) =
new ExternalSingleSourceLogic[K, V, ConsumerRecord[K, V]](shape, consumer, subscription) with PlainMessageBuilder[K, V] with MetricsControl
new ExternalSingleSourceLogic[K, V, ConsumerRecord[K, V]](shape, consumer, subscription)
with PlainMessageBuilder[K, V] with MetricsControl
}
}

def committableSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription) = {
def committableSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription) =
new KafkaSourceStage[K, V, CommittableMessage[K, V]] {
override protected def logic(shape: SourceShape[CommittableMessage[K, V]]) =
new SingleSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription) with CommittableMessageBuilder[K, V] {
new SingleSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription)
with CommittableMessageBuilder[K, V] {
override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: Committer = {
val ec = materializer.executionContext
KafkaAsyncConsumerCommitterRef(consumer, settings.commitTimeout)(ec)
}
}
}
}

def externalCommittableSource[K, V](consumer: ActorRef, _groupId: String, commitTimeout: FiniteDuration, subscription: ManualSubscription) = {
def externalCommittableSource[K, V](consumer: ActorRef,
_groupId: String,
commitTimeout: FiniteDuration,
subscription: ManualSubscription) =
new KafkaSourceStage[K, V, CommittableMessage[K, V]] {
override protected def logic(shape: SourceShape[CommittableMessage[K, V]]) =
new ExternalSingleSourceLogic[K, V, CommittableMessage[K, V]](shape, consumer, subscription) with CommittableMessageBuilder[K, V] with MetricsControl {
new ExternalSingleSourceLogic[K, V, CommittableMessage[K, V]](shape, consumer, subscription)
with CommittableMessageBuilder[K, V] with MetricsControl {
override def groupId: String = _groupId
lazy val committer: Committer = {
val ec = materializer.executionContext
KafkaAsyncConsumerCommitterRef(consumer, commitTimeout)(ec)
}
}
}
}

def transactionalSource[K, V](consumerSettings: ConsumerSettings[K, V], subscription: Subscription) = {
require(consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG).nonEmpty, "You must define a Consumer group.id.")
require(consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG).nonEmpty,
"You must define a Consumer group.id.")

/**
* We set the isolation.level config to read_committed to make sure that any consumed messages are from
* committed transactions. Note that the consuming partitions may be produced by multiple producers, and these
@@ -105,18 +112,22 @@ private[kafka] object ConsumerStage {
* will still consume non-transactional messages.
*/
val txConsumerSettings = consumerSettings.withProperty(
ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH))
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH)
)

new KafkaSourceStage[K, V, TransactionalMessage[K, V]] {
override protected def logic(shape: SourceShape[TransactionalMessage[K, V]]) =
new SingleSourceLogic[K, V, TransactionalMessage[K, V]](shape, txConsumerSettings, subscription) with TransactionalMessageBuilder[K, V] {
new SingleSourceLogic[K, V, TransactionalMessage[K, V]](shape, txConsumerSettings, subscription)
with TransactionalMessageBuilder[K, V] {
override def groupId: String = txConsumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
}
}
}

// This should be case class to be comparable based on ref and timeout. This comparison is used in CommittableOffsetBatchImpl
case class KafkaAsyncConsumerCommitterRef(ref: ActorRef, commitTimeout: FiniteDuration)(implicit ec: ExecutionContext) extends Committer {
case class KafkaAsyncConsumerCommitterRef(ref: ActorRef, commitTimeout: FiniteDuration)(implicit ec: ExecutionContext)
extends Committer {
import akka.pattern.ask

import scala.collection.breakOut
@@ -127,7 +138,8 @@ private[kafka] object ConsumerStage {
}(breakOut)

(ref ? Commit(offsetsMap)).mapTo[Committed].map(_ => Done).recoverWith {
case _: AskTimeoutException => Future.failed(new CommitTimeoutException(s"Kafka commit took longer than: $commitTimeout"))
case _: AskTimeoutException =>
Future.failed(new CommitTimeoutException(s"Kafka commit took longer than: $commitTimeout"))
case other => Future.failed(other)
}
}
@@ -147,15 +159,15 @@ private[kafka] object ConsumerStage {
}
Future.sequence(futures).map(_ => Done)

case _ => throw new IllegalArgumentException(
s"Unknown CommittableOffsetBatch, got [${batch.getClass.getName}], " +
case _ =>
throw new IllegalArgumentException(
s"Unknown CommittableOffsetBatch, got [${batch.getClass.getName}], " +
s"expected [${classOf[CommittableOffsetBatchImpl].getName}]"
)
)
}
}

abstract class KafkaSourceStage[K, V, Msg]()
extends GraphStageWithMaterializedValue[SourceShape[Msg], Control] {
abstract class KafkaSourceStage[K, V, Msg]() extends GraphStageWithMaterializedValue[SourceShape[Msg], Control] {
protected val out = Outlet[Msg]("out")
val shape = new SourceShape(out)
protected def logic(shape: SourceShape[Msg]): GraphStageLogic with Control
@@ -202,8 +214,9 @@ private[kafka] object ConsumerStage {
}
}

final case class CommittableOffsetImpl(override val partitionOffset: ConsumerMessage.PartitionOffset)(val committer: Committer)
extends CommittableOffset {
final case class CommittableOffsetImpl(override val partitionOffset: ConsumerMessage.PartitionOffset)(
val committer: Committer
) extends CommittableOffset {
override def commitScaladsl(): Future[Done] = committer.commit(immutable.Seq(partitionOffset))
override def commitJavadsl(): CompletionStage[Done] = commitScaladsl().toJava
}
@@ -214,8 +227,9 @@ private[kafka] object ConsumerStage {
def commit(batch: CommittableOffsetBatch): Future[Done]
}

final class CommittableOffsetBatchImpl(val offsets: Map[GroupTopicPartition, Long], val stages: Map[String, Committer])
extends CommittableOffsetBatch {
final class CommittableOffsetBatchImpl(val offsets: Map[GroupTopicPartition, Long],
val stages: Map[String, Committer])
extends CommittableOffsetBatch {

override def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = {
val partitionOffset = committableOffset.partitionOffset
@@ -225,16 +239,18 @@ private[kafka] object ConsumerStage {

val stage = committableOffset match {
case c: CommittableOffsetImpl => c.committer
case _ => throw new IllegalArgumentException(
s"Unknown CommittableOffset, got [${committableOffset.getClass.getName}], " +
case _ =>
throw new IllegalArgumentException(
s"Unknown CommittableOffset, got [${committableOffset.getClass.getName}], " +
s"expected [${classOf[CommittableOffsetImpl].getName}]"
)
)
}

val newStages = stages.get(key.groupId) match {
case Some(s) =>
require(s == stage, s"CommittableOffset [$committableOffset] origin stage must be same as other " +
s"stage with same groupId. Expected [$s], got [$stage]")
require(s == stage,
s"CommittableOffset [$committableOffset] origin stage must be same as other " +
s"stage with same groupId. Expected [$s], got [$stage]")
stages
case None =>
stages.updated(key.groupId, stage)
@@ -243,20 +259,18 @@ private[kafka] object ConsumerStage {
new CommittableOffsetBatchImpl(newOffsets, newStages)
}

override def getOffsets(): JMap[GroupTopicPartition, Long] = {
override def getOffsets(): JMap[GroupTopicPartition, Long] =
offsets.asJava
}

override def toString(): String =
s"CommittableOffsetBatch(${offsets.mkString("->")})"

override def commitScaladsl(): Future[Done] = {
override def commitScaladsl(): Future[Done] =
if (offsets.isEmpty)
Future.successful(Done)
else {
stages.head._2.commit(this)
}
}

override def commitJavadsl(): CompletionStage[Done] = commitScaladsl().toJava

@@ -305,9 +319,8 @@ private[kafka] trait PromiseControl extends GraphStageLogic with Control {
case ControlShutdown => performShutdown()
})

def onStop() = {
def onStop() =
stopPromise.trySuccess(Done)
}

def onShutdown() = {
stopPromise.trySuccess(Done)
@@ -334,7 +347,9 @@ private[kafka] trait MetricsControl extends Control {
import akka.pattern.ask

import scala.concurrent.duration._
consumer.?(RequestMetrics)(Timeout(1.minute)).mapTo[ConsumerMetrics]
consumer
.?(RequestMetrics)(Timeout(1.minute))
.mapTo[ConsumerMetrics]
.map(_.metrics)(ExecutionContexts.sameThreadExecutionContext)
}
}
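The stages and committer plumbing above back the public consumer sources; as a rough sketch of the at-least-once pattern built on committableSource (broker address, group id and topic are placeholders):

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object CommittableConsumeExample extends App {
  implicit val system: ActorSystem = ActorSystem("committable-example") // placeholder name
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")   // placeholder broker address
      .withGroupId("committable-example-group") // a group.id is required for committing

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("events"))
    .mapAsync(parallelism = 1) { msg =>
      // "Process" the record, then commit its offset through the Committer shown above.
      println(msg.record.value)
      msg.committableOffset.commitScaladsl()
    }
    .runWith(Sink.ignore)
}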
core/src/main/scala/akka/kafka/internal/ExternalSingleSourceLogic.scala
@@ -20,7 +20,9 @@ private[kafka] abstract class ExternalSingleSourceLogic[K, V, Msg](
val shape: SourceShape[Msg],
val consumer: ActorRef,
subscription: ManualSubscription
) extends GraphStageLogic(shape) with PromiseControl with MessageBuilder[K, V, Msg] {
) extends GraphStageLogic(shape)
with PromiseControl
with MessageBuilder[K, V, Msg] {
var tps = Set.empty[TopicPartition]
var buffer: Iterator[ConsumerRecord[K, V]] = Iterator.empty
var requested = false
@@ -38,8 +40,7 @@ private[kafka] abstract class ExternalSingleSourceLogic[K, V, Msg](
// do not use simple ++ because of https://issues.scala-lang.org/browse/SI-9766
if (buffer.hasNext) {
buffer = buffer ++ msg.messages
}
else {
} else {
buffer = msg.messages
}
pump()
@@ -71,18 +72,16 @@ private[kafka] abstract class ExternalSingleSourceLogic[K, V, Msg](
}

@tailrec
private def pump(): Unit = {
private def pump(): Unit =
if (isAvailable(shape.out)) {
if (buffer.hasNext) {
val msg = buffer.next()
push(shape.out, createMessage(msg))
pump()
}
else if (!requested && tps.nonEmpty) {
} else if (!requested && tps.nonEmpty) {
requestMessages()
}
}
}

private def requestMessages(): Unit = {
requested = true
@@ -91,14 +90,12 @@ private[kafka] abstract class ExternalSingleSourceLogic[K, V, Msg](
}

setHandler(shape.out, new OutHandler {
override def onPull(): Unit = {
override def onPull(): Unit =
pump()
}
})

override def performShutdown() = {
override def performShutdown() =
completeStage()
}

override def postStop(): Unit = {
onShutdown()
192 changes: 118 additions & 74 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala

Large diffs are not rendered by default.

120 changes: 70 additions & 50 deletions core/src/main/scala/akka/kafka/internal/ProducerStage.scala
@@ -34,8 +34,8 @@ private[kafka] object ProducerStage {
val closeTimeout: FiniteDuration,
val closeProducerOnStop: Boolean,
val producerProvider: () => Producer[K, V]
)
extends GraphStage[FlowShape[IN, Future[OUT]]] with ProducerStage[K, V, P, IN, OUT] {
) extends GraphStage[FlowShape[IN, Future[OUT]]]
with ProducerStage[K, V, P, IN, OUT] {

override def createLogic(inheritedAttributes: Attributes) =
new DefaultProducerStageLogic(this, producerProvider(), inheritedAttributes)
@@ -46,8 +46,8 @@ private[kafka] object ProducerStage {
val closeProducerOnStop: Boolean,
val producerProvider: () => Producer[K, V],
commitInterval: FiniteDuration
)
extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]] with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {
) extends GraphStage[FlowShape[Envelope[K, V, P], Future[Results[K, V, P]]]]
with ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]] {

override def createLogic(inheritedAttributes: Attributes) =
new TransactionProducerStageLogic(this, producerProvider(), inheritedAttributes, commitInterval)
@@ -66,26 +66,31 @@ private[kafka] object ProducerStage {
/**
* Default Producer State Logic
*/
class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](stage: ProducerStage[K, V, P, IN, OUT], producer: Producer[K, V],
inheritedAttributes: Attributes)
extends TimerGraphStageLogic(stage.shape) with StageLogging with MessageCallback[K, V, P] with ProducerCompletionState {

lazy val decider: Decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <: Results[K, V, P]](
stage: ProducerStage[K, V, P, IN, OUT],
producer: Producer[K, V],
inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {

lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
val awaitingConfirmation = new AtomicInteger(0)
@volatile var inIsClosed = false
var completionState: Option[Try[Unit]] = None

override protected def logSource: Class[_] = classOf[DefaultProducerStage[_, _, _, _, _]]

def checkForCompletion(): Unit = {
def checkForCompletion(): Unit =
if (isClosed(stage.in) && awaitingConfirmation.get == 0) {
completionState match {
case Some(Success(_)) => onCompletionSuccess()
case Some(Failure(ex)) => onCompletionFailure(ex)
case None => failStage(new IllegalStateException("Stage completed, but there is no info about status"))
}
}
}

override def onCompletionSuccess(): Unit = completeStage()

@@ -99,29 +104,33 @@ private[kafka] object ProducerStage {
failStage(ex)
}

override val onMessageAckCb: AsyncCallback[Envelope[K, V, P]] = getAsyncCallback[Envelope[K, V, P]] { _ => }
override val onMessageAckCb: AsyncCallback[Envelope[K, V, P]] = getAsyncCallback[Envelope[K, V, P]] { _ =>
}

setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
})

setHandler(stage.in, new InHandler {
override def onPush(): Unit = produce(grab(stage.in))
setHandler(
stage.in,
new InHandler {
override def onPush(): Unit = produce(grab(stage.in))

override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(()))
checkForCompletion()
}
override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(()))
checkForCompletion()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}
}
})
)

def produce(in: Envelope[K, V, P]): Unit = {
def produce(in: Envelope[K, V, P]): Unit =
in match {
case msg: Message[K, V, P] =>
val r = Promise[Result[K, V, P]]
@@ -156,20 +165,20 @@ private[kafka] object ProducerStage {
push(stage.out, future)

}
}

private def sendCallback(promise: Promise[_], onSuccess: RecordMetadata => Unit): Callback = new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception == null) onSuccess(metadata)
else decider(exception) match {
case Supervision.Stop =>
if (stage.closeProducerOnStop) {
producer.close(0, TimeUnit.MILLISECONDS)
}
failStageCb.invoke(exception)
case _ =>
promise.failure(exception)
}
else
decider(exception) match {
case Supervision.Stop =>
if (stage.closeProducerOnStop) {
producer.close(0, TimeUnit.MILLISECONDS)
}
failStageCb.invoke(exception)
case _ =>
promise.failure(exception)
}
if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed)
checkForCompletionCB.invoke(())
}
@@ -184,8 +193,7 @@ private[kafka] object ProducerStage {
producer.flush()
producer.close(stage.closeTimeout.toMillis, TimeUnit.MILLISECONDS)
log.debug("Producer closed")
}
catch {
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
}
@@ -197,9 +205,16 @@ private[kafka] object ProducerStage {
/**
* Transaction (Exactly-Once) Producer State Logic
*/
class TransactionProducerStageLogic[K, V, P](stage: ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]], producer: Producer[K, V],
inheritedAttributes: Attributes, commitInterval: FiniteDuration)
extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage, producer, inheritedAttributes) with StageLogging with MessageCallback[K, V, P] with ProducerCompletionState {
class TransactionProducerStageLogic[K, V, P](stage: ProducerStage[K, V, P, Envelope[K, V, P], Results[K, V, P]],
producer: Producer[K, V],
inheritedAttributes: Attributes,
commitInterval: FiniteDuration)
extends DefaultProducerStageLogic[K, V, P, Envelope[K, V, P], Results[K, V, P]](stage,
producer,
inheritedAttributes)
with StageLogging
with MessageCallback[K, V, P]
with ProducerCompletionState {
private val commitSchedulerKey = "commit"
private val messageDrainInterval = 10.milliseconds

@@ -222,10 +237,14 @@ private[kafka] object ProducerStage {
}
}

private def suspendDemand(): Unit = setHandler(stage.out, new OutHandler {
// suspend demand while a commit is in process so we can drain any outstanding message acknowledgements
override def onPull(): Unit = ()
})
private def suspendDemand(): Unit =
setHandler(
stage.out,
new OutHandler {
// suspend demand while a commit is in process so we can drain any outstanding message acknowledgements
override def onPull(): Unit = ()
}
)

override protected def onTimer(timerKey: Any): Unit =
if (timerKey == commitSchedulerKey) {
@@ -313,13 +332,13 @@ private[kafka] object ProducerStage {
}

final class EmptyTransactionBatch extends TransactionBatch {
override def updated(partitionOffset: PartitionOffset): TransactionBatch = new NonemptyTransactionBatch(partitionOffset)
override def updated(partitionOffset: PartitionOffset): TransactionBatch =
new NonemptyTransactionBatch(partitionOffset)
}

final class NonemptyTransactionBatch(
head: PartitionOffset,
tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
extends TransactionBatch {
final class NonemptyTransactionBatch(head: PartitionOffset,
tail: Map[GroupTopicPartition, Long] = Map[GroupTopicPartition, Long]())
extends TransactionBatch {
private val offsets = tail + (head.key -> head.offset)

def group: String = head.key.groupId
@@ -330,7 +349,8 @@ private[kafka] object ProducerStage {
override def updated(partitionOffset: PartitionOffset): TransactionBatch = {
require(
group == partitionOffset.key.groupId,
s"Transaction batch must contain messages from exactly 1 consumer group. $group != ${partitionOffset.key.groupId}")
s"Transaction batch must contain messages from exactly 1 consumer group. $group != ${partitionOffset.key.groupId}"
)
new NonemptyTransactionBatch(partitionOffset, offsets)
}
}
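The producer stage logic above sits behind Producer.plainSink and the producer flows; a minimal end-to-end sketch (topic name and broker address are placeholders):

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object PlainSinkExample extends App {
  implicit val system: ActorSystem = ActorSystem("producer-example") // placeholder name
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092") // placeholder broker address

  // Each record passes through the producer stage; the materialized Future[Done]
  // completes once the stream finishes and all sends have been confirmed.
  val done = Source(1 to 10)
    .map(n => new ProducerRecord[String, String]("events", n.toString))
    .runWith(Producer.plainSink(producerSettings))
}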
25 changes: 13 additions & 12 deletions core/src/main/scala/akka/kafka/internal/SingleSourceLogic.scala
@@ -25,7 +25,10 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
val shape: SourceShape[Msg],
settings: ConsumerSettings[K, V],
subscription: Subscription
) extends GraphStageLogic(shape) with PromiseControl with MessageBuilder[K, V, Msg] with StageLogging {
) extends GraphStageLogic(shape)
with PromiseControl
with MessageBuilder[K, V, Msg]
with StageLogging {

override protected def logSource: Class[_] = classOf[SingleSourceLogic[K, V, Msg]]

@@ -55,8 +58,7 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
// do not use simple ++ because of https://issues.scala-lang.org/browse/SI-9766
if (buffer.hasNext) {
buffer = buffer ++ msg.messages
}
else {
} else {
buffer = msg.messages
}
pump()
@@ -109,18 +111,16 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
subscription.rebalanceListener.foreach(ref => ref ! TopicPartitionsRevoked(subscription, set))

@tailrec
private def pump(): Unit = {
private def pump(): Unit =
if (isAvailable(shape.out)) {
if (buffer.hasNext) {
val msg = buffer.next()
push(shape.out, createMessage(msg))
pump()
}
else if (!requested && tps.nonEmpty) {
} else if (!requested && tps.nonEmpty) {
requestMessages()
}
}
}

private def requestMessages(): Unit = {
requested = true
@@ -130,13 +130,11 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
}

setHandler(shape.out, new OutHandler {
override def onPull(): Unit = {
override def onPull(): Unit =
pump()
}

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(): Unit =
performShutdown()
}
})

override def postStop(): Unit = {
@@ -161,7 +159,10 @@ private[kafka] abstract class SingleSourceLogic[K, V, Msg](
override def metrics: Future[Map[MetricName, Metric]] = {
import akka.pattern.ask
import scala.concurrent.duration._
consumer.?(RequestMetrics)(Timeout(1.minute)).mapTo[ConsumerMetrics].map(_.metrics)(ExecutionContexts.sameThreadExecutionContext)
consumer
.?(RequestMetrics)(Timeout(1.minute))
.mapTo[ConsumerMetrics]
.map(_.metrics)(ExecutionContexts.sameThreadExecutionContext)
}

}
51 changes: 26 additions & 25 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -12,7 +12,7 @@ import akka.actor.{ActorRef, Cancellable, ExtendedActorSystem, Terminated}
import akka.kafka.Subscriptions.{TopicSubscription, TopicSubscriptionPattern}
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.{AutoSubscription, ConsumerFailed, ConsumerSettings}
import akka.pattern.{AskTimeoutException, ask}
import akka.pattern.{ask, AskTimeoutException}
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStageLogic.StageActor
import akka.stream.stage._
@@ -32,7 +32,11 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
subscription: AutoSubscription,
getOffsetsOnAssign: Option[Set[TopicPartition] => Future[Map[TopicPartition, Long]]] = None,
onRevoke: Set[TopicPartition] => Unit = _ => ()
) extends GraphStageLogic(shape) with PromiseControl with MetricsControl with MessageBuilder[K, V, Msg] with StageLogging {
) extends GraphStageLogic(shape)
with PromiseControl
with MetricsControl
with MessageBuilder[K, V, Msg]
with StageLogging {
var consumer: ActorRef = _
var self: StageActor = _
// Kafka has notified us that we have these partitions assigned, but we have not created a source for them yet.
@@ -92,12 +96,19 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
getOffsetsOnAssign.fold(pumpCB.invoke(partitions)) { getOffsets =>
getOffsets(partitions)
.onComplete {
case Failure(ex) => stageFailCB.invoke(new ConsumerFailed(s"Failed to fetch offset for partitions: ${partitions.mkString(", ")}.", ex))
case Failure(ex) =>
stageFailCB.invoke(
new ConsumerFailed(s"Failed to fetch offset for partitions: ${partitions.mkString(", ")}.", ex)
)
case Success(offsets) =>
consumer.ask(KafkaConsumerActor.Internal.Seek(offsets))
consumer
.ask(KafkaConsumerActor.Internal.Seek(offsets))
.map(_ => pumpCB.invoke(partitions))
.recover {
case _: AskTimeoutException => stageFailCB.invoke(new ConsumerFailed(s"Consumer failed during seek for partitions: ${partitions.mkString(", ")}."))
case _: AskTimeoutException =>
stageFailCB.invoke(
new ConsumerFailed(s"Consumer failed during seek for partitions: ${partitions.mkString(", ")}.")
)
}
}
}
@@ -141,28 +152,24 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
// Partition was revoked while
// starting up. Kill!
control.shutdown()
}
else {
} else {
subSources += (tp -> control)
partitionsInStartup -= tp
}
}

setHandler(shape.out, new OutHandler {
override def onPull(): Unit = {
override def onPull(): Unit =
pump()
}
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(): Unit =
performShutdown()
}
})

def createSource(tp: TopicPartition): Source[Msg, NotUsed] = {
def createSource(tp: TopicPartition): Source[Msg, NotUsed] =
Source.fromGraph(new SubSourceStage(tp, consumer))
}

@tailrec
private def pump(): Unit = {
private def pump(): Unit =
if (pendingPartitions.nonEmpty && isAvailable(shape.out)) {
val tp = pendingPartitions.head

@@ -171,7 +178,6 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
push(shape.out, (tp, createSource(tp)))
pump()
}
}

override def postStop(): Unit = {
consumer ! KafkaConsumerActor.Internal.Stop
@@ -210,7 +216,7 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
val out = Outlet[Msg]("out")
val shape = new SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with PromiseControl with MetricsControl {
def consumer = consumerRef
val shape = stage.shape
@@ -228,8 +234,7 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
// do not use simple ++ because of https://issues.scala-lang.org/browse/SI-9766
if (buffer.hasNext) {
buffer = buffer ++ msg.messages
}
else {
} else {
buffer = msg.messages
}
pump()
@@ -245,9 +250,8 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
}

setHandler(out, new OutHandler {
override def onPull(): Unit = {
override def onPull(): Unit =
pump()
}

override def onDownstreamFinish(): Unit = {
subsourceCancelledCB.invoke(tp)
@@ -261,20 +265,17 @@ private[kafka] abstract class SubSourceLogic[K, V, Msg](
}

@tailrec
private def pump(): Unit = {
private def pump(): Unit =
if (isAvailable(out)) {
if (buffer.hasNext) {
val msg = buffer.next()
push(out, createMessage(msg))
pump()
}
else if (!requested) {
} else if (!requested) {
requested = true
consumer.tell(requestMessages, self.ref)
}
}
}
}
}
}
}
81 changes: 57 additions & 24 deletions core/src/main/scala/akka/kafka/javadsl/Consumer.scala
@@ -73,7 +73,8 @@ object Consumer {
* one, so that the stream can be stopped in a controlled way without losing
* commits.
*/
final class DrainingControl[T] private[javadsl] (control: Control, streamCompletion: CompletionStage[T]) extends Control {
final class DrainingControl[T] private[javadsl] (control: Control, streamCompletion: CompletionStage[T])
extends Control {

override def stop(): CompletionStage[Done] = control.stop()

@@ -100,7 +101,8 @@ object Consumer {
* one, so that the stream can be stopped in a controlled way without losing
* commits.
*/
def createDrainingControl[T](pair: Pair[Control, CompletionStage[T]]) = new DrainingControl[T](pair.first, pair.second)
def createDrainingControl[T](pair: Pair[Control, CompletionStage[T]]) =
new DrainingControl[T](pair.first, pair.second)

/**
* The `plainSource` emits `ConsumerRecord` elements (as received from the underlying `KafkaConsumer`).
@@ -113,8 +115,10 @@ object Consumer {
* possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
* stronger than the "at-least once" semantics you get with Kafka's offset commit functionality.
*/
def plainSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
scaladsl.Consumer.plainSource(settings, subscription)
def plainSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
scaladsl.Consumer
.plainSource(settings, subscription)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava

@@ -131,17 +135,21 @@ object Consumer {
* If you need to store offsets in anything other than Kafka, [[#plainSource]] should be used
* instead of this API.
*/
def committableSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[CommittableMessage[K, V], Control] =
scaladsl.Consumer.committableSource(settings, subscription)
def committableSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[CommittableMessage[K, V], Control] =
scaladsl.Consumer
.committableSource(settings, subscription)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava

/**
* Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka
* before emitted downstreams.
*/
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
scaladsl.Consumer.atMostOnceSource(settings, subscription)
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
scaladsl.Consumer
.atMostOnceSource(settings, subscription)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava

@@ -150,27 +158,37 @@ object Consumer {
* When topic-partition is assigned to a consumer this source will emit tuple with assigned topic-partition and a corresponding source
* When topic-partition is revoked then corresponding source completes
*/
def plainPartitionedSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] = {
scaladsl.Consumer.plainPartitionedSource(settings, subscription)
def plainPartitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription
): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
scaladsl.Consumer
.plainPartitionedSource(settings, subscription)
.map {
case (tp, source) => Pair(tp, source.asJava)
}
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava
}

/**
* The `plainPartitionedManualOffsetSource` is similar to [[#plainPartitionedSource]] but allows the use of an offset store outside
* of Kafka, while retaining the automatic partition assignment. When a topic-partition is assigned to a consumer, the `loadOffsetOnAssign`
* function will be called to retrieve the offset, followed by a seek to the correct spot in the partition. The `onRevoke` function gives
* the consumer a chance to store any uncommitted offsets, and do any other cleanup that is required.
*/
def plainPartitionedManualOffsetSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription, getOffsetsOnAssign: java.util.function.Function[java.util.Set[TopicPartition], CompletionStage[java.util.Map[TopicPartition, Long]]]): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
def plainPartitionedManualOffsetSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: java.util.function.Function[java.util.Set[TopicPartition], CompletionStage[
java.util.Map[TopicPartition, Long]
]]
): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
scaladsl.Consumer
.plainPartitionedManualOffsetSource(
settings,
subscription,
(tps: Set[TopicPartition]) => getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
_ => ()
)
.map {
@@ -187,12 +205,20 @@ object Consumer {
* `onPartitionsRevoked` hook, useful for cleaning up any partition-specific resources being used by the consumer.
*
*/
def plainPartitionedManualOffsetSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription, getOffsetsOnAssign: java.util.function.Function[java.util.Set[TopicPartition], CompletionStage[java.util.Map[TopicPartition, Long]]], onRevoke: java.util.function.Consumer[java.util.Set[TopicPartition]]): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
def plainPartitionedManualOffsetSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: java.util.function.Function[java.util.Set[TopicPartition], CompletionStage[
java.util.Map[TopicPartition, Long]
]],
onRevoke: java.util.function.Consumer[java.util.Set[TopicPartition]]
): Source[Pair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]], Control] =
scaladsl.Consumer
.plainPartitionedManualOffsetSource(
settings,
subscription,
(tps: Set[TopicPartition]) => getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
(tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava)
)
.map {
@@ -204,33 +230,40 @@ object Consumer {
/**
* The same as [[#plainPartitionedSource]] but with offset commit support
*/
def committablePartitionedSource[K, V](settings: ConsumerSettings[K, V], subscription: AutoSubscription): Source[Pair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]], Control] = {
scaladsl.Consumer.committablePartitionedSource(settings, subscription)
def committablePartitionedSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription
): Source[Pair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]], Control] =
scaladsl.Consumer
.committablePartitionedSource(settings, subscription)
.map {
case (tp, source) => Pair(tp, source.asJava)
}
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava
}

/**
* Special source that can use external `KafkaAsyncConsumer`. This is useful in case when
* you have lot of manually assigned topic-partitions and want to keep only one kafka consumer
*/
def plainExternalSource[K, V](consumer: ActorRef, subscription: ManualSubscription): Source[ConsumerRecord[K, V], Control] = {
scaladsl.Consumer.plainExternalSource(consumer, subscription)
def plainExternalSource[K, V](consumer: ActorRef,
subscription: ManualSubscription): Source[ConsumerRecord[K, V], Control] =
scaladsl.Consumer
.plainExternalSource(consumer, subscription)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava
.asInstanceOf[Source[ConsumerRecord[K, V], Control]]
}

/**
* The same as [[#plainExternalSource]] but with offset commit support
*/
def committableExternalSource[K, V](consumer: ActorRef, subscription: ManualSubscription, groupId: String, commitTimeout: FiniteDuration): Source[CommittableMessage[K, V], Control] = {
scaladsl.Consumer.committableExternalSource(consumer, subscription, groupId, commitTimeout)
def committableExternalSource[K, V](consumer: ActorRef,
subscription: ManualSubscription,
groupId: String,
commitTimeout: FiniteDuration): Source[CommittableMessage[K, V], Control] =
scaladsl.Consumer
.committableExternalSource(consumer, subscription, groupId, commitTimeout)
.mapMaterializedValue(new WrappedConsumerControl(_))
.asJava
.asInstanceOf[Source[CommittableMessage[K, V], Control]]
}
}
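To round off the partitioned variants documented above, a rough Scala sketch of consuming each assigned partition as its own sub-stream; topic, group id and the parallelism value are placeholders:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object PartitionedConsumeExample extends App {
  implicit val system: ActorSystem = ActorSystem("partitioned-example") // placeholder name
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")    // placeholder broker address
      .withGroupId("partitioned-example-group")  // placeholder group id

  // Each element pairs an assigned TopicPartition with a Source for just that partition;
  // the inner source completes when the partition is revoked, as documented above.
  Consumer
    .plainPartitionedSource(consumerSettings, Subscriptions.topics("events"))
    .mapAsyncUnordered(parallelism = 8) {
      case (topicPartition, source) =>
        source
          .map(record => s"$topicPartition -> ${record.value}")
          .runWith(Sink.foreach(println))
    }
    .runWith(Sink.ignore)
}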