From e00886e1fd794458b5c8ac5222680bf7c36c5568 Mon Sep 17 00:00:00 2001 From: Jasper Woudenberg Date: Thu, 6 May 2021 15:43:18 +0200 Subject: [PATCH] Do not batch poll and rebalance in parallel If we allow a batch poll and rebalance event to run in parallel then there is no way to tell if a poll that starts before a rebalance and returns after the rebalance completes returns messages from the old assigned partition set or the new one. The hw-kafka-client user will then not know whether they should disregard the returned message set or process it. This makes it so the batch message polling runs in the same lock the consumer event callback does. That way we can be sure a poll for messages and a rebalance event will never be running in parallel. --- src/Kafka/Consumer.hs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 8aa04fc..ee1adf2 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize mbq <- readIORef qr case mbq of Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."] - Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr + Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr -- | Commit message's offset on broker for the message's partition. commitOffsetMessage :: MonadIO m @@ -373,6 +373,11 @@ runConsumerLoop k timeout = CallbackPollEnabled -> go CallbackPollDisabled -> pure () +whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a +whileNoCallbackRunning k f = do + let statusVar = kcfgCallbackPollStatus (getKafkaConf k) + withMVar statusVar $ \_ -> f + withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus withCallbackPollEnabled k f = do let statusVar = kcfgCallbackPollStatus (getKafkaConf k)