PartitionedStream hanging after stream consumption started #1107

Closed
flavienbert opened this issue Nov 15, 2023 · 10 comments · Fixed by #1109

@flavienbert (Contributor) commented Nov 15, 2023

Hi, I noticed a bug in v2.6.0 that is not present in v2.3.4.

I am using a Consumer.partitionedStream, and for each partition I need to get the committed offset and end offset with the Consumer.committed / Consumer.endOffsets methods:

Consumer.partitionedStream(???).mapZIOParUnordered(12)(
  (e: (TopicPartition, stream.Stream[Throwable, CommittableRecord[Bytes, Either[UnknownEvent, Event]]])) => {
    val (topicPartition, streamIn) = e
    Consumer.endOffsets(Set(topicPartition)).flatMap { endOffsets =>
      ZIO.debug("some more exec here") *> streamIn.runDrain
    }
  }
)

I have 12 partitions so I use mapZIOParUnordered with a parallelism of 12 to process all my partitions.

I notice that as soon as the stream for one partition starts being consumed, the Consumer.endOffsets and Consumer.committed calls for the other partitions take far too long. It looks like the consumer's polling blocks the other fibers from completing their Consumer.endOffsets and Consumer.committed tasks. The result, after a few minutes, is that no partitions are assigned anymore, and they are never reassigned, yet the process is not killed.

@erikvanoosten (Collaborator)

Tricky stuff. Since 2.4, zio-kafka polls the consumer continuously. This gives a huge performance improvement, but it also means that the Consumer.endOffsets method may need to wait a long time before it gets a chance to access the shared underlying java consumer.

I am not really sure what we can do about this.

@svroonland (Collaborator)

There is continuous polling, yes, but the Semaphore used to guarantee exclusive access to the underlying Apache Kafka consumer is acquired anew for every poll, so other calls like endOffsets should have a fair chance.

I can't think of a reason why that would not be the case, we'd have to try to reproduce the issue.
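
A minimal sketch of the access pattern described above (illustrative names, not zio-kafka's actual Runloop code), assuming a single ZIO Semaphore guards the shared java consumer and the poll loop re-acquires its permit on every iteration:

    import zio._

    // One permit protects all access to the shared java consumer.
    final class SharedConsumerAccess[C](semaphore: Semaphore, javaConsumer: C) {
      def withConsumer[R, E, A](f: C => ZIO[R, E, A]): ZIO[R, E, A] =
        semaphore.withPermit(f(javaConsumer))
    }

    object SharedConsumerAccess {
      def make[C](javaConsumer: C): UIO[SharedConsumerAccess[C]] =
        Semaphore.make(1).map(sem => new SharedConsumerAccess(sem, javaConsumer))
    }

    // The poll loop releases the permit after every poll, so calls such as
    // endOffsets can, in principle, acquire it in between polls.
    def pollLoop[C](access: SharedConsumerAccess[C])(poll: C => Task[Unit]): Task[Nothing] =
      access.withConsumer(poll).forever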

@erikvanoosten (Collaborator)

Fair semaphores are not very performant, so most implementations are not fair. I could not find out whether ZIO's semaphore is fair.

@svroonland (Collaborator)

Fair enough (pun intended) 😛

@svroonland (Collaborator)

As a workaround, perhaps something like this works: first get the committed and end offsets, then start each partition stream:

    Consumer
      .partitionedStream(???)
      .mapZIO { case (tp, streamIn) =>
        (ZIO.succeed(tp) zip ZIO.succeed(streamIn) zip
          Consumer.committed(Set(tp)) zip Consumer.endOffsets(Set(tp)))
      }
      .flatMapPar(nrPartitions) { case (tp, streamIn, committed, endOffsets) =>
        streamIn
          .debug // etc
      }

@flavienbert (Contributor, Author)

The issue can also occur during a partition rebalance. That means the polling is still active while I need to recompute my starting state with endOffsets/committed offsets for the newly assigned partitions. I think your workaround could work at first start, but not during a rebalance.

To complete my issue: after a few minutes (5-10 min) the consumer ends up with this log:

Member mozzart-orchestrator-prod-6ccc965954-49572-dd587bfc-85ab-4035-8447-0983e2abdcd1 sending LeaveGroup request to coordinator b-1.platformeventsmsk.ipp9h3.c8.kafka.eu-west-1.amazonaws.com:9094 (id: 2147483646 rack: null) due to the consumer unsubscribed from all topics

I am running my program on Kubernetes. At this point the consumer group doesn't have any assigned partitions on the Kafka cluster, and the process has not finished, so Kubernetes doesn't restart the pod. The consumer seems to hang forever.

@flavienbert (Contributor, Author) commented Nov 16, 2023

@erikvanoosten, what do you think about sending a new Control RunloopCommand to disable polling while we use a consumer method that requires the java consumer, and enable polling again once it's done? A second, easier way could be to edit the RunLoop State and add a parameter to the state that disables polling.
Maybe it could trigger the maxPollingInterval exception, though?

@erikvanoosten (Collaborator)

Just got confirmation that ZIO's semaphore is not fair.

@flavienbert IMHO it will be easier (and more predictable) to make the semaphore that protects the consumer fair, that is, to give users access in the order in which they arrived.

@flavienbert (Contributor, Author)

Agree with you. Are you handling it? Or do you want me to take care of it?

@erikvanoosten (Collaborator)

> Agree with you. Are you handling it? Or do you want me to take care of it?

Please, go ahead! 🙏

erikvanoosten pushed a commit that referenced this issue Nov 17, 2023
Replace the semaphore that is used to control access to the java consumer with a reentrant lock that has fairness enabled. This prevents liveness problems for consumers that need to use the consumer, e.g. to fetch committed offsets. Fixes #1107.
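
A minimal sketch of that idea (illustrative names, not the actual zio-kafka change): guard the shared java consumer with a java.util.concurrent.locks.ReentrantLock constructed with fairness enabled, so waiting callers get the consumer in arrival order and endOffsets/committed can no longer be starved by the poll loop:

    import java.util.concurrent.locks.ReentrantLock
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import zio._

    final class FairConsumerAccess[K, V](consumer: KafkaConsumer[K, V]) {
      private val lock = new ReentrantLock(true) // true = fair ordering of waiters

      // Runs `f` on the blocking pool; lock and unlock stay on the same thread.
      def withConsumer[A](f: KafkaConsumer[K, V] => A): Task[A] =
        ZIO.attemptBlocking {
          lock.lock()
          try f(consumer)
          finally lock.unlock()
        }
    }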

erikvanoosten changed the title from "PartitionnedStream hanging after stream consumption started" to "PartitionedStream hanging after stream consumption started" on Nov 19, 2023