Skip to content

Commit

Permalink
Wrap callbacks in newtype to make wrong usage harder
Browse files Browse the repository at this point in the history
  • Loading branch information
phile314 committed Dec 3, 2020
1 parent 3a3291c commit a2793e4
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 27 deletions.
21 changes: 11 additions & 10 deletions src/Kafka/Callbacks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ module Kafka.Callbacks
( errorCallback
, logCallback
, statsCallback
, Callback
)
where

import Data.ByteString (ByteString)
import Kafka.Internal.RdKafka (rdKafkaConfSetErrorCb, rdKafkaConfSetLogCb, rdKafkaConfSetStatsCb)
import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf)
import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf, Callback(..))
import Kafka.Types (KafkaError(..), KafkaLogLevel(..))

-- | Add a callback for errors.
Expand All @@ -20,10 +21,10 @@ import Kafka.Types (KafkaError(..), KafkaLogLevel(..))
-- >
-- > myErrorCallback :: 'KafkaError' -> String -> IO ()
-- > myErrorCallback kafkaError message = print $ show kafkaError <> "|" <> message
errorCallback :: HasKafkaConf k => (KafkaError -> String -> IO ()) -> k -> IO ()
errorCallback callback k =
errorCallback :: (KafkaError -> String -> IO ()) -> Callback
errorCallback callback =
let realCb _ err = callback (KafkaResponseError err)
in rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb
in Callback $ \k -> rdKafkaConfSetErrorCb (getRdKafkaConf k) realCb

-- | Add a callback for logs.
--
Expand All @@ -35,10 +36,10 @@ errorCallback callback k =
-- >
-- > myLogCallback :: 'KafkaLogLevel' -> String -> String -> IO ()
-- > myLogCallback level facility message = print $ show level <> "|" <> facility <> "|" <> message
logCallback :: HasKafkaConf k => (KafkaLogLevel -> String -> String -> IO ()) -> k -> IO ()
logCallback callback k =
logCallback :: (KafkaLogLevel -> String -> String -> IO ()) -> Callback
logCallback callback =
let realCb _ = callback . toEnum
in rdKafkaConfSetLogCb (getRdKafkaConf k) realCb
in Callback $ \k -> rdKafkaConfSetLogCb (getRdKafkaConf k) realCb

-- | Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>.
--
Expand All @@ -50,7 +51,7 @@ logCallback callback k =
-- >
-- > myStatsCallback :: String -> IO ()
-- > myStatsCallback stats = print $ show stats
statsCallback :: HasKafkaConf k => (ByteString -> IO ()) -> k -> IO ()
statsCallback callback k =
statsCallback :: (ByteString -> IO ()) -> Callback
statsCallback callback =
let realCb _ = callback
in rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb
in Callback $ \k -> rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb
4 changes: 2 additions & 2 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ import Foreign hiding (void)
import Kafka.Consumer.Convert (fromMessagePtr, fromNativeTopicPartitionList'', offsetCommitToBool, offsetToInt64, toMap, toNativeTopicPartitionList, toNativeTopicPartitionList', toNativeTopicPartitionListNoDispose, topicPartitionFromMessageForCommit)
import Kafka.Consumer.Types (KafkaConsumer (..))
import Kafka.Internal.RdKafka (RdKafkaRespErrT (..), RdKafkaTopicPartitionListTPtr, RdKafkaTypeT (..), newRdKafkaT, newRdKafkaTopicPartitionListT, newRdKafkaTopicT, rdKafkaAssign, rdKafkaAssignment, rdKafkaCommit, rdKafkaCommitted, rdKafkaConfSetDefaultTopicConf, rdKafkaConsumeBatchQueue, rdKafkaConsumeQueue, rdKafkaConsumerClose, rdKafkaConsumerPoll, rdKafkaOffsetsStore, rdKafkaPausePartitions, rdKafkaPollSetConsumer, rdKafkaPosition, rdKafkaQueueDestroy, rdKafkaQueueNew, rdKafkaResumePartitions, rdKafkaSeek, rdKafkaSetLogLevel, rdKafkaSubscribe, rdKafkaSubscription, rdKafkaTopicConfDup, rdKafkaTopicPartitionListAdd)
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf)
import Kafka.Internal.Setup (CallbackPollStatus (..), Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), getKafkaConf, getRdKafka, kafkaConf, topicConf, Callback(..))
import Kafka.Internal.Shared (kafkaErrorToMaybe, maybeToLeft, rdKafkaErrorToEither)

import Kafka.Consumer.ConsumerProperties as X
Expand Down Expand Up @@ -327,7 +327,7 @@ closeConsumer (KafkaConsumer (Kafka k) (KafkaConf _ qr statusVar)) = liftIO $
newConsumerConf :: ConsumerProperties -> IO KafkaConf
newConsumerConf ConsumerProperties {cpProps = m, cpCallbacks = cbs} = do
conf <- kafkaConf (KafkaProps m)
forM_ cbs (\setCb -> setCb conf)
forM_ cbs (\(Callback setCb) -> setCb conf)
return conf

-- | Subscribes to a given list of topics.
Expand Down
16 changes: 9 additions & 7 deletions src/Kafka/Consumer/Callbacks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ import Kafka.Callbacks as X
import Kafka.Consumer.Convert (fromNativeTopicPartitionList', fromNativeTopicPartitionList'')
import Kafka.Consumer.Types (KafkaConsumer (..), RebalanceEvent (..), TopicPartition (..))
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..), getRdMsgQueue, Callback (..))
import Kafka.Types (KafkaError (..), PartitionId (..), TopicName (..))

import qualified Data.Text as Text

-- | Sets a callback that is called when rebalance is needed.
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> KafkaConf -> IO ()
rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb conf realCb
rebalanceCallback :: (KafkaConsumer -> RebalanceEvent -> IO ()) -> Callback
rebalanceCallback callback =
Callback $ \kc@(KafkaConf con _ _) -> rdKafkaConfSetRebalanceCb con (realCb kc)
where
realCb k err pl = do
realCb kc k err pl = do
k' <- newForeignPtr_ k
pls <- newForeignPtr_ pl
setRebalanceCallback callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls
Expand All @@ -36,10 +37,11 @@ rebalanceCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetRebalanceCb c
-- If no partitions had valid offsets to commit this callback will be called
-- with 'KafkaResponseError' 'RdKafkaRespErrNoOffset' which is not to be considered
-- an error.
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> KafkaConf -> IO ()
offsetCommitCallback callback kc@(KafkaConf conf _ _) = rdKafkaConfSetOffsetCommitCb conf realCb
offsetCommitCallback :: (KafkaConsumer -> KafkaError -> [TopicPartition] -> IO ()) -> Callback
offsetCommitCallback callback =
Callback $ \kc@(KafkaConf conf _ _) -> rdKafkaConfSetOffsetCommitCb conf (realCb kc)
where
realCb k err pl = do
realCb kc k err pl = do
k' <- newForeignPtr_ k
pls <- fromNativeTopicPartitionList' pl
callback (KafkaConsumer (Kafka k') kc) (KafkaResponseError err) pls
Expand Down
6 changes: 3 additions & 3 deletions src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import Data.Semigroup as Sem
import Data.Text (Text)
import qualified Data.Text as Text
import Kafka.Consumer.Types (ConsumerGroupId (..))
import Kafka.Internal.Setup (KafkaConf (..))
import Kafka.Internal.Setup (KafkaConf (..), Callback(..))
import Kafka.Types (BrokerAddress (..), ClientId (..), KafkaCompressionCodec (..), KafkaDebug (..), KafkaLogLevel (..), Millis (..), kafkaCompressionCodecToText, kafkaDebugToText)

import Kafka.Consumer.Callbacks as X
Expand All @@ -53,7 +53,7 @@ data CallbackPollMode =
data ConsumerProperties = ConsumerProperties
{ cpProps :: Map Text Text
, cpLogLevel :: Maybe KafkaLogLevel
, cpCallbacks :: [KafkaConf -> IO ()]
, cpCallbacks :: [Callback]
, cpCallbackPollMode :: CallbackPollMode
}

Expand Down Expand Up @@ -117,7 +117,7 @@ clientId (ClientId cid) =
-- * 'errorCallback'
-- * 'logCallback'
-- * 'statsCallback'
setCallback :: (KafkaConf -> IO ()) -> ConsumerProperties
setCallback :: Callback -> ConsumerProperties
setCallback cb = mempty { cpCallbacks = [cb] }

-- | Set the logging level.
Expand Down
6 changes: 6 additions & 0 deletions src/Kafka/Internal/Setup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Kafka.Internal.Setup
, HasKafka(..)
, HasKafkaConf(..)
, HasTopicConf(..)
, Callback(..)
, CallbackPollStatus(..)
, getRdKafka
, getRdKafkaConf
Expand Down Expand Up @@ -46,6 +47,11 @@ newtype TopicProps = TopicProps (Map Text Text) deriving (Show, Eq)
newtype Kafka = Kafka RdKafkaTPtr deriving Show
newtype TopicConf = TopicConf RdKafkaTopicConfTPtr deriving Show

-- | Callbacks allow retrieving various information like error occurences, statistics
-- and log messages.
-- See `Kafka.Consumer.setCallback` (Consumer) and `Kafka.Producer.setCallback` (Producer) for more details.
newtype Callback = Callback (KafkaConf -> IO ())

data CallbackPollStatus = CallbackPollEnabled | CallbackPollDisabled deriving (Show, Eq)

data KafkaConf = KafkaConf
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import Foreign.Ptr (Ptr, nullPtr, plusPtr)
import Foreign.Storable (Storable (..))
import Foreign.StablePtr (newStablePtr, castStablePtrToPtr)
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaRespErrT (..), RdKafkaTypeT (..), destroyUnmanagedRdKafkaTopic, newRdKafkaT, newUnmanagedRdKafkaTopicT, rdKafkaOutqLen, rdKafkaProduce, rdKafkaProduceBatch, rdKafkaSetLogLevel)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf)
import Kafka.Internal.Setup (Kafka (..), KafkaConf (..), KafkaProps (..), TopicConf (..), TopicProps (..), kafkaConf, topicConf, Callback(..))
import Kafka.Internal.Shared (pollEvents)
import Kafka.Producer.Convert (copyMsgFlags, handleProduceErr', producePartitionCInt, producePartitionInt)
import Kafka.Producer.Types (KafkaProducer (..), ImmediateError(..))
Expand Down Expand Up @@ -120,7 +120,7 @@ newProducer pps = liftIO $ do
deliveryCallback (const mempty) kc

-- set callbacks
forM_ (ppCallbacks pps) (\setCb -> setCb kc)
forM_ (ppCallbacks pps) (\(Callback setCb) -> setCb kc)

mbKafka <- newRdKafkaT RdKafkaProducer kc'
case mbKafka of
Expand Down
6 changes: 3 additions & 3 deletions src/Kafka/Producer/ProducerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import Control.Monad (MonadPlus(mplus))
import Data.Map (Map)
import qualified Data.Map as M
import Data.Semigroup as Sem
import Kafka.Internal.Setup (KafkaConf(..))
import Kafka.Internal.Setup (KafkaConf(..), Callback(..))
import Kafka.Types (KafkaDebug(..), Timeout(..), KafkaCompressionCodec(..), KafkaLogLevel(..), BrokerAddress(..), kafkaDebugToText, kafkaCompressionCodecToText, Millis(..))

import Kafka.Producer.Callbacks
Expand All @@ -37,7 +37,7 @@ data ProducerProperties = ProducerProperties
{ ppKafkaProps :: Map Text Text
, ppTopicProps :: Map Text Text
, ppLogLevel :: Maybe KafkaLogLevel
, ppCallbacks :: [KafkaConf -> IO ()]
, ppCallbacks :: [Callback]
}

instance Sem.Semigroup ProducerProperties where
Expand Down Expand Up @@ -70,7 +70,7 @@ brokersList bs =
-- * 'errorCallback'
-- * 'logCallback'
-- * 'statsCallback'
setCallback :: (KafkaConf -> IO ()) -> ProducerProperties
setCallback :: Callback -> ProducerProperties
setCallback cb = mempty { ppCallbacks = [cb] }

-- | Sets the logging level.
Expand Down

0 comments on commit a2793e4

Please sign in to comment.