diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 0b17fd0..fd56036 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize mbq <- readIORef qr case mbq of Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."] - Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr + Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr -- | Commit message's offset on broker for the message's partition. commitOffsetMessage :: MonadIO m @@ -373,6 +373,11 @@ runConsumerLoop k timeout = CallbackPollEnabled -> go CallbackPollDisabled -> pure () +whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a +whileNoCallbackRunning k f = do + let statusVar = kcfgCallbackPollStatus (getKafkaConf k) + withMVar statusVar $ \_ -> f + withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus withCallbackPollEnabled k f = do let statusVar = kcfgCallbackPollStatus (getKafkaConf k)