Do not batch poll and rebalance in parallel
If we allow a batch poll and a rebalance event to run in parallel, there is
no way to tell whether a poll that starts before a rebalance and returns
after the rebalance completes is returning messages from the old assigned
partition set or the new one. The hw-kafka-client user then cannot know
whether to disregard the returned message set or process it.

This change makes batch message polling run under the same lock as the
consumer event callback, so a poll for messages and a rebalance event can
never run in parallel.
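
As a minimal sketch of the mechanism (not the library's actual internals;
CallbackLock, runRebalanceCallback, and main are hypothetical names used
only for illustration), both operations serialize on a single MVar:

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newMVar, withMVar)

-- A single lock meaning "a callback may be running".
type CallbackLock = MVar ()

-- Block until no callback holds the lock, then run the poll.
whileNoCallbackRunning :: CallbackLock -> IO a -> IO a
whileNoCallbackRunning lock poll = withMVar lock (\_ -> poll)

-- The rebalance callback takes the same lock, so a poll can never
-- straddle a change of the assigned partition set.
runRebalanceCallback :: CallbackLock -> IO () -> IO ()
runRebalanceCallback lock cb = withMVar lock (\_ -> cb)

main :: IO ()
main = do
  lock <- newMVar ()
  _ <- forkIO (runRebalanceCallback lock (putStrLn "rebalance: reassigning partitions"))
  whileNoCallbackRunning lock (putStrLn "poll: reading a batch")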
jwoudenberg authored and michaelglass committed May 18, 2021
1 parent 73f9573 commit e00886e
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/Kafka/Consumer.hs
@@ -168,7 +168,7 @@ pollMessageBatch c@(KafkaConsumer _ (KafkaConf _ qr _)) (Timeout ms) (BatchSize
   mbq <- readIORef qr
   case mbq of
     Nothing -> return [Left $ KafkaBadSpecification "Calling pollMessageBatch while CallbackPollMode is set to CallbackPollModeSync."]
-    Just q -> rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
+    Just q -> whileNoCallbackRunning c $ rdKafkaConsumeBatchQueue q ms b >>= traverse fromMessagePtr
 
 -- | Commit message's offset on broker for the message's partition.
 commitOffsetMessage :: MonadIO m
@@ -373,6 +373,11 @@ runConsumerLoop k timeout =
     CallbackPollEnabled  -> go
     CallbackPollDisabled -> pure ()
 
+whileNoCallbackRunning :: KafkaConsumer -> IO a -> IO a
+whileNoCallbackRunning k f = do
+  let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
+  withMVar statusVar $ \_ -> f
+
 withCallbackPollEnabled :: KafkaConsumer -> IO () -> IO CallbackPollStatus
 withCallbackPollEnabled k f = do
   let statusVar = kcfgCallbackPollStatus (getKafkaConf k)
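Design note: withMVar releases the lock even if the guarded action throws,
so a failing poll cannot leave the callback lock held and deadlock later
rebalance callbacks. Roughly equivalent, for illustration only
(whileNoCallbackRunning' is a hypothetical name):

import Control.Concurrent.MVar (MVar, takeMVar, putMVar)
import Control.Exception (bracket)

-- Roughly what withMVar does: take the lock, run the action,
-- and always put the lock back, even on exception.
whileNoCallbackRunning' :: MVar () -> IO a -> IO a
whileNoCallbackRunning' lock action =
  bracket (takeMVar lock) (putMVar lock) (\_ -> action)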
