Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate committableSink #805

Closed
ennru opened this issue May 10, 2019 · 5 comments
Closed

Deprecate committableSink #805

ennru opened this issue May 10, 2019 · 5 comments
Milestone

Comments

@ennru
Copy link
Member

ennru commented May 10, 2019

committableSink uses per element committing which performs much worse than Committer.sink. The recommendation should be to use flexiFlow with Committer.sink instead.

@patricknoir
Copy link

Hi ennru, I also prefer the Committer.sink but right now I'm facing an issue as this is not supporting implementation of CommittableOffset. Digging through the code there is a pattern matching cast in the MessageBulder class (line 221 of v1.0.5) which expect CommittableOffset to be always the internal implementation. Any plan to fix this?

@ennru
Copy link
Member Author

ennru commented Aug 20, 2019

What other implementation of CommittableOffset are you expecting it to handle? Committable and CommittableOffset are not supposed to be extended outside of Alpakka Kafka.

@patricknoir
Copy link

Hi Ennru, apologies I have just seen the annotation regarding NoInheritance. To describe my use case, I have created an Envelope[P] message which extended your CommittableOffset to be able to detect when commitScaladsl() call is performed to do some other operations (i.e. : complete a span etc...). Find below code snippet (Open to suggestions)
`

trait Envelope[+P] extends Committable { self =>
val headers: Map[String, String]
val traceId:String
val payload: Either[Throwable, P]
val span: Span
...
override def commitScaladsl(): Future[Done] = {
val future = initCommittable.commitScaladsl()
future.onComplete {
case Failure(exception) =>
Kamon.runWithSpan(this.span) {
span.fail("producer.error", exception)
span.finish()
}
case Success(_) =>
Kamon.runWithSpan(span) {
span.finish()
}
}(ExecutionContext.global) //Use the global execution context as this is not a critical IO operation.
future
}
override def commitJavadsl(): CompletionStage[Done] = initCommittable.commitJavadsl()
override def batchSize: Long = initCommittable.batchSize

`

@ennru
Copy link
Member Author

ennru commented Aug 29, 2019

You may try to use the pass-through instead of extending Committable. As you found out, the implementation requires certain implementations to be used.

@ennru
Copy link
Member Author

ennru commented Oct 19, 2019

Fixed with #932

@ennru ennru closed this as completed Oct 19, 2019
@ennru ennru added this to the 1.1.1 milestone Oct 19, 2019
@ennru ennru modified the milestones: 1.1.1, 2.0.0 Jan 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants