Rebalance causes some partition streams to be duplicated #280

Closed
ghostdogpr opened this issue Feb 10, 2021 · 5 comments · Fixed by #323
Comments

@ghostdogpr
Member

When a rebalance happens, there is a chance that partitions which are revoked and immediately re-assigned to the same consumer end up with duplicated streams (all messages are received twice in the consumer stream).

Here's how to reproduce the problem (my code is not open source):

First, add these two println calls in newPartitionStream, so we see when a partition stream is created and when it ends:

val stream = {
  // Debug print when the partition stream is created...
  ZStream.fromEffect(UIO(println(s"Start stream for $tp"))) *>
    ZStream {
      ZManaged.succeed {
        for {
          p      <- Promise.make[Option[Throwable], Chunk[ByteArrayCommittableRecord]]
          _      <- requestQueue.offer(Runloop.Request(tp, p)).unit
          _      <- diagnostics.emitIfEnabled(DiagnosticEvent.Request(tp))
          result <- p.await
        } yield result
      }
      // ...and when it ends.
    }.ensuring(UIO(println(s"Finish stream for $tp")))
}
  1. Create a topic with 300 partitions (more partitions = a higher chance of running into the bug)
  2. Start a producer that produces a message every 20ms (a producer sketch follows this list)
  3. Start a node A consuming this topic => we see 300x Start stream for ...
  4. Start a node B consuming this topic (with the same consumer group) => we see 150x Start stream for ... on node B and 150x Finish stream for ... on node A ✅
  5. Stop B => we see 15x Finish stream for ... on node A (this number varies, but it's not the 150 it should be), followed by 300x Start stream for ... on node A ⚠️
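
For step 2, a minimal producer sketch, using plain kafka-clients rather than the author's (closed-source) code; the broker address and topic name are assumptions, and the 300-partition topic from step 1 must already exist:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object RepeatingProducer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumption: local broker

  val producer = new KafkaProducer(props, new StringSerializer, new StringSerializer)

  // Produce one record every 20ms, spreading records over the topic's
  // partitions by key hash.
  var i = 0
  while (true) {
    producer.send(new ProducerRecord("repro-topic", s"key-$i", s"value-$i"))
    i += 1
    Thread.sleep(20)
  }
}
```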

This seems to happen because endRevoked only stops partition streams that have a pending Request, which is not all of them. In normal cases, this is okay because those other streams are stopped by handleRequests when a new request is created and it realizes the partition is no longer assigned. But, in this particular case of rebalance, the partitions are reassigned right after being revoked so handleRequests doesn't help. Instead, a new partition stream is created and the old one keeps running too.

One way to solve this might be to defer the revoke/assign logic while we're in the middle of rebalancing? Not sure if it's safe enough though.

@ghostdogpr
Member Author

ghostdogpr commented Feb 17, 2021

Trying to describe my understanding of the problem in more detail: there are 2 lists of Runloop.Request: one in State (pendingRequests) and one in Runloop (requestQueue). Each Runloop.Request corresponds to a topic-partition stream.

When a topic-partition is revoked:

  • the requests in pendingRequests are cleaned up immediately by endRevoked and corresponding streams are stopped.
  • the requests in requestQueue are reinjected into pendingRequests but there is a time gap before that happens. When those requests are finally processed, we check current assignments to drop those that are no longer assigned (and corresponding streams are stopped). Those that are still assigned are added to pendingRequests.

When a new topic-partition is assigned, a new stream is created regardless of whether there was still a matching request in requestQueue. When such a matching request is processed, it will not be dropped, because the partition is currently assigned. From that point on there are 2 streams for the same topic-partition. The sketch below models this race.
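
A minimal pure model of that race (all types here are simplified stand-ins invented for illustration: TopicPartition mimics org.apache.kafka.common.TopicPartition, and the two collections mimic requestQueue and pendingRequests):

```scala
object DuplicateStreamRace extends App {
  final case class TopicPartition(topic: String, partition: Int)
  final case class Request(tp: TopicPartition)

  val tp = TopicPartition("events", 7)

  // Before the rebalance: the stream for tp has an outstanding request that
  // is still sitting in requestQueue, not yet moved to pendingRequests.
  val requestQueue    = List(Request(tp))
  var pendingRequests = List.empty[Request]

  // Rebalance: tp is revoked and immediately re-assigned to the same consumer.
  val revoked  = Set(tp)
  val assigned = Set(tp)

  // endRevoked only inspects pendingRequests, so the queued request survives.
  pendingRequests = pendingRequests.filterNot(r => revoked(r.tp))

  // The assignment spawns a brand-new stream for tp. Meanwhile, when the
  // queued request is drained, the "drop if no longer assigned" check passes
  // because tp is assigned again, so the old stream's request is kept.
  pendingRequests = pendingRequests ++ requestQueue.filter(r => assigned(r.tp))

  // The old stream lives on alongside the new one: 2 streams for tp.
  println(pendingRequests) // List(Request(TopicPartition(events,7)))
}
```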

@ghostdogpr
Member Author

ghostdogpr commented Feb 18, 2021

I'm having a hard time reproducing the issue consistently. I corrected the previous message to reflect that the gap during which requestQueue requests are reinjected into pendingRequests doesn't depend on the consuming speed and is pretty short, even though it still exists.

Another case I suspect: when records eq null (which I guess happens when there are no items to pull), we don't revoke anything. If a partition is removed during that call to poll, it's not revoked, and resubscribing would lead to duplicates.

To be as safe as possible, should fulfillRequests check whether a partition is requested multiple times and only fulfill the first request while stopping the others? Any other ideas?
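
A hedged sketch of that safety net (assumed shapes invented for illustration, not the actual fulfillRequests code): fulfill only the first request per partition and hand back the duplicates so their streams can be stopped.

```scala
object DedupRequests {
  final case class TopicPartition(topic: String, partition: Int) // simplified stand-in
  final case class Request(tp: TopicPartition)

  // Returns (requests to fulfill, duplicate requests whose streams to stop).
  def splitDuplicates(requests: List[Request]): (List[Request], List[Request]) = {
    val seen = scala.collection.mutable.Set.empty[TopicPartition]
    requests.partition(r => seen.add(r.tp)) // Set#add returns false for a duplicate
  }
}
```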

@iravid
Member

iravid commented Mar 31, 2021

Folks, sorry for the delay here. I've not been able to find the time to fix this, but I am writing down how this should be fixed because we will work on this soon:

  • The problem is that at some point, the Runloop decides to start a new stream (emit a partition over the partitionQueue), even though there is already an existing stream for that partition.

Diagnosis

  • Currently we decide which partitions are newly assigned, and therefore should spawn new streams, by comparing the assignment before the call to poll to the assignment after the call to poll. (Runloop.scala:290)
  • This works well as long as we can accurately close the streams for revoked partitions.
  • The problem is that we only close streams based on whether we have a pending request during doPoll. (Runloop.scala:295, call to endRevoked)
  • This mechanism is not conclusive because it's possible for a stream associated with a revoked partition to not have a pending request during poll, which means it'll live on alongside a newly emitted partition stream.
  • Another issue that @ghostdogpr noted is that when null is returned from poll, we don't do any revocations at all which is undoubtedly wrong. (Runloop.scala:283)

Possible fix

  • The easiest possible fix is to keep track of which partition streams exist and interrupt them eagerly when a revocation occurs.
  • This can be done by keeping a Map[TopicPartition, Promise[Throwable, Unit]] in the state, into which an entry is added every time a partition stream is emitted. The partition stream can be defined as today, with the addition of .interruptWhen(p). A sketch follows this list.
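
A hedged sketch of that idea, in ZIO 1.x style to match the snippet at the top of the thread; State, emitStream, and onRevoked are names assumed for illustration, not the actual Runloop code:

```scala
import org.apache.kafka.common.TopicPartition
import zio._
import zio.stream._

object EagerInterruption {
  // The real Runloop state holds much more; only the new map is shown here.
  final case class State(interruptionPromises: Map[TopicPartition, Promise[Throwable, Unit]])

  // When a partition stream is emitted, register a promise for it and attach
  // interruptWhen so the stream can be ended eagerly later.
  def emitStream[R, A](
    tp: TopicPartition,
    stream: ZStream[R, Throwable, A],
    stateRef: Ref[State]
  ): UIO[ZStream[R, Throwable, A]] =
    for {
      p <- Promise.make[Throwable, Unit]
      _ <- stateRef.update(s => s.copy(interruptionPromises = s.interruptionPromises + (tp -> p)))
    } yield stream.interruptWhen(p)

  // On revocation, complete the promises for the revoked partitions so their
  // streams end immediately instead of lingering next to newly emitted ones.
  def onRevoked(revoked: Set[TopicPartition], stateRef: Ref[State]): UIO[Unit] =
    for {
      state <- stateRef.get
      _     <- ZIO.foreach_(revoked.toList.flatMap(state.interruptionPromises.get))(_.succeed(()))
      _     <- stateRef.update(s => s.copy(interruptionPromises = s.interruptionPromises -- revoked))
    } yield ()
}
```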

Obviously the fix should include a test with a reproduction, etc. @narma has helpfully ported a test case from fs2-kafka which we can use with attribution: https://gist.github.com/narma/b63b5ec99d0b722f7658efd7111f09c0

@aartigao
Contributor

aartigao commented Jun 2, 2021

Any updates on this? We've seen this behavior in our service too.

@aartigao
Contributor

aartigao commented Jun 3, 2021

Anyway, this is critical for our company, so I'm working on implementing Itamar's fix instructions. I may need help with the testing part, but that can be discussed in the future PR.
