Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Producer sendFromQueue implementation #1326

Merged
merged 2 commits into from
Sep 17, 2024

Conversation

ytalashko
Copy link
Contributor

@ytalashko ytalashko commented Sep 14, 2024

From java's producer send method doc: Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads.
I believe we should try to keep callback implementation fast, but with recent changes we added to provide proper error responses, we make the callback slower. In general we are talking approximately about 1-3 microseconds (current master) instead of 0,1-0,2 microseconds (version 2.8.2) for single callback execution time on my laptop. I believe it was reasonable, but at the same time wanted to keep implementation fast, and provided the fastest version I come up with at that time.
Now, I got with an idea of faster implementation (with keeping the same proper error semantics). With this implementation single callback execution takes approximately 0,05-0,2 microseconds (even a bit faster than version 2.8.2).

// which guarantees sentResults.update executed on the latest updated version of sentResults
// and currently updated version of sentResults
// will be visible to the next sentResults read or update called after volatile variable read
sentResults.update(resultIndex, sentResult)
Copy link
Collaborator

@erikvanoosten erikvanoosten Sep 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since sentResults is an Array, we can assign to any index, as long as we know there is a single writer for each index (which is the case). The only thing that needs to be multi-thread protected is sentRecordsCounter but that is now an AtomicInteger, so calling increaseAndGet would be sufficient, no while loop needed.

Or did I miss something?

Copy link
Contributor Author

@ytalashko ytalashko Sep 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right Erik, thats awesome 🔥
Many thanks for the suggestion!
I was afraid that without having the latest updated version of sentResults during current update, we may run into issue with losing concurrent update result to some other index (similar to word tearing), but from some googling I can see that JVM handles this, and it is safe to do as you suggesting.

@erikvanoosten
Copy link
Collaborator

Nice work! I think it can be simplified further (see the other comment).

@ytalashko
Copy link
Contributor Author

Nice work! I think it can be simplified further (see the other comment).

Thanks!
Yeah, did so

}
)
}
val _ = runtime.unsafe.run(done.succeed(sentResultsChunk))
Copy link
Contributor Author

@ytalashko ytalashko Sep 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined exec here, as it was the only place of usage

Copy link
Collaborator

@erikvanoosten erikvanoosten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work!

@erikvanoosten erikvanoosten merged commit 956c2f3 into zio:master Sep 17, 2024
14 checks passed
@ytalashko ytalashko deleted the optimize-producer-send branch September 17, 2024 17:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants