From a2793e4d9f52e90d09354af7201fb5874e9ad5e8 Mon Sep 17 00:00:00 2001 From: Philipp Hausmann Date: Wed, 18 Nov 2020 13:44:28 +0100 Subject: [PATCH] Wrap callbacks in newtype to make wrong usage harder --- src/Kafka/Callbacks.hs | 21 +++++++++++---------- src/Kafka/Consumer.hs | 4 ++-- src/Kafka/Consumer/Callbacks.hs | 16 +++++++++------- src/Kafka/Consumer/ConsumerProperties.hs | 6 +++--- src/Kafka/Internal/Setup.hs | 6 ++++++ src/Kafka/Producer.hs | 4 ++-- src/Kafka/Producer/ProducerProperties.hs | 6 +++--- 7 files changed, 36 insertions(+), 27 deletions(-) diff --git a/src/Kafka/Callbacks.hs b/src/Kafka/Callbacks.hs index 83649c0..d3bebb7 100644 --- a/src/Kafka/Callbacks.hs +++ b/src/Kafka/Callbacks.hs @@ -2,12 +2,13 @@ module Kafka.Callbacks ( errorCallback , logCallback , statsCallback +, Callback ) where import Data.ByteString (ByteString) import Kafka.Internal.RdKafka (rdKafkaConfSetErrorCb, rdKafkaConfSetLogCb, rdKafkaConfSetStatsCb) -import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf) +import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf, Callback(..)) import Kafka.Types (KafkaError(..), KafkaLogLevel(..)) -- | Add a callback for errors. @@ -20,10 +21,10 @@ import Kafka.Types (KafkaError(..), KafkaLogLevel(..)) -- > -- > myErrorCallback :: 'KafkaError' -> String -> IO () -- > myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message -errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO () -errorCallback callback k = +errorCallback :: (KafkaError -> String -> IO ()) -> Callback +errorCallback callback = let realCb _ err = callback (KafkaResponseError err) - in rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb + in Callback $ \k -> rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb -- | Add a callback for logs. -- @@ -35,10 +36,10 @@ errorCallback callback k = -- > -- > myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO () -- > myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message -logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO () -logCallback callback k = +logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback +logCallback callback = let realCb _ = callback . toEnum - in rdKafkaConfSetLogCb (getRdKafkaConf k) realCb + in Callback $ \k -> rdKafkaConfSetLogCb (getRdKafkaConf k) realCb -- | Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see . -- @@ -50,7 +51,7 @@ logCallback callback k = -- > -- > myStatsCallback :: String -> IO () -- > myStatsCallback stats = print $ show stats -statsCallback :: HasKafkaConf k => (ByteString -> IO ()) -> k -> IO () -statsCallback callback k = +statsCallback :: (ByteString -> IO ()) -> Callback +statsCallback callback = let realCb _ = callback - in rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb + in Callback $ \k -> rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index 41cbe66..477d25e 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -85,7 +85,7 @@ import Foreign hiding (void) import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit) import Kafka.Consumer.Types (KafkaConsumer (..)) import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd) -import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf) +import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..)) import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither) import Kafka.Consumer.ConsumerProperties as X @@ -327,7 +327,7 @@ closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $ newConsumerConf :: ConsumerProperties -> IO KafkaConf newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do conf <- kafkaConf (KafkaProps m) - forM_ cbs (\setCb -> setCb conf) + forM_ cbs (\(Callback setCb) -> setCb conf) return conf -- | Subscribes to a given list of topics. diff --git a/src/Kafka/Consumer/Callbacks.hs b/src/Kafka/Consumer/Callbacks.hs index 63aea6e..347dd76 100644 --- a/src/Kafka/Consumer/Callbacks.hs +++ b/src/Kafka/Consumer/Callbacks.hs @@ -14,16 +14,17 @@ import Kafka.Callbacks as X import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'') import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..)) import Kafka.Internal.RdKafka -import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue) +import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..)) import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..)) import qualified Data.Text as Text -- | Sets a callback that is called when rebalance is needed. -rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO () -rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb conf realCb +rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback +rebalanceCallback callback = + Callback $ \kc@(KafkaConf con _ _) -> rdKafkaConfSetRebalanceCb con (realCb kc) where - realCb k err pl = do + realCb kc k err pl = do k' <- newForeignPtr_ k pls <- newForeignPtr_ pl setRebalanceCallback callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls @@ -36,10 +37,11 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c -- If no partitions had valid offsets to commit this callback will be called -- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered -- an error. -offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO () -offsetCommitCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetOffsetCommitCb conf realCb +offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback +offsetCommitCallback callback = + Callback $ \kc@(KafkaConf conf _ _) -> rdKafkaConfSetOffsetCommitCb conf (realCb kc) where - realCb k err pl = do + realCb kc k err pl = do k' <- newForeignPtr_ k pls <- fromNativeTopicPartitionList' pl callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls diff --git a/src/Kafka/Consumer/ConsumerProperties.hs b/src/Kafka/Consumer/ConsumerProperties.hs index ca22ad7..c0093bf 100644 --- a/src/Kafka/Consumer/ConsumerProperties.hs +++ b/src/Kafka/Consumer/ConsumerProperties.hs @@ -34,7 +34,7 @@ import Data.Semigroup as Sem import Data.Text (Text) import qualified Data.Text as Text import Kafka.Consumer.Types (ConsumerGroupId (..)) -import Kafka.Internal.Setup (KafkaConf (..)) +import Kafka.Internal.Setup (KafkaConf (..), Callback(..)) import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText) import Kafka.Consumer.Callbacks as X @@ -53,7 +53,7 @@ data CallbackPollMode = data ConsumerProperties = ConsumerProperties { cpProps :: Map Text Text , cpLogLevel :: Maybe KafkaLogLevel - , cpCallbacks :: [KafkaConf -> IO ()] + , cpCallbacks :: [Callback] , cpCallbackPollMode :: CallbackPollMode } @@ -117,7 +117,7 @@ clientId (ClientId cid) = -- * 'errorCallback' -- * 'logCallback' -- * 'statsCallback' -setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties +setCallback :: Callback -> ConsumerProperties setCallback cb = mempty { cpCallbacks = [cb] } -- | Set the logging level. diff --git a/src/Kafka/Internal/Setup.hs b/src/Kafka/Internal/Setup.hs index 905b160..0afcc28 100644 --- a/src/Kafka/Internal/Setup.hs +++ b/src/Kafka/Internal/Setup.hs @@ -7,6 +7,7 @@ module Kafka.Internal.Setup , HasKafka(..) , HasKafkaConf(..) , HasTopicConf(..) +, Callback(..) , CallbackPollStatus(..) , getRdKafka , getRdKafkaConf @@ -46,6 +47,11 @@ newtype TopicProps = TopicProps (Map Text Text) deriving (Show, Eq) newtype Kafka = Kafka RdKafkaTPtr deriving Show newtype TopicConf = TopicConf RdKafkaTopicConfTPtr deriving Show +-- | Callbacks allow retrieving various information like error occurences, statistics +-- and log messages. +-- See `Kafka.Consumer.setCallback` (Consumer) and `Kafka.Producer.setCallback` (Producer) for more details. +newtype Callback = Callback (KafkaConf -> IO ()) + data CallbackPollStatus = CallbackPollEnabled | CallbackPollDisabled deriving (Show, Eq) data KafkaConf = KafkaConf diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs index 63f878d..11973b6 100644 --- a/src/Kafka/Producer.hs +++ b/src/Kafka/Producer.hs @@ -82,7 +82,7 @@ import Foreign.Ptr (Ptr, nullPtr, plusPtr) import Foreign.Storable (Storable (..)) import Foreign.StablePtr (newStablePtr, castStablePtrToPtr) import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel) -import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf) +import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..)) import Kafka.Internal.Shared (pollEvents) import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt) import Kafka.Producer.Types (KafkaProducer (..), ImmediateError(..)) @@ -120,7 +120,7 @@ newProducer pps = liftIO $ do deliveryCallback (const mempty) kc -- set callbacks - forM_ (ppCallbacks pps) (\setCb -> setCb kc) + forM_ (ppCallbacks pps) (\(Callback setCb) -> setCb kc) mbKafka <- newRdKafkaT RdKafkaProducer kc' case mbKafka of diff --git a/src/Kafka/Producer/ProducerProperties.hs b/src/Kafka/Producer/ProducerProperties.hs index fdb82eb..6a41c31 100644 --- a/src/Kafka/Producer/ProducerProperties.hs +++ b/src/Kafka/Producer/ProducerProperties.hs @@ -27,7 +27,7 @@ import Control.Monad (MonadPlus(mplus)) import Data.Map (Map) import qualified Data.Map as M import Data.Semigroup as Sem -import Kafka.Internal.Setup (KafkaConf(..)) +import Kafka.Internal.Setup (KafkaConf(..), Callback(..)) import Kafka.Types (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText, Millis(..)) import Kafka.Producer.Callbacks @@ -37,7 +37,7 @@ data ProducerProperties = ProducerProperties { ppKafkaProps :: Map Text Text , ppTopicProps :: Map Text Text , ppLogLevel :: Maybe KafkaLogLevel - , ppCallbacks :: [KafkaConf -> IO ()] + , ppCallbacks :: [Callback] } instance Sem.Semigroup ProducerProperties where @@ -70,7 +70,7 @@ brokersList bs = -- * 'errorCallback' -- * 'logCallback' -- * 'statsCallback' -setCallback :: (KafkaConf -> IO ()) -> ProducerProperties +setCallback :: Callback -> ProducerProperties setCallback cb = mempty { ppCallbacks = [cb] } -- | Sets the logging level.