
Alternative producer implementation (again) #1311

Closed
erikvanoosten wants to merge 4 commits

Conversation

@erikvanoosten (Collaborator) commented Aug 24, 2024

Refactoring of the producer so that it handles errors per record.

These changes were part of 2.8.1 (via #1285 and #1272) but were reverted in 2.8.2 (via #1304) after reports of performance regression. This is a retry to make it easier to run the benchmarks and make further improvements.

This reverts commit 3393fbf, and then fixes the performance issue.

--
Improvements compared to the previous attempt at the alternative implementation:

  • Iterate over promises in reverse order
    It is likely that the last promise completes last. If that is the case, we only wait once; the other promises are already completed, so awaiting them adds no overhead (see the sketch below).
  • Restore async behavior
    SendChunk no longer awaits the completion of sending the records. Instead it completes the promise in the background. This allows the producer stream to start sending the next chunk as soon as Kafka has accepted the previous chunk. This restores behavior similar to 2.8.0.
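
A minimal sketch of the reverse-order await idea, assuming a chunk of per-record promises (the names below are illustrative, not the actual PR code):

```scala
import zio._

// Illustrative sketch only: await per-record promises in reverse order, then read the
// results in the original order. If the last promise completes last (the common case),
// only the first await suspends; the remaining awaits find already-completed promises.
def collectResults[A](promises: Chunk[Promise[Throwable, A]]): UIO[Chunk[Either[Throwable, A]]] =
  ZIO.foreachDiscard(promises.reverse)(_.await.ignore) *>
    ZIO.foreach(promises)(_.await.either)
```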

@erikvanoosten changed the base branch from master to producer-benchmark on August 24, 2024 13:31
Base automatically changed from producer-benchmark to master August 25, 2024 07:12
@svroonland (Collaborator)

I wonder if we should try to reproduce the regression of 2.8.1 first before attempting another improvement, what do you think?

@erikvanoosten (Collaborator, Author)

> I wonder if we should try to reproduce the regression of 2.8.1 first before attempting another improvement, what do you think?

Yes, I agree, we need to do that, but then IMHO we can continue to check the improvement as well. There are actually a lot of variables that we can check. E.g. compared to zio-kafka 2.8.2:

  • no changes (the baseline)
  • zio-kafka 2.8.0 (a check to validate that the other changes from 2.8.0 to 2.8.1 did not affect performance)
  • zio-kafka 2.8.1, the new producer
  • the new producer with the tweak (this PR, but rebased on 2.8.2)
  • upgrade zio to 2.1.8
  • upgrade kafka to 3.8.0

I am a bit concerned that the current benchmark is not sufficient as it doesn't show a significant difference so far. However, we will only know for sure that this is a problem after we've tested all the variations above.

I hope you have the time to execute on this. I currently do not.

Comment on lines -480 to -495
```scala
val _ = p.send(
  rec,
  (metadata: RecordMetadata, err: Exception) =>
    Unsafe.unsafe { implicit u =>
      exec {
        if (err != null) res(idx) = Left(err)
        else res(idx) = Right(metadata)

        if (count.incrementAndGet == length) {
          exec {
            runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure()
          }
        }
      }
    }
)
```
@ytalashko (Contributor) commented Aug 25, 2024

Hey @svroonland, @erikvanoosten, I think I found the cause of the regression:
`p.send(` is an asynchronous method for sending a record: it does not block until the record is sent to the broker, it takes a callback for that (or returns a Java future).
So, in the current implementation (on the master branch, NOT in this PR), the sendFromQueue stream does not actually await the chunk being sent to the broker; it only dispatches the chunk, and the actual awaiting is done by the caller of produce, produceChunk, ... (because done.succeed is called in the p.send callback). Which is great, because it handles parallel sending well.
The changes in this PR make the sendFromQueue stream actually await the chunk being sent to the broker, which breaks this ability to truly handle parallel sending, because now sending is performed chunk by chunk.

This sendFromQueue implementation (from the master branch) really seems fast and readable (at least to me).
I would love it if we could keep the internals of this sendFromQueue implementation, as it seems to be fast (again), which is super important for such a piece of code (per me).
I mean, if there is a nice API for users, it really doesn't matter whether the internals are "ugly" or not; the performance and readability of those internals are of higher value (per me).
Wdyt?
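
A rough sketch of the callback pattern described here, assuming the raw Java `KafkaProducer` and a ZIO `Runtime` are in scope (names are illustrative; this is not the zio-kafka source):

```scala
import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, ProducerRecord, RecordMetadata }
import zio._

// Sketch: p.send only enqueues the record and returns immediately; the broker
// acknowledgement arrives later via the callback, which completes a ZIO Promise
// in the background. The caller of produce/produceChunk awaits that promise,
// not the sendFromQueue stream itself.
def sendAsync(
  p: KafkaProducer[Array[Byte], Array[Byte]],
  record: ProducerRecord[Array[Byte], Array[Byte]],
  runtime: Runtime[Any]
): Task[Promise[Throwable, RecordMetadata]] =
  for {
    done <- Promise.make[Throwable, RecordMetadata]
    _ <- ZIO.attempt {
           p.send(
             record,
             new Callback {
               override def onCompletion(metadata: RecordMetadata, err: Exception): Unit =
                 Unsafe.unsafe { implicit u =>
                   runtime.unsafe.run {
                     if (err != null) done.fail(err) else done.succeed(metadata)
                   }.getOrThrowFiberFailure()
                 }
             }
           )
         }
  } yield done
```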

@svroonland (Collaborator)

Yes, that makes perfect sense, we totally missed that.

Agreed that we need to keep it fast or make it faster; we will definitely make sure that any changes we make to this piece of code do not affect the performance in a negative way.

@erikvanoosten (Collaborator, Author)

Thanks @ytalashko. With that information a fix is easy. I'll push it shortly.

@erikvanoosten (Collaborator, Author)

Fix: 9149479 🤞

@erikvanoosten (Collaborator, Author)

Yes, it works!
The last 2 builds in this graph are from this PR. The last one has the fix, the one before does not.

[benchmark graph]

@svroonland (Collaborator)

Cool :) What does it look like compared to 2.8.2?

@erikvanoosten (Collaborator, Author)

In the graph above we see that this PR is a little bit slower than master. However, there is a lot of variance between runs so it is too early to tell whether this is a significant regression. But for sure, this PR is now much better than 2.8.1!

@erikvanoosten (Collaborator, Author)

> Cool :) What does it look like compared to 2.8.2?

2.8.2 is not in the graph, but it should be similar to the first 3 runs in the graph.

svroonland pushed a commit that referenced this pull request Aug 26, 2024: "… implementation" (#1315)

> Minor change (alignment), in case we keep the current `Producer.sendFromQueue` implementation (#1311 (comment)); otherwise, this PR can be closed.
@ytalashko (Contributor)

Out of curiosity:
Erik, could you please help me with clarifications about the purpose/benefits of those changes?

```scala
  record,
  (metadata: RecordMetadata, err: Exception) =>
    unsafeRun {
      if (err == null) done.succeed(metadata)
```
@svroonland (Collaborator)

Instead of creating this many promises, alternatively we could create a Ref or even TRef and complete a single promise when the count is expected. That would be the ZIO variant of the AtomicInteger we had before.
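
A possible sketch of that idea, with illustrative names only (a `Ref` holding per-index results, a counter, and a single `done` promise); this is a rough illustration, not an actual implementation proposal:

```scala
import zio._

// Sketch: store each callback result by index, count completed callbacks, and
// complete a single Promise once the count reaches the chunk size. This mirrors
// the AtomicInteger-based code that was reverted, but expressed in plain ZIO.
def recordResult[A](
  idx: Int,
  result: Either[Throwable, A],
  results: Ref[Vector[Option[Either[Throwable, A]]]],
  counter: Ref[Int],
  expected: Int,
  done: Promise[Nothing, Chunk[Either[Throwable, A]]]
): UIO[Unit] =
  for {
    _ <- results.update(_.updated(idx, Some(result)))
    n <- counter.updateAndGet(_ + 1)
    _ <- ZIO.when(n == expected) {
           results.get.flatMap(rs => done.succeed(Chunk.fromIterable(rs.map(_.get))))
         }
  } yield ()
```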

@erikvanoosten (Collaborator, Author)

Nice idea. We'd still need an additional fiber though.

Another idea is that we use what is on master but continue calling send for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)

@ytalashko (Contributor)

> Instead of creating this many promises, alternatively we could create a Ref or even TRef and complete a single promise when the count is expected. That would be the ZIO variant of the AtomicInteger we had before.

This sounds really nice

@ytalashko (Contributor)

> Another idea is that we use what is on master but continue calling send for the following records, even when there is an error. That way we also get accurate error reporting for each record. (Also see this #1311 (comment).)

Maybe we can retry errors from the send call (something mentioned in #1311 (comment)).

> That way we also get accurate error reporting for each record.

Yeah, this is something missing right now in master; it would be great to have it.

> continue calling send for the following records, even when there is an error.

This can lead to broken ordering of the produced messages; maybe it is better not to produce further records from the chunk if one has failed, wdyt?

@erikvanoosten (Collaborator, Author)

> Out of curiosity: Erik, could you please help me with clarifications about the purpose/benefits of those changes?

It all started with a user who reported that spurious AuthenticationExceptions/AuthorisationExceptions are actually transient and that the request can be retried (both for consuming and producing). Spring-boot does this, so why not zio-kafka?! To be honest, I think it is a bug in the broker that this can happen, but 🤷

Meanwhile, we did implement this for the consumer, and we are now trying to do this for the producer. The producer however is extremely tricky. The current producer (on master) has very weird semantics. It is very unclear where an error comes from: it can be from the call to send, from the send's callback, or from the call to send of another message in the same batch. This makes it impossible to know whether and which records can be retried after such a transient exception. The solution in this branch does not have this problem because it accurately returns the result of each send invocation.

Whatever we do, we should be careful not to let performance regress too much for this weird edge case. I am not fully convinced yet that the solution in this PR fits that bill (we have more fibers, and performance for single records is not good). Hopefully we'll get more ideas to decrease the gap. Otherwise, IMHO, we will have to declare this 'not possible'. (One more solution: support alternative producers, but I'd rather not go there.)

```scala
 * therefore this stream is run on the blocking thread pool
 * Currently sending has the following characteristics:
 * - You can submit many chunks, they get buffered in the send queue.
 * - A chunk only gets send after the previous chunk completes (completes means that the callbacks for each record
```
@svroonland (Collaborator)

This is no longer true, is it?

@erikvanoosten (Collaborator, Author)

I wrote that before we learned better; it has never been true.

@ytalashko (Contributor) commented Aug 30, 2024

Erik, thanks a lot for such a descriptive clarification!
Sorry for taking so long to reply, I just got a chance to come back here.
I just want to share my thoughts on the spurious AuthenticationExceptions/AuthorisationExceptions:

First of all (not to say we should not tackle the issue, just to highlight it in case the solution is too costly): unlike with the Consumer, for producing data, library users actually have the ability to tackle this on their own if they run into the issue:
Send records one by one and retry in case of any errors which you want to retry.
It may not be ideal, but it is not far from what can be implemented on the library side (from my perspective); let's come back to this a bit further down.

> It is very unclear where an error comes from: it can be from the call to send, from the send's callback, or from the call to send of another message in the same batch.

As mentioned, the error can come from a few places.
The simpler case (it seems simpler to me) is when the error occurs during the send method call, which typically just adds the record to the producer's in-memory buffer. The send method call can be retried on top of the changes in this PR, or even without changing the current (master) sendFromQueue implementation structure, with approximately something like:

```scala
ZIO.suspendSucceed {
  val count: AtomicInteger = new AtomicInteger
  ...
  ZIO.<foreachDiscard/loop> {
    ZIO.attempt(p.send).retry(...)
  }
}
```

Basically, the idea is to shift error handling ("trying") down to the record level, instead of the chunk level as is done now (inside the sendFromQueue implementation); a rough sketch of what that could look like follows below.
In case this implementation seems like something we want to try/have, I can even work on it and provide a draft of how it could look.
Overall, retries for this case are something that can be helpful for library users.
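
For illustration, a hedged sketch of what record-level retries of the synchronous `send` call could look like; the retry policy and the choice to treat Auth* exceptions as transient are assumptions for this sketch, not part of this PR:

```scala
import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, ProducerRecord }
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._

// Sketch: retry only the synchronous part of send (the enqueue into the producer's
// buffer). Errors delivered later via the callback are deliberately not retried here,
// because of the ordering concerns discussed below.
def sendWithRetry(
  p: KafkaProducer[Array[Byte], Array[Byte]],
  record: ProducerRecord[Array[Byte], Array[Byte]],
  callback: Callback
): Task[Unit] =
  ZIO
    .attempt(p.send(record, callback))
    .unit
    .retry(
      Schedule.recurWhile[Throwable] {
        case _: AuthenticationException | _: AuthorizationException => true
        case _                                                      => false
      } && Schedule.exponential(50.millis) && Schedule.recurs(5)
    )
```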

The more complex case is when the error arrives in the callback. I believe it is pretty tricky to apply retries to this error.
Since the callback is called asynchronously, at the moment it is invoked we don't know what is going on with the records sent after the one we got the error for; we don't even know whether we have sent them already. If we gather results from the callbacks of all sent records in the chunk, it can be that the error happened in between the successful responses. In both cases retries may, or will, potentially cause ordering issues for the records being sent, which is something I believe we want to avoid.
Taking this into account, I think such retries should preferably be performed by the library users, who know what and how they are producing.

> Send records one by one and retry in case of any errors which you want to retry.
> It may not be ideal, but it is not far from what can be implemented on the library side (from my perspective); let's come back to this a bit further down.

Coming back to this: with the thoughts on the callback error, it just seems to me that the library cannot provide "full retries" while keeping ordering guarantees for this (or any) error, given a produceChunk-like API (basically, given any async-like sending functionality). Maybe I'm just lacking knowledge on when and how this error can occur, and maybe if it occurs for some record of a topic we can assume that all other records sent asynchronously after it will also fail with the same error 🤷‍♂️. I'm curious how Spring-boot handles this.

Hope these thoughts are of some help.
In case something is unclear, let me know and I'll elaborate.

@erikvanoosten (Collaborator, Author) commented Aug 31, 2024

@ytalashko I see you understand the problem quite well. Let's break it down.

Ordering

You make some good points about whether the library should allow re-ordering. IMHO that should not be a problem as long as we document this carefully. (The same is already true for the Java Kafka library that zio-kafka uses re. retries.) However, again IMHO, it is equally valid to document how to handle Auth* exceptions in user code, for example in the way you propose. I quite like that because complicating the library a lot for those few users that are affected might be overkill.

More precise errors

You make a good case for giving the user more precise errors. I fully agree, we could/should improve this!

In master, when calling send immediately fails, all following records are skipped (not sent), and the resulting error is used as the result for all records, even for those records where send did succeed!

In this PR we fix this by calling send for every record, regardless of earlier failures in the batch. But I found another way which I think is better: as soon as send fails, we stop sending the following records, but we still use the results of the callbacks for the earlier records. In addition, we should let the user know exactly which records succeeded, which failed and which were not sent.
But then we need to think about how to communicate this to the user. The API returns an Either[Throwable, RecordMetadata] for every record; how do we encode that a record was not sent, ideally without changing the API? (One idea is to introduce a NotSentDueToEarlierErrorsException, but perhaps there are better ideas.)
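
One possible encoding, sketched here only for illustration (the exception name comes from the comment above; everything else is assumed), so that the existing per-record result type does not have to change:

```scala
import scala.util.control.NoStackTrace

// Sketch: records that were never handed to `send` (because an earlier record in the
// same chunk failed synchronously) get a dedicated, recognisable failure value, so the
// Either[Throwable, RecordMetadata] result type of the API stays as it is.
case object NotSentDueToEarlierErrorsException
    extends RuntimeException("Not sent because an earlier record in the same chunk failed")
    with NoStackTrace

// Example result chunk where the second send call failed synchronously:
//   Chunk(Right(metadata0), Left(sendError1), Left(NotSentDueToEarlierErrorsException), ...)
```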

Implementation

Then on the topic of how to write this. Performance is important for us; we don't care whether we use ZIO 'properly', it has to be fast and look like ZIO code from the outside. (Of course we will use ZIO properly when the performance is about equal.) The current producer implementation on master is a good example of optimized low-level code (I still wonder who came up with it 🙂). The implementation in this PR, on the other hand, is much nicer to look at but has a small potential overhead, mainly for completing the 'done' promise (using Promise.completeWith — does it still need a fiber? I don't know).

However, this PR does not implement the idea of the previous section (to stop sending after an error).

Perhaps this low-level code works (it's just a rough idea, it can surely be improved but I don't have time for that now):

```scala
ZIO.suspendSucceed {
  var sentCount = 0
  var errorSeen = false
  val results   = Array.ofDim[Either[Throwable, RecordMetadata]](records.size)
  records.zipWithIndex.foreach { case (r, i) =>
    if (errorSeen) {
      results(i) = Left(NotSentDueToEarlierErrorsException)
    } else {
      try {
        // `callback` is shorthand for the Kafka send callback
        p.send(r, callback { (metadata, error) =>
          results(i) = if (error != null) Left(error) else Right(metadata)
          if (errorSeen && sentCount == i) done.complete(results)
        })
        sentCount += 1
      } catch {
        case NonFatal(e) =>
          errorSeen = true
          results(i) = Left(e)
      }
    }
  }
  if (sentCount == 0) done.complete(results)
}
```

This doesn't work yet; there is a big race condition on sentCount that can cause done.complete to not be called. Anyway, that is probably something that can be resolved.

If you want to have a go at it, please do 🙏 !
We have proper benchmarks now, so we can compare solutions 👍

@ytalashko (Contributor)

Thanks for the reply, Erik! I feel on board with what you are saying.

> IMHO that should not be a problem as long as we document this carefully. (The same is already true for the Java Kafka library that zio-kafka uses re. retries.)

Yeah, this makes sense, if users know what they are doing, it is good (it just probably should not be a default configuration).

> But then we need to think about how to communicate this to the user. The API returns an Either[Throwable, RecordMetadata] for every record; how do we encode that a record was not sent, ideally without changing the API? (One idea is to introduce a NotSentDueToEarlierErrorsException, but perhaps there are better ideas.)

I like the idea with NotSentDueToEarlierErrorsException; I actually had the same idea in mind (maybe that is part of the reason I like it), just without a good name for the error 🙂

> Performance is important for us; we don't care whether we use ZIO 'properly', it has to be fast and look like ZIO code from the outside. (Of course we will use ZIO properly when the performance is about equal.)

This is exactly what I also have in mind

> If you want to have a go at it, please do

Sure, the initial version is here #1321

@erikvanoosten (Collaborator, Author)

Note: development of this PR is on hold because #1321 looks like a much more promising approach to the problem.
