Create committableSink as combination of Producer.flexiFlow an… #932

Merged: ennru merged 9 commits into akka:master from produce-and-commit-docs on Oct 17, 2019

Member

@ennru ennru commented Oct 15, 2019

Purpose

For the common consume-produce flow, a combination of producer and committer is useful. It conceptually consists of a Producer.flexiFlow with a Committer.sink, but could be implemented as a specific sink stage later.

Changes

  • Add committableSink which combines flexiFlow with Committer.sink
  • Add committableSinkWithOffsetContext which combines flowWithOffsetContext with Committer.sinkWithOffsetContext (though the actual implementation is simpler than that combination)
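
The conceptual combination described above can be sketched as follows. This is a minimal illustration, not necessarily the code merged in this PR: it assumes the Alpakka Kafka (akka-stream-kafka) dependency is on the classpath, and the object name `ProduceAndCommitSketch` and the method name `produceThenCommit` are hypothetical names chosen for the example.

```scala
import akka.Done
import akka.kafka.{CommitterSettings, ProducerSettings}
import akka.kafka.ConsumerMessage.Committable
import akka.kafka.ProducerMessage.Envelope
import akka.kafka.scaladsl.{Committer, Producer}
import akka.stream.scaladsl.{Keep, Sink}

import scala.concurrent.Future

object ProduceAndCommitSketch {

  // Hypothetical helper (not a library method): publishes each envelope,
  // then hands its pass-through offset to the batching committer.
  def produceThenCommit[K, V](
      producerSettings: ProducerSettings[K, V],
      committerSettings: CommitterSettings
  ): Sink[Envelope[K, V, Committable], Future[Done]] =
    Producer
      .flexiFlow[K, V, Committable](producerSettings) // publish to Kafka
      .map(_.passThrough) // keep only the committable offset
      .toMat(Committer.sink(committerSettings))(Keep.right) // commit in batches
}
```

Because the offset is committed only after the publish result comes back, a failure between the two steps can lead to re-delivery, which is why this combination gives at-least-once semantics.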

Background Context

The committer batches commits internally, while the existing producer committableSink sends commits one by one.

Member Author

ennru commented Oct 15, 2019

As an alternative to this, the combination could be pre-packaged for now and get a combined implementation as a specific stage later.

Contributor

@seglo seglo left a comment

> As an alternative to this, the combination could be pre-packaged for now and get a combined implementation as a specific stage later.

I think this is the best path forward. Also, I noticed there are some deprecated Producer.committerSink methods from 1.0-RC1. Are these safe to remove in 2.0?

@ennru ennru changed the title from "Recommend Producer.flexiFlow with Committer.sink" to "Create committingSink as combination of Producer.flexiFlow and Committer.sink" Oct 15, 2019
Member Author

ennru commented Oct 15, 2019

Re-targeted this PR to introduce new sinks for the produce-and-commit combination.

The existing committableSinks could be deprecated now.

Contributor

@seglo seglo left a comment

Will we remove the committableSinks methods in 2.0? I think all the options and similar names will get confusing for users. They've been deprecated since before 1.0 final, so it's probably safe to remove them, right?

@@ -152,6 +153,116 @@ object Producer {
producer: org.apache.kafka.clients.producer.Producer[K, V]
): Sink[Envelope[K, V, ConsumerMessage.Committable], CompletionStage[Done]] = committableSink(settings, producer)

/**
* Create a sink that is aware of the [[ConsumerMessage.CommittableOffset committable offset]]
Contributor

Typo [[ConsumerMessage.CommittableOffset committable offset]]. The other new factory methods in the Scala and Java DSLs have the same typo.

Member Author

It's not a typo; a link to a class or method can use any text. But I figured these links should point to Committable instead.

Contributor

Got it. I misunderstood the Scaladoc syntax.

* Note that there is a risk that something fails after publishing but before
* committing, so it is "at-least once delivery" semantics.
*/
def sinkWithOffsetContext[K, V, IN <: Envelope[K, V, _], C <: CommittableOffset](
Contributor

Should it be called committingSinkWithOffsetContext? I know it's a mouthful..

Member Author

I changed the names to sinkWithCommitting and sinkWithCommittingOffsetContext instead.

Member Author

ennru commented Oct 16, 2019

I had to upgrade to Scala 2.13.1 because Scaladoc hit a bug.

@ennru ennru changed the title from "Create committingSink as combination of Producer.flexiFlow and Committer.sink" to "Create sinkWithCommitting as combination of Producer.flexiFlow and Committer.sink" Oct 16, 2019
@ennru ennru force-pushed the produce-and-commit-docs branch from a4cfdd4 to 9295c3e Compare October 16, 2019 15:59
@ennru ennru changed the title from "Create sinkWithCommitting as combination of Producer.flexiFlow and Committer.sink" to "Create committableSink as combination of Producer.flexiFlow and Committer.sink" Oct 16, 2019
Member Author

ennru commented Oct 16, 2019

OK, @seglo and I figured that committableSink overloads are the best naming option we see right now.

Contributor

@seglo seglo left a comment

LGTM

@ennru ennru changed the title from "Create committableSink as combination of Producer.flexiFlow and Committer.sink" to "Create committableSink as combination of Producer.flexiFlow an…" Oct 17, 2019
@ennru ennru merged commit bd3a3c4 into akka:master Oct 17, 2019
@ennru ennru deleted the produce-and-commit-docs branch October 17, 2019 12:41
@ennru ennru added this to the 1.1.1 milestone Oct 17, 2019
@ennru ennru mentioned this pull request Oct 19, 2019
2 participants