-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Consumer.commit
and Consumer.commitOrRetry
methods
#1022
Conversation
@@ -31,10 +31,10 @@ private[consumer] final class RunloopAccess private ( | |||
) { | |||
|
|||
private def withRunloopZIO[E]( | |||
requireRunning: Boolean | |||
shouldStartIfNot: Boolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry @erikvanoosten but I really disliked the name you used. I prefer the one I was using. IMO, it's more explicit about what this boolean is controlling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😄 My major gripes with shouldStartIfNot
is that it is like an unfinished sentence. If not..., if not what?
That is how I went to shouldStartIfNotRunning
and from there to requireRunning
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's "should start if not started" but there's IMO no need to repeat the "start" word with "started"
52c8619
to
86498ee
Compare
Consumer.commit
methodConsumer.commit
and Consumer.commitOrRetry
methods
If I understand the code right, after this change one needs a I think we should keep the notion of Or did I miss something? |
👍 |
We agree on the need not to cache the records and to only cache the minimum information needed to avoid explode memory usage 🙂 This PR is just a first step towards the goal I want to achieve These 2 functions I add here are only here to commit one record at a time - if it's what the user needs - replacing, by terms, the Replacing the trait Consumer {
// I was discussing this idea in Discord some time ago
// see: https://discord.com/channels/629491597070827530/629497941719121960/1144576343330263080
/**
* The `UIO` represents the action of putting the Record(s) in the "commit batch"
* The `Task` represents the action of commiting the batch to Kaka
*/
def commitAccumBatch(commitSchedule: Schedule)(record: Record): UIO[Task[Unit]]
def commitAccumBatch(commitschedule: Schedule)(records: Chunk[Record]): UIO[Task[Unit]]
} so the user will replace the aggregation he/she does on his side by calling one of these functions which will do the aggregation and the commits on our side. Doing the aggregation on our side means that we'll have access to the "list of pending commits" inside zio-kafka, which will help us fix the deadlock some people are experiencing (ie. #852) Do you see what I wanna do? What do you think about it? Maybe it's better to implement everything before merging these new functions so that we can bring the complete new interface and deprecate the old one all at once, and so we can ensure the new interface is sound and coherent. I'll try to make a little POC this afternoon (nothing prod-ready, just to demonstrate the idea) Edit: Edit 2: Edit 3: Edit 4: |
8134243
to
f065200
Compare
Do I understand that correctly that you want the users to add offsets to a commit batch, and then when they think it is time, they can complete the batch, effectively committing it for real? Yeah, that might work. In fact, I like that idea. If the user indicates that an offset needs to be committed at some point, we know we have to wait for it while closing the consumer (or the partition). However, I'd prefer we do not smash up committing and this new concept of intending to commit. Can we keep separate API methods for that? |
How shall we move forward with the (currently) 4 open PRs related to the commit interface? Can we summarize the interface changes we would like to see and break it down into small potential PRs? |
First step toward removing the
Offset
trait atrocity and the current OOP orientation of the commit interface proposed by zio-kafka (ie.record.offset.commit
. This is non sense which pisses me off TBH)