
Committing producer sink: producer and committer in a sink stage #963

Merged
merged 10 commits into akka:master from committing-producer-sink on Nov 27, 2019

Conversation


@ennru ennru commented Nov 5, 2019

Purpose

Introduce a sink stage which produces to Kafka, batches the commit offsets, and commits them.

References

References #932

Changes

  • New CommittingProducerSinkStage
  • Unit tests running without a Kafka instance
  • Change some places to inject the producer via settings instead

Background Context

Kafka-to-Kafka flows with a committable source, a producer, and a committer are a quite common setup, which became even simpler with the Producer.committableSink factory added in #932. This new all-in-one stage replaces that implementation.
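As a sketch of the setup described above (assuming the Alpakka Kafka 2.0 API; topic names, bootstrap servers, and group id are illustrative, and running this needs the Alpakka Kafka dependency plus a reachable broker, so it is shown as a sketch only):

```scala
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

implicit val system: ActorSystem = ActorSystem("kafka-to-kafka")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // illustrative broker address
    .withGroupId("example-group")           // illustrative group id
val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
val committerSettings = CommitterSettings(system)

// Consume, produce to another topic, and commit the offsets in batches
// via the all-in-one committable sink.
val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord[String, String]("target-topic", msg.record.value),
        msg.committableOffset
      )
    }
    .toMat(Producer.committableSink(producerSettings, committerSettings))(Consumer.DrainingControl.apply)
    .run()
```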


ennru commented Nov 18, 2019

The same test fails on Scala 2.13 for JDK 8 and 11 on Travis, as the messages appear in a different order. Locally it works nicely; I need to dig in a bit.

@ennru ennru self-assigned this Nov 18, 2019
@ennru ennru force-pushed the committing-producer-sink branch from b5d348f to 76c8a96 Compare November 19, 2019 10:16
@seglo seglo self-requested a review November 21, 2019 15:53

@seglo seglo left a comment


The CommittingProducerSinkStageLogic is very clean, I like it. The only concern I have is the duplication of code between it and DefaultProducerStageLogic. Also, I think we should try running the benchmarks on this to see how much gain it gives us over the current impl.

private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, Committable]](
    stage: CommittingProducerSinkStage[K, V, IN],
    inheritedAttributes: Attributes
) extends TimerGraphStageLogic(stage.shape)
Contributor

Ideally, we would consolidate CommittingProducerSinkStageLogic with DefaultProducerStageLogic to DRY up all the async producer and producing logic. Is that the eventual goal?

Member Author

I isolated the async producer creation. For producing I don't see a good case.

Comment on lines +167 to +149
private val collectOffsetCb: AsyncCallback[Committable] = getAsyncCallback[Committable] { offset =>
  collectOffset(1, offset)
}

private val collectOffsetMultiCb: AsyncCallback[(Int, Committable)] = getAsyncCallback[(Int, Committable)] {
  case (count, offset) =>
    collectOffset(count, offset)
}
Contributor

It seems these could be combined into a single async callback with the functionality of collectOffsetMultiCb, but with the name collectOffsetCb.

Member Author

They could, but I want to avoid the tuple creation for the most common case of just a single offset.
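A dependency-free illustration (not the PR's actual code; names and types are simplified) of why keeping two callbacks avoids an allocation: the single-offset path calls the collector directly, while only the less common "multi" path pays for a Tuple2.

```scala
// Simplified model: Committable replaced by String, AsyncCallback by a function.
var collected = 0
def collectOffset(count: Int, offset: String): Unit = collected += count

// Common case: one offset, no tuple allocated on the hot path.
def collectOffsetCb(offset: String): Unit = collectOffset(1, offset)

// Multi-message case: count and offset arrive together as a pair.
def collectOffsetMultiCb(arg: (Int, String)): Unit = collectOffset(arg._1, arg._2)

collectOffsetCb("offset-1")
collectOffsetMultiCb((3, "offset-4"))
```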

else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit("upstream closed")
}

private def commit(triggeredBy: String): Unit = {
Contributor

I like the triggeredBy feature for traceability of a commit. Should we use types instead? It would be less error-prone to use with testing, though it doesn't look like there are tests that assert the triggeredBy of a commit.
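The reviewer's suggestion could be sketched as an ADT instead of a free-form String, so tests can pattern-match on the trigger exhaustively. This is a hypothetical sketch, not code from the PR; the case names are illustrative ("upstream closed" matches the excerpt above, the others are assumptions).

```scala
// Model the commit trigger as a sealed hierarchy instead of a String.
sealed trait TriggeredBy
case object Interval extends TriggeredBy
case object BatchSize extends TriggeredBy
case object UpstreamClosed extends TriggeredBy

// The human-readable label used for logging/tracing stays in one place.
def describe(trigger: TriggeredBy): String = trigger match {
  case Interval       => "interval elapsed"
  case BatchSize      => "batch size reached"
  case UpstreamClosed => "upstream closed"
}
```

A test could then assert `commitTrigger == BatchSize` rather than comparing strings, which the compiler checks for exhaustiveness.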

control.drainAndShutdown().futureValue shouldBe Done
}

it should "produce, and commit when batch size is reached" in assertAllStagesStopped {
Contributor

How can we tell that the commit was triggered by the interval or reaching the batch?

Member Author

The interval is 10 s, so we can be sure the batch size of 2 is hit before that.
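The reasoning above can be modeled in a small dependency-free sketch (a simplified stand-in for the stage's batching state, not the PR's code): with a batch size of 2 and a 10 s interval, two offsets arrive long before the timer fires, so the batch-size path commits first.

```scala
// Minimal model of batch-size-triggered commits: collect offsets and
// report whether this collection crossed the batch-size threshold.
final case class BatchState(offsets: Int, batchSize: Int) {
  def collect(count: Int): (BatchState, Boolean) = {
    val next = offsets + count
    if (next >= batchSize) (copy(offsets = 0), true) // commit triggered by batch size
    else (copy(offsets = next), false)               // keep waiting for offsets or the timer
  }
}

val s0 = BatchState(0, batchSize = 2)
val (s1, commit1) = s0.collect(1) // first offset: below the batch size
val (s2, commit2) = s1.collect(1) // second offset: batch size reached
```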


val elements = immutable.Seq(
  consumer.message(partition, "value 1"),
  consumer.message(partition, "value 2")
)
Contributor

Consider sending more messages than the batch size to assert batching works as expected.

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
Contributor

This test asserts that the completion performed the commit because the commit interval will never happen in time? I suggest adding a comment here and the other tests that rely on this trick if that's the case (it's not obvious to me).

Member Author

Added comments.

commitMsg.offsetAndMetadata.offset() shouldBe (consumer.startOffset + 1)
consumer.actor.reply(Done)

// TODO how should not getting a produce callback be handled?
Contributor

I'm not sure. I didn't see an obvious way in MockProducer. This test seems to only assert that the 2nd commit is never acknowledged (instead, a CommitTimeoutException occurs), correct?

Member Author

Yes. I figured asserting the produce callback would require a timer for each message; the missing commit callback will fail it eventually anyway.

@ennru ennru force-pushed the committing-producer-sink branch from f050c56 to 81b3f90 Compare November 22, 2019 09:59

ennru commented Nov 22, 2019

I do not expect a real difference in the benchmarks. The Kafka overhead will hide most of the benefits, I'm afraid.
This implementation is much lower in allocation and synchronisation costs, though.

@ennru ennru force-pushed the committing-producer-sink branch from 81b3f90 to 1777758 Compare November 26, 2019 10:05

ennru commented Nov 27, 2019

The benchmarks show quite a relevant improvement.

@seglo seglo self-requested a review November 27, 2019 14:55

@seglo seglo left a comment


LGTM. Is there any reason to add/update docs content for this?


ennru commented Nov 27, 2019

No, this is an internal concern and the committableSink factories are new in 2.0.0.

@ennru ennru added this to the 2.0.0 milestone Nov 27, 2019
@ennru ennru merged commit 7967fb0 into akka:master Nov 27, 2019
@ennru ennru deleted the committing-producer-sink branch November 27, 2019 18:30