-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create committableSink
as combination of Producer.flexiFlow an…
#932
Conversation
As an alternative to this, the combination could be pre-packaged for now and get combined implementation with a specific stage later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an alternative to this, the combination could be pre-packaged for now and get combined implementation with a specific stage later.
I think this is the best path forward. Also, I noticed there are some deprecated Producer.committerSink
methods from 1.0-RC1
. Are these safe to remove in 2.0
?
committingSink
as combination of Producer.flexiFlow and Committer.sink
Re-targeted this PR to introduce new sinks for the produce-and-commit combination. The existing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we remove the committableSinks
methods in 2.0? I think all the options and similar names will get confusing for users. They've been deprecated since before 1.0 final, so it's probably safe to remove them right?
@@ -152,6 +153,116 @@ object Producer { | |||
producer: org.apache.kafka.clients.producer.Producer[K, V] | |||
): Sink[Envelope[K, V, ConsumerMessage.Committable], CompletionStage[Done]] = committableSink(settings, producer) | |||
|
|||
/** | |||
* Create a sink that is aware of the [[ConsumerMessage.CommittableOffset committable offset]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo [[ConsumerMessage.CommittableOffset committable offset]]
. The other new factory methods in the Scala and Java DSLs have the same typo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a typo a link to a class or method can use any text, but I figured these links should link to Committable
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I misunderstood the scaladocs syntax.
* Note that there is a risk that something fails after publishing but before | ||
* committing, so it is "at-least once delivery" semantics. | ||
*/ | ||
def sinkWithOffsetContext[K, V, IN <: Envelope[K, V, _], C <: CommittableOffset]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be called committingSinkWithOffsetContext
? I know it's a mouthful..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed to sinkWithCommitting
and sinkWithCommittingOffsetContext
instead.
I had to upgrade to Scala 2.13.1 as scaladoc hit some bug. |
committingSink
as combination of Producer.flexiFlow and Committer.sinksinkWithCommitting
as combination of Producer.flexiFlow and Committer.sink
a4cfdd4
to
9295c3e
Compare
sinkWithCommitting
as combination of Producer.flexiFlow and Committer.sinkcommittableSink
as combination of Producer.flexiFlow and Committer.sink
Ok, @seglo and I figured |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
committableSink
as combination of Producer.flexiFlow and Committer.sinkcommittableSink
as combination of Producer.flexiFlow an…
Purpose
For the common consume-produce flow, a combination of producer and committer is useful. It conceptually consists of a
Producer.flexiFlow
with aCommitter.sink
, but could be implemented as a specific sink stage later.Changes
committableSink
which combinesflexiFlow
withCommitter.sink
committableSinkWithOffsetContext
which combinesflowWithOffsetContext
withCommitter.sinkWithOffsetContext
(but is implemented simpler than that)Background Context
The committer uses commit batching internally while the producer
committableSink
sends commits one-by-one.