Committing producer sink: producer and committer in a sink stage #963
Conversation
Force-pushed from 50aa796 to 05c7509
The same test fails on Scala 2.13 for JDK 8 and 11 on Travis because the messages appear in a different order. Locally it works nicely. I need to dig a bit.
Force-pushed from b5d348f to 76c8a96
The CommittingProducerSinkStageLogic is very clean, I like it. The only concern I have is the duplication of code between it and DefaultProducerStageLogic. Also, I think we should try running the benchmarks on this to see how much gain it gives us over the current impl.
```scala
private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, Committable]](
    stage: CommittingProducerSinkStage[K, V, IN],
    inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
```
Ideally, we would consolidate CommittingProducerSinkStageLogic with DefaultProducerStageLogic to DRY up all the async producer and producing logic. Is that the eventual goal?
I isolated the async producer creation. For the producing logic I don't see a good case for consolidation.
Resolved review thread on tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala
Resolved review thread (outdated) on core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala
```scala
  private val collectOffsetCb: AsyncCallback[Committable] = getAsyncCallback[Committable] { offset =>
    collectOffset(1, offset)
  }

  private val collectOffsetMultiCb: AsyncCallback[(Int, Committable)] = getAsyncCallback[(Int, Committable)] {
    case (count, offset) =>
      collectOffset(count, offset)
  }
```
It seems these could be combined into a single async callback with the functionality of collectOffsetMultiCb, but with the name collectOffsetCb.
They could, but I want to avoid the tuple creation for the most common case of just a single offset.
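To make that trade-off concrete, here is a minimal, self-contained sketch outside Akka (the object name, the Committable alias, and both function values are stand-ins, not the stage's real API): the split keeps the single-offset hot path allocation-free, while only multi-message envelopes pay for a tuple.

```scala
object CallbackAllocationSketch {
  type Committable = AnyRef // stand-in for akka.kafka.ConsumerMessage.Committable

  def collectOffset(count: Int, offset: Committable): Unit = ()

  // Split handlers, mirroring collectOffsetCb / collectOffsetMultiCb:
  val single: Committable => Unit = offset => collectOffset(1, offset)
  val multi: ((Int, Committable)) => Unit = { case (count, offset) => collectOffset(count, offset) }

  def main(args: Array[String]): Unit = {
    val offset: Committable = new AnyRef
    single(offset)     // hot path: the Committable is passed directly, no Tuple2
    multi((3, offset)) // multi-message envelope: allocates one (Int, Committable) tuple
  }
}
```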
```scala
    else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit("upstream closed")
  }

  private def commit(triggeredBy: String): Unit = {
```
I like the triggeredBy feature for traceability of a commit. Should we use types instead? It would be less error-prone to use with testing, though it doesn't look like there are tests that assert the triggeredBy of a commit.
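For illustration, a typed trigger might look like the sketch below; the object and case names (CommitTriggerSketch, Interval, BatchSize, UpstreamClosed) are assumptions, not the stage's actual code:

```scala
object CommitTriggerSketch {
  // A sealed ADT instead of free-form strings: exhaustively checked by the
  // compiler and easy to assert on in tests.
  sealed trait TriggeredBy
  case object Interval extends TriggeredBy       // commit-interval timer fired
  case object BatchSize extends TriggeredBy      // offset batch reached its maximum size
  case object UpstreamClosed extends TriggeredBy // upstream finished, flushing remaining offsets

  // commit would then take the typed trigger instead of a String:
  def commit(triggeredBy: TriggeredBy): Unit =
    println(s"commit triggered by $triggeredBy")
}
```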
```scala
    control.drainAndShutdown().futureValue shouldBe Done
  }

  it should "produce, and commit when batch size is reached" in assertAllStagesStopped {
```
How can we tell whether the commit was triggered by the interval or by reaching the batch size?
The interval is 10 s so we can be sure the batch size of 2 is hit before that.
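For illustration, settings that make the batch trigger deterministic might look like this (values taken from the discussion; the builder calls are the regular CommitterSettings API):

```scala
import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("spec")

// maxBatch = 2 with a 10 s interval: within a short-running test the batch
// size is always reached long before the interval timer can fire, so an
// observed commit must have been triggered by batching.
val committerSettings = CommitterSettings(system)
  .withMaxBatch(2L)
  .withMaxInterval(10.seconds)
```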
```scala
    val elements = immutable.Seq(
      consumer.message(partition, "value 1"),
      consumer.message(partition, "value 2")
    )
```
Consider sending more messages than the batch size to assert batching works as expected.
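For example (a sketch extending the spec's own helpers, not the committed test): with maxBatch = 2, a third element forces a second commit and makes the batching behaviour itself observable.

```scala
    val elements = immutable.Seq(
      consumer.message(partition, "value 1"),
      consumer.message(partition, "value 2"),
      consumer.message(partition, "value 3") // exceeds maxBatch = 2: expect a second commit
    )
```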
```scala
    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withProducer(producer)
    val commitInterval = 5.seconds
```
This test asserts that the completion performed the commit, because the commit interval will never happen in time? If that's the case, I suggest adding a comment here and in the other tests that rely on this trick (it's not obvious to me).
Added comments.
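For instance, a comment along these lines (wording assumed, not the committed text) makes the trick explicit:

```scala
    // The commit interval (5 s) is far longer than this test runs, so any
    // commit observed here must have been triggered by stream completion,
    // not by the interval timer.
    val commitInterval = 5.seconds
```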
```scala
    commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 1)
    consumer.actor.reply(Done)

    // TODO how should not getting a produce callback be handled?
```
I'm not sure. I didn't see an obvious way in MockProducer. This test seems to only assert that the 2nd commit is never acknowledged (instead a CommitTimeoutException occurs), correct?
Yes. I figured asserting the produce callback would require a timer for each message; the missing commit callback would fail it eventually anyway.
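For reference, MockProducer can hold back produce callbacks when constructed with autoComplete = false; a sketch of how a test could simulate a missing callback (topic and record values are made up):

```scala
import org.apache.kafka.clients.producer.{MockProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// autoComplete = false: send() stays pending until completeNext() or
// errorNext() is called, so a produce callback can be withheld on purpose.
val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
producer.send(new ProducerRecord("topic", "key", "value")) // callback never fires
// producer.completeNext() would acknowledge the oldest pending send
```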
Resolved review thread on tests/src/test/scala/akka/kafka/internal/CommittingProducerSinkSpec.scala
Force-pushed from f050c56 to 81b3f90
I do not expect a real difference in the benchmarks. The Kafka overhead will hide most of the benefits, I'm afraid.
Force-pushed from 81b3f90 to 1777758
The benchmarks show quite a relevant improvement.
LGTM. Is there any reason to add/update docs content for this?
No, this is an internal concern and the …
Purpose
Introduce a sink stage which produces to Kafka, batches the commit offsets, and commits them.
References
References #932
Changes
CommittingProducerSinkStage
Background Context
Kafka-to-Kafka flows with a committable source, a producer, and a committer are a quite common setup, which became even simpler with the Producer.committableSinks added in #932. This new stage replaces that implementation with an all-in-one stage.
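For context, the consume-transform-produce pattern this sink serves looks roughly like this (bootstrap servers, group id, and topic names are placeholders; assumes Akka 2.6, where the implicit ActorSystem provides the materializer):

```scala
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

implicit val system: ActorSystem = ActorSystem()

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group")
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
val committerSettings = CommitterSettings(system)

// Consume, transform, produce to another topic; the committableSink
// (now backed by CommittingProducerSinkStage) batches and commits the
// consumed offsets after the corresponding sends are acknowledged.
Consumer
  .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
  .map { msg =>
    ProducerMessage.single(
      new ProducerRecord("target-topic", msg.record.key, msg.record.value),
      msg.committableOffset
    )
  }
  .runWith(Producer.committableSink(producerSettings, committerSettings))
```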