-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize Producer sendFromQueue implementation #1326
Conversation
// which guarantees sentResults.update executed on the latest updated version of sentResults | ||
// and currently updated version of sentResults | ||
// will be visible to the next sentResults read or update called after volatile variable read | ||
sentResults.update(resultIndex, sentResult) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since sentResults
is an Array, we can assign to any index, as long as we know there is a single writer for each index (which is the case). The only thing that needs to be multi-thread protected is sentRecordsCounter
but that is now an AtomicInteger
, so calling increaseAndGet
would be sufficient, no while loop needed.
Or did I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you are right Erik, thats awesome 🔥
Many thanks for the suggestion!
I was afraid that without having the latest updated version of sentResults
during current update
, we may run into issue with losing concurrent update result to some other index (similar to word tearing), but from some googling I can see that JVM handles this, and it is safe to do as you suggesting.
Nice work! I think it can be simplified further (see the other comment). |
Thanks! |
} | ||
) | ||
} | ||
val _ = runtime.unsafe.run(done.succeed(sentResultsChunk)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inlined exec
here, as it was the only place of usage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work!
From java's producer send method doc:
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads
.I believe we should try to keep callback implementation fast, but with recent changes we added to provide proper error responses, we make the callback slower. In general we are talking approximately about 1-3 microseconds (current master) instead of 0,1-0,2 microseconds (version 2.8.2) for single callback execution time on my laptop. I believe it was reasonable, but at the same time wanted to keep implementation fast, and provided the fastest version I come up with at that time.
Now, I got with an idea of faster implementation (with keeping the same proper error semantics). With this implementation single callback execution takes approximately 0,05-0,2 microseconds (even a bit faster than version 2.8.2).