Skip to content

Commit

Permalink
Do not batch poll and rebalance in parallel
Browse files Browse the repository at this point in the history
If we allow a batch poll and rebalance event to run in parallel then
there is no way to tell if a poll that starts before a rebalance and
returns after the rebalance completes returns messages from the old
assigned partition set or the new one. The hw-kafka-client user will
then not know whether they should disregard the returned message set or
process it.

This makes it so the batch message polling runs in the same lock the
consumer event callback does. That way we can be sure a poll for
messages and a rebalance event will never be running in parallel.
  • Loading branch information
jwoudenberg committed May 6, 2021
1 parent f9760fc commit 5d8f018
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize
mbq <- readIORef qr
case mbq of
Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr

-- | Commit message's offset on broker for the message's partition.
commitOffsetMessage :: MonadIO m
Expand Down Expand Up @@ -373,6 +373,11 @@ runConsumerLoop k timeout =
CallbackPollEnabled -> go
CallbackPollDisabled -> pure ()

whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a
whileNoCallbackRunning k f = do
let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
withMVar statusVar $ \_ -> f

withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
withCallbackPollEnabled k f = do
let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
Expand Down

0 comments on commit 5d8f018

Please sign in to comment.