From f81bec402fe1fcde7fa4274aaf0f8c1a70c41d9d Mon Sep 17 00:00:00 2001 From: Felix Mulder Date: Sat, 18 Jan 2020 19:56:30 +0100 Subject: [PATCH] Implement support for callbacks on individual messages --- src/Kafka/Internal/RdKafka.chs | 5 ++- src/Kafka/Producer.hs | 75 ++++++++++++++++++++++--------- src/Kafka/Producer/Callbacks.hs | 25 ++++++++++- src/Kafka/Producer/Convert.hs | 7 +++ src/Kafka/Producer/Types.hs | 11 ++++- tests-it/Kafka/IntegrationSpec.hs | 20 +++++++++ 6 files changed, 118 insertions(+), 25 deletions(-) diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index d0c0aff..d1de137 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -147,6 +147,7 @@ data RdKafkaMessageT = RdKafkaMessageT , offset'RdKafkaMessageT :: Int64 , payload'RdKafkaMessageT :: Word8Ptr , key'RdKafkaMessageT :: Word8Ptr + , opaque'RdKafkaMessageT :: Ptr () } deriving (Show, Eq) @@ -162,6 +163,7 @@ instance Storable RdKafkaMessageT where <*> liftM fromIntegral ({#get rd_kafka_message_t->offset #} p) <*> liftM castPtr ({#get rd_kafka_message_t->payload #} p) <*> liftM castPtr ({#get rd_kafka_message_t->key #} p) + <*> liftM castPtr ({#get rd_kafka_message_t->_private #} p) poke p x = do {#set rd_kafka_message_t.err#} p (enumToCInt $ err'RdKafkaMessageT x) {#set rd_kafka_message_t.rkt#} p (castPtr $ topic'RdKafkaMessageT x) @@ -171,6 +173,7 @@ instance Storable RdKafkaMessageT where {#set rd_kafka_message_t.offset#} p (fromIntegral $ offset'RdKafkaMessageT x) {#set rd_kafka_message_t.payload#} p (castPtr $ payload'RdKafkaMessageT x) {#set rd_kafka_message_t.key#} p (castPtr $ key'RdKafkaMessageT x) + {#set rd_kafka_message_t._private#} p (castPtr $ opaque'RdKafkaMessageT x) {#pointer *rd_kafka_message_t as RdKafkaMessageTPtr foreign -> RdKafkaMessageT #} @@ -893,7 +896,7 @@ rdKafkaConsumeStop topicPtr partition = do {#fun rd_kafka_produce as ^ {`RdKafkaTopicTPtr', cIntConv `CInt32T', `Int', castPtr `Word8Ptr', - cIntConv `CSize', castPtr `Word8Ptr', cIntConv `CSize', castPtr `Word8Ptr'} + cIntConv `CSize', castPtr `Word8Ptr', cIntConv `CSize', castPtr `Ptr ()'} -> `Int' #} {#fun rd_kafka_produce_batch as ^ diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs index 0207f42..2825122 100644 --- a/src/Kafka/Producer.hs +++ b/src/Kafka/Producer.hs @@ -1,9 +1,11 @@ -{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE LambdaCase #-} module Kafka.Producer ( module X , runProducer , newProducer , produceMessage, produceMessageBatch +, produceMessage' , flushProducer , closeProducer , KafkaProducer @@ -25,11 +27,12 @@ import Foreign.ForeignPtr (newForeignPtr_, withForeignPtr) import Foreign.Marshal.Array (withArrayLen) import Foreign.Ptr (Ptr, nullPtr, plusPtr) import Foreign.Storable (Storable (..)) +import Foreign.StablePtr (newStablePtr, castStablePtrToPtr) import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel) import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf) import Kafka.Internal.Shared (pollEvents) -import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr, producePartitionCInt, producePartitionInt) -import Kafka.Producer.Types (KafkaProducer (..)) +import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt) +import Kafka.Producer.Types (KafkaProducer (..), ImmediateError(..)) import Kafka.Producer.ProducerProperties as X import Kafka.Producer.Types as X hiding (KafkaProducer) @@ -60,6 +63,9 @@ newProducer pps = liftIO $ do kc@(KafkaConf kc' _ _) <- kafkaConf (KafkaProps $ (ppKafkaProps pps)) tc <- topicConf (TopicProps $ (ppTopicProps pps)) + -- add default delivery report callback + deliveryCallback (const mempty) kc + -- set callbacks forM_ (ppCallbacks pps) (\setCb -> setCb kc) @@ -78,23 +84,51 @@ produceMessage :: MonadIO m => KafkaProducer -> ProducerRecord -> m (Maybe KafkaError) -produceMessage kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) m = liftIO $ do - pollEvents kp (Just $ Timeout 0) -- fire callbacks if any exist (handle delivery reports) - bracket (mkTopic $ prTopic m) clTopic withTopic - where - mkTopic (TopicName tn) = newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc) - - clTopic = either (return . const ()) destroyUnmanagedRdKafkaTopic - - withTopic (Left err) = return . Just . KafkaError $ Text.pack err - withTopic (Right t) = - withBS (prValue m) $ \payloadPtr payloadLength -> - withBS (prKey m) $ \keyPtr keyLength -> - handleProduceErr =<< - rdKafkaProduce t (producePartitionCInt (prPartition m)) - copyMsgFlags payloadPtr (fromIntegral payloadLength) - keyPtr (fromIntegral keyLength) nullPtr - +produceMessage kp m = produceMessage' kp m (pure . mempty) >>= adjustRes + where + adjustRes = \case + Right () -> pure Nothing + Left (ImmediateError err) -> pure (Just err) + +-- | Sends a single message with a registered callback. +-- +-- The callback can be a long running process, as it is forked by the thread +-- that handles the delivery reports. +-- +produceMessage' :: MonadIO m + => KafkaProducer + -> ProducerRecord + -> (DeliveryReport -> IO ()) + -> m (Either ImmediateError ()) +produceMessage' kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) msg cb = liftIO $ + fireCallbacks >> bracket (mkTopic . prTopic $ msg) closeTopic withTopic + where + fireCallbacks = + pollEvents kp . Just . Timeout $ 0 + + mkTopic (TopicName tn) = + newUnmanagedRdKafkaTopicT k (Text.unpack tn) (Just tc) + + closeTopic = either mempty destroyUnmanagedRdKafkaTopic + + withTopic (Left err) = return . Left . ImmediateError . KafkaError . Text.pack $ err + withTopic (Right topic) = + withBS (prValue msg) $ \payloadPtr payloadLength -> + withBS (prKey msg) $ \keyPtr keyLength -> do + callbackPtr <- newStablePtr cb + res <- handleProduceErr' =<< rdKafkaProduce + topic + (producePartitionCInt (prPartition msg)) + copyMsgFlags + payloadPtr + (fromIntegral payloadLength) + keyPtr + (fromIntegral keyLength) + (castStablePtrToPtr callbackPtr) + + pure $ case res of + Left err -> Left . ImmediateError $ err + Right () -> Right () -- | Sends a batch of messages. -- Returns a list of messages which it was unable to send with corresponding errors. @@ -146,6 +180,7 @@ produceMessageBatch kp@(KafkaProducer (Kafka k) _ (TopicConf tc)) messages = lif , offset'RdKafkaMessageT = 0 , keyLen'RdKafkaMessageT = keyLength , key'RdKafkaMessageT = keyPtr + , opaque'RdKafkaMessageT = nullPtr } -- | Closes the producer. diff --git a/src/Kafka/Producer/Callbacks.hs b/src/Kafka/Producer/Callbacks.hs index bfd43aa..4ef3b5f 100644 --- a/src/Kafka/Producer/Callbacks.hs +++ b/src/Kafka/Producer/Callbacks.hs @@ -1,12 +1,16 @@ +{-# LANGUAGE TypeApplications #-} module Kafka.Producer.Callbacks ( deliveryCallback , module X ) where +import Control.Monad (void) +import Control.Concurrent (forkIO) import Foreign.C.Error (getErrno) import Foreign.Ptr (Ptr, nullPtr) import Foreign.Storable (Storable(peek)) +import Foreign.StablePtr (castPtrToStablePtr, deRefStablePtr) import Kafka.Callbacks as X import Kafka.Consumer.Types (Offset(..)) import Kafka.Internal.RdKafka (RdKafkaMessageT(..), RdKafkaRespErrT(..), rdKafkaConfSetDrMsgCb) @@ -16,6 +20,12 @@ import Kafka.Producer.Types (ProducerRecord(..), DeliveryReport(..), import Kafka.Types (KafkaError(..), TopicName(..)) -- | Sets the callback for delivery reports. +-- +-- /Note: A callback should not be a long-running process as it blocks +-- librdkafka from continuing on the thread that handles the delivery +-- callbacks. For callbacks to individual messsages see +-- 'Kafka.Producer.produceMessage\''./ +-- deliveryCallback :: (DeliveryReport -> IO ()) -> KafkaConf -> IO () deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb where @@ -25,9 +35,20 @@ deliveryCallback callback kc = rdKafkaConfSetDrMsgCb (getRdKafkaConf kc) realCb then getErrno >>= (callback . NoMessageError . kafkaRespErr) else do s <- peek mptr + let cbPtr = opaque'RdKafkaMessageT s if err'RdKafkaMessageT s /= RdKafkaRespErrNoError - then mkErrorReport s >>= callback - else mkSuccessReport s >>= callback + then mkErrorReport s >>= callbacks cbPtr + else mkSuccessReport s >>= callbacks cbPtr + + callbacks cbPtr rep = do + callback rep + if cbPtr == nullPtr then + pure () + else do + msgCb <- deRefStablePtr @(DeliveryReport -> IO ()) $ castPtrToStablePtr $ cbPtr + -- Here we fork the callback since it might be a longer action and + -- blocking here would block librdkafka from continuing its execution + void . forkIO $ msgCb rep mkErrorReport :: RdKafkaMessageT -> IO DeliveryReport mkErrorReport msg = do diff --git a/src/Kafka/Producer/Convert.hs b/src/Kafka/Producer/Convert.hs index 6d36223..5e46521 100644 --- a/src/Kafka/Producer/Convert.hs +++ b/src/Kafka/Producer/Convert.hs @@ -3,6 +3,7 @@ module Kafka.Producer.Convert , producePartitionInt , producePartitionCInt , handleProduceErr +, handleProduceErr' ) where @@ -31,3 +32,9 @@ handleProduceErr (- 1) = (Just . kafkaRespErr) <$> getErrno handleProduceErr 0 = return Nothing handleProduceErr _ = return $ Just KafkaInvalidReturnValue {-# INLINE handleProduceErr #-} + +handleProduceErr' :: Int -> IO (Either KafkaError ()) +handleProduceErr' (- 1) = (Left . kafkaRespErr) <$> getErrno +handleProduceErr' 0 = return (Right ()) +handleProduceErr' _ = return $ Left KafkaInvalidReturnValue +{-# INLINE handleProduceErr' #-} diff --git a/src/Kafka/Producer/Types.hs b/src/Kafka/Producer/Types.hs index 4a6a0bd..8fc1cb2 100644 --- a/src/Kafka/Producer/Types.hs +++ b/src/Kafka/Producer/Types.hs @@ -1,10 +1,13 @@ -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} module Kafka.Producer.Types ( KafkaProducer(..) , ProducerRecord(..) , ProducePartition(..) , DeliveryReport(..) +, ImmediateError(..) ) where @@ -47,6 +50,10 @@ data ProducePartition = | UnassignedPartition deriving (Show, Eq, Ord, Typeable, Generic) +-- | Data type representing an error that is caused by pre-flight conditions not being met +newtype ImmediateError = ImmediateError KafkaError + deriving newtype (Eq, Show) + data DeliveryReport = DeliverySuccess ProducerRecord Offset | DeliveryFailure ProducerRecord KafkaError diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index c7dc5f2..80097d0 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -6,6 +7,7 @@ where import Control.Monad (forM, forM_) import Control.Monad.Loops +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import qualified Data.ByteString as BS import Data.Either import Data.Map (fromList) @@ -113,6 +115,24 @@ spec = do res <- sendMessages (testMessages testTopic) prod res `shouldBe` Right () + it "sends messages with callback to test topic" $ \prod -> do + var <- newEmptyMVar + let + msg = ProducerRecord + { prTopic = TopicName "callback-topic" + , prPartition = UnassignedPartition + , prKey = Nothing + , prValue = Just "test from producer" + } + + res <- produceMessage' prod msg (putMVar var) + res `shouldBe` Right () + callbackRes <- flushProducer prod *> takeMVar var + callbackRes `shouldSatisfy` \case + DeliverySuccess _ _ -> True + DeliveryFailure _ _ -> False + NoMessageError _ -> False + specWithConsumer "Run consumer with async polling" (consumerProps <> groupId (makeGroupId "async")) runConsumerSpec specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackPollModeSync) runConsumerSpec