Alternative producer implementation (again) #1311
Conversation
I wonder if we should try to reproduce the regression of 2.8.1 first before attempting another improvement. What do you think?
Yes, I agree, we need to do that, but then IMHO we can continue to check the improvement as well. There are actually a lot of variables that we can check. E.g. compared to zio-kafka 2.8.2:
I am a bit concerned that the current benchmark is not sufficient as it doesn't show a significant difference so far. However, we will only know for sure that this is a problem after we've tested all the variations above. I hope you have the time to execute on this. I currently do not.
```scala
val _ = p.send(
  rec,
  (metadata: RecordMetadata, err: Exception) =>
    Unsafe.unsafe { implicit u =>
      exec {
        if (err != null) res(idx) = Left(err)
        else res(idx) = Right(metadata)

        if (count.incrementAndGet == length) {
          exec {
            runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
          }
        }
      }
    }
)
```
Hey @svroonland, @erikvanoosten, I think I found the cause of the regression:

`p.send` is an asynchronous method: it does not block until the record is sent to the broker, it takes a callback for that (or returns a Java future).

So, in the current implementation (on the master branch, NOT in this PR) the `sendFromQueue` stream does not actually await the chunk being sent to the broker; it only dispatches the chunk, and the actual awaiting is done by the caller of `produce`, `produceChunk`, ... (because `done.succeed` is called in the `p.send` callback). Which is great, because it handles parallel sending well.

The changes in this PR make the `sendFromQueue` stream actually await the chunk being sent to the broker, which breaks the ability to truly send in parallel, because now sending is performed strictly chunk by chunk.

The `sendFromQueue` implementation from the master branch really seems fast and readable (at least to me). I would love it if we could keep its internals, as it seems to be fast (again), which is super important for such a piece of code. I mean, if there is a nice API for users, it does not really matter whether the internals are "ugly" or not; the performance and readability of those internals are of higher value (in my opinion).

Wdyt?
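To make the difference concrete, here is a schematic sketch of the two behaviors described above; the queue element type and the `dispatchChunk` helper are assumptions for illustration, not the actual zio-kafka internals:

```scala
import zio._
import zio.stream._

object SendFromQueueSketch {

  type Done = Promise[Throwable, Chunk[Any]]

  // master: the stream only dispatches the sends; callers of produce/produceChunk
  // await `done`, which is completed from the p.send callbacks. Chunks from
  // different callers can therefore be in flight at the same time.
  def sendFromQueueMaster(
    queue: Queue[(Chunk[Any], Done)]
  )(dispatchChunk: (Chunk[Any], Done) => Task[Unit]): Task[Unit] =
    ZStream
      .fromQueueWithShutdown(queue)
      .mapZIO { case (chunk, done) => dispatchChunk(chunk, done) }
      .runDrain

  // this PR (before the fix): the stream awaits each chunk's completion before
  // pulling the next one, so chunks are sent strictly one after another.
  def sendFromQueueThisPr(
    queue: Queue[(Chunk[Any], Done)]
  )(dispatchChunk: (Chunk[Any], Done) => Task[Unit]): Task[Unit] =
    ZStream
      .fromQueueWithShutdown(queue)
      .mapZIO { case (chunk, done) => dispatchChunk(chunk, done) *> done.await.unit }
      .runDrain
}
```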
Yes, that makes perfect sense; we totally missed that.
Agreed that we need to keep it fast or make it faster. We will definitely make sure that any changes we make to this piece of code do not affect the performance in a negative way.
Thanks @ytalashko. With that information a fix is easy. I'll push it shortly.
Fix: 9149479 🤞
Cool :) What does it look like compared to 2.8.2?
In the graph above we see that this PR is a little bit slower than master. However, there is a lot of variance between runs so it is too early to tell whether this is a significant regression. But for sure, this PR is now much better than 2.8.1!
> Cool :) What does it look like compared to 2.8.2?
2.8.2 is not in the graph, but it should be similar to the first 3 runs in the graph.
… implementation (#1315): Minor change (alignment), in case we keep the current `Producer.sendFromQueue` implementation (#1311 (comment)); otherwise, this PR can be closed.
Refactoring of the producer so that it handles errors per record. These changes were part of 2.8.1 (via #1285 and #1272) but were reverted in 2.8.2 (via #1304) after reports of performance regression. This is a retry to make it easier to run the benchmarks and make further improvements. This reverts commit 3393fbf.
It is likely that the last promise completes last. If that is the case, we only wait once, the other promises are already completed and awaiting them has no overhead.
Out of curiosity:
```scala
record,
(metadata: RecordMetadata, err: Exception) =>
  unsafeRun {
    if (err == null) done.succeed(metadata)
```
Instead of creating this many promises, we could alternatively create a `Ref` (or even a `TRef`) and complete a single promise once the expected count is reached. That would be the ZIO variant of the `AtomicInteger` we had before.
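A rough sketch of that idea, with assumed names rather than actual PR code: a single promise per chunk, completed when a `Ref`-based counter reaches zero:

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import zio._

// Hypothetical sketch: one promise per chunk plus a Ref counter, instead of one
// promise per record.
object ChunkCompletionSketch {

  final case class ChunkCompletion(
    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]],
    // To be run (via the runtime) from each Kafka send callback, after the callback
    // has stored its own result into `results`.
    reportOne: UIO[Unit]
  )

  def make(results: Array[Either[Throwable, RecordMetadata]]): UIO[ChunkCompletion] =
    for {
      done      <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
      remaining <- Ref.make(results.length)
    } yield ChunkCompletion(
      done,
      remaining.updateAndGet(_ - 1).flatMap { stillPending =>
        // The callback that decrements the counter to zero completes the single promise.
        done.succeed(Chunk.fromArray(results)).when(stillPending == 0).unit
      }
    )
}
```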
Nice idea. We'd still need an additional fiber though.
Another idea is that we use what is on master but continue calling `send` for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)
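A rough sketch of this idea (assumed names, not actual zio-kafka code): keep calling `send` for every record, even after a failure, and report a per-record result so callers know exactly which records were accepted:

```scala
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import zio._

import scala.util.control.NonFatal

// Hypothetical sketch: call send for every record even if an earlier send threw,
// collecting a per-record Either instead of failing the whole chunk.
object ContinueOnErrorSketch {
  def sendAll(
    producer: KafkaProducer[Array[Byte], Array[Byte]],
    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
  )(onResult: (Int, Either[Throwable, RecordMetadata]) => Unit): UIO[Unit] =
    ZIO.succeed {
      records.zipWithIndex.foreach { case (record, idx) =>
        try {
          val _ = producer.send(
            record,
            (metadata: RecordMetadata, err: Exception) =>
              onResult(idx, if (err != null) Left(err) else Right(metadata))
          )
        } catch {
          // `send` itself threw: record the error for this record and keep going
          // with the rest of the chunk.
          case NonFatal(e) => onResult(idx, Left(e))
        }
      }
    }
}
```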
> Instead of creating this many promises, we could alternatively create a `Ref` (or even a `TRef`) and complete a single promise once the expected count is reached. That would be the ZIO variant of the `AtomicInteger` we had before.
This sounds really nice
> Another idea is that we use what is on master but continue calling `send` for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)

Maybe we can retry the failed `send` calls (something mentioned in #1311 (comment)).

> That way we also get accurate error reporting for each record.

Yeah, this is something missing right now in master; it would be great to have it.

> continue calling `send` for the following records, even when there is an error.

This can break the ordering of the produced messages; maybe it is better not to produce further records from the chunk once one has failed, wdyt?
It all started with a user who reported that spurious AuthenticationExceptions/AuthorisationExceptions are actually transient and that the request can be retried (both for consuming and producing). Spring Boot does this, so why not zio-kafka?! To be honest, I think it is a bug in the broker that this can happen, but 🤷. Meanwhile, we did implement this for the consumer, and we are now trying to do this for the producer.

The producer, however, is extremely tricky. The current producer (on master) has very weird semantics. It is very unclear where an error comes from: it can be from the call to `send`, from the send's callback, or from the call to `send` of another message in the same batch. This makes it impossible to know whether and which records can be retried after such a transient exception. The solution in this branch does not have this problem because it accurately returns the result of each `send` invocation.

Whatever we do, we should be careful not to let performance regress too much for this weird edge case. I am not fully convinced yet that the solution in this PR fits that bill (we have more fibers, and performance for single records is not good). Hopefully we'll get more ideas to decrease the gap. Otherwise, IMHO, we will have to declare this as 'not possible'. (One more solution: support alternative producers, but I'd rather not go there..)
```scala
 * therefore this stream is run on the blocking thread pool
 * Currently sending has the following characteristics:
 *  - You can submit many chunks, they get buffered in the send queue.
 *  - A chunk only gets sent after the previous chunk completes (completes means that the callbacks for each record
```
This is no longer true, is it?
I wrote that before we learned better; it has never been true.
Erik, thanks a lot for such a descriptive clarification!

First of all (not to say we should not tackle the issue, just to highlight it in case the solution is too costly): unlike with the Consumer, for producing data, library users actually have the ability to tackle this on their own if they run into the issue.

As mentioned, the error can come from a few places. If it comes from the `p.send` call itself, we could do something like:

```scala
ZIO.suspendSucceed {
  val count: AtomicInteger = new AtomicInteger
  ...
  ZIO.<foreachDiscard/loop> {
    ZIO.attempt(p.send).retry(...)
  }
}
```

Basically, to shift handling errors ("trying") down to the record level, instead of the chunk level as it is done now (inside …).

A more complex case is when the error comes into the callback. I believe it is pretty tricky to apply retries to this error.

Coming back to this, with the thoughts on the callback error, it just seems to me that the library cannot provide "full retries" while keeping ordering guarantees for this and any errors, given the …

Hope these thoughts are of any help.
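To make the record-level retry idea above a bit more concrete, here is a minimal hedged sketch; the producer value, the retry schedule, and the choice to retry only `AuthenticationException` are assumptions for illustration, not zio-kafka API:

```scala
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
import org.apache.kafka.common.errors.AuthenticationException
import zio._

// Hypothetical sketch: retry each record's send individually (record level) instead
// of retrying or failing the whole chunk. Only the synchronous part of `send` is
// retried here; errors delivered to the callback are the harder, separate case.
object RecordLevelRetrySketch {
  def sendWithRetry(
    producer: KafkaProducer[Array[Byte], Array[Byte]],
    records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
  ): Task[Unit] =
    ZIO.foreachDiscard(records) { record =>
      ZIO
        .attemptBlocking { val _ = producer.send(record) }
        .retry(
          // Assumed policy: a few exponentially spaced attempts, only for the
          // transient auth errors discussed in this thread.
          (Schedule.recurs(3) && Schedule.exponential(50.millis))
            .whileInput[Throwable](_.isInstanceOf[AuthenticationException])
        )
    }
}
```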
@ytalashko I see you understand the problem quite well. Let's break it down.

**Ordering**

You make some good points about whether the library should allow re-ordering. IMHO that should not be a problem as long as we document this carefully. (The same is already true for the Java Kafka library that zio-kafka uses re. retries.) However, again IMHO, it is equally valid to document how to handle Auth* exceptions in user code, for example in the way you propose. I quite like that, because complicating the library a lot for those few users that are affected might be overkill.

**More precise errors**

You make a good case for giving the user more precise errors. I fully agree, we could/should improve this! In master, when calling …. In this PR we fix this by calling ….

**Implementation**

Then on the topic of how to write this. Performance is important for us; we don't care if we use ZIO properly, it has to go fast and look like ZIO code from the outside. (Of course we will use ZIO properly when the performance is about equal.) The current producer implementation on master is a good example of optimized low-level code (I still wonder who came up with it 🙂). The implementation in this PR, on the other hand, is much nicer to look at but has a small potential overhead, mainly for completing the 'done' promise (using …).

However, this PR does not implement the idea of the previous section (to stop sending after an error). Perhaps this low-level code works (it's just a rough idea, it can for sure be improved, but I don't have time for that now):

```scala
ZIO.suspendSucceed {
  var sentCount = 0
  var errorSeen = false
  val results = Array.ofDim[Either[Throwable, RecordMetadata]](records.size)
  records.zipWithIndex.foreach { case (r, i) =>
    if (errorSeen) {
      results(i) = Left(NotSentDueToEarlierErrorsException)
    } else {
      try {
        p.send(callback { results(i) = if (error) Left(e) else Right(metadata); if (errorSeen && sentCount == i) done.complete(results) })
        sentCount += 1
      } catch {
        case NonFatal(e) =>
          errorSeen = true
          results(i) = Left(e)
      }
    }
  }
  if (sentCount == 0) done.complete(results)
}
```

This doesn't work yet; there is a big race condition on `sentCount` that causes `done.complete` to not be called. Anyway, that is probably something that can be resolved. If you want to have a go at it, please do 🙏!
Thanks for the reply, Erik! I feel on board with what you are saying.

Yeah, this makes sense; if users know what they are doing, it is good (it just probably should not be the default configuration).

I like the idea with …

This is exactly what I also have in mind.

Sure, the initial version is here: #1321
Note: development of this PR is on hold because #1321 looks like a much more promising approach to the problem.
Refactoring of the producer so that it handles errors per record.
These changes were part of 2.8.1 (via #1285 and #1272) but were reverted in 2.8.2 (via #1304) after reports of performance regression. This is a retry to make it easier to run the benchmarks and make further improvements.
This reverts commit 3393fbf, and then fixes the performance issue.
--
Improvements compared to the previous attempt of the alternative implementation:

- It is likely that the last promise completes last. If that is the case, we only wait once; the other promises are already completed and awaiting them has no overhead (see the sketch after this list).
- SendChunk is no longer awaiting the completion of sending the records. Instead it completes the promise in the background. This allows the producer stream to start sending the next chunk as soon as Kafka accepted the previous chunk. This restores behavior similar to 2.8.0.
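A minimal sketch of these two improvements under assumed names (this is not the PR's actual code): awaiting the per-record promises in order typically suspends only on the last one, and forking that awaiting keeps the send stream free to pull the next chunk:

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import zio._

// Hypothetical sketch of the two improvements above.
object AwaitInBackgroundSketch {

  // Awaiting in order typically only suspends on the promise that completes last;
  // awaiting already-completed promises returns immediately.
  def awaitResults(
    promises: Chunk[Promise[Throwable, RecordMetadata]]
  ): UIO[Chunk[Either[Throwable, RecordMetadata]]] =
    ZIO.foreach(promises)(_.await.either)

  // The send stream dispatches the chunk and then completes `done` from a forked
  // fiber, so it can pull the next chunk as soon as Kafka accepted this one.
  def sendChunkInBackground(
    dispatch: UIO[Chunk[Promise[Throwable, RecordMetadata]]],
    done: Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
  ): UIO[Unit] =
    dispatch.flatMap { promises =>
      awaitResults(promises).flatMap(done.succeed(_)).forkDaemon.unit
    }
}
```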