From 5d8f0180ceb963373c11334970d6fff43e68fbed Mon Sep 17 00:00:00 2001 From: Jasper Woudenberg Date: Thu, 6 May 2021 15:43:18 +0200 Subject: [PATCH] Do not batch poll and rebalance in parallel If we allow a batch poll and rebalance event to run in parallel then there is no way to tell if a poll that starts before a rebalance and returns after the rebalance completes returns messages from the old assigned partition set or the new one. The hw-kafka-client user will then not know whether they should disregard the returned message set or process it. This makes it so the batch message polling runs in the same lock the consumer event callback does. That way we can be sure a poll for messages and a rebalance event will never be running in parallel. --- src/Kafka/Consumer.hs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 0b17fd0..fd56036 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize mbq <- readIORef qr case mbq of Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."] - Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr + Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr -- | Commit message's offset on broker for the message's partition. commitOffsetMessage :: MonadIO m @@ -373,6 +373,11 @@ runConsumerLoop k timeout = CallbackPollEnabled -> go CallbackPollDisabled -> pure () +whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a +whileNoCallbackRunning k f = do + let statusVar = kcfgCallbackPollStatus (getKafkaConf k) + withMVar statusVar $ \_ -> f + withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus withCallbackPollEnabled k f = do let statusVar = kcfgCallbackPollStatus (getKafkaConf k)