diff --git a/src/Kafka/Callbacks.hs b/src/Kafka/Callbacks.hs index 35bf204..83649c0 100644 --- a/src/Kafka/Callbacks.hs +++ b/src/Kafka/Callbacks.hs @@ -5,6 +5,7 @@ module Kafka.Callbacks ) where +import Data.ByteString (ByteString) import Kafka.Internal.RdKafka (rdKafkaConfSetErrorCb, rdKafkaConfSetLogCb, rdKafkaConfSetStatsCb) import Kafka.Internal.Setup (HasKafkaConf(..), getRdKafkaConf) import Kafka.Types (KafkaError(..), KafkaLogLevel(..)) @@ -39,7 +40,7 @@ logCallback callback k = let realCb _ = callback . toEnum in rdKafkaConfSetLogCb (getRdKafkaConf k) realCb --- | Add a callback for stats. +-- | Add a callback for stats. The passed ByteString contains an UTF-8 encoded JSON document and can e.g. be parsed using Data.Aeson.decodeStrict. For more information about the content of the JSON document see . -- -- ==== __Examples__ -- @@ -49,7 +50,7 @@ logCallback callback k = -- > -- > myStatsCallback :: String -> IO () -- > myStatsCallback stats = print $ show stats -statsCallback :: HasKafkaConf k => (String -> IO ()) -> k -> IO () +statsCallback :: HasKafkaConf k => (ByteString -> IO ()) -> k -> IO () statsCallback callback k = let realCb _ = callback in rdKafkaConfSetStatsCb (getRdKafkaConf k) realCb diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 3044067..5329ca0 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -3,6 +3,8 @@ module Kafka.Internal.RdKafka where +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS import Data.Text (Text) import qualified Data.Text as Text import Control.Monad (liftM) @@ -15,7 +17,7 @@ import Foreign.Storable (Storable(..)) import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr) import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr) import Foreign.C.Error (Errno(..), getErrno) -import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCAStringLen, peekCString) +import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString) import Foreign.C.Types (CFile, CInt(..), CSize, CChar) import System.IO (Handle, stdin, stdout, stderr) import System.Posix.IO (handleToFd) @@ -414,7 +416,7 @@ rdKafkaConfSetLogCb conf cb = do ---- Stats Callback type StatsCallback' = Ptr RdKafkaT -> CString -> CSize -> Word8Ptr -> IO () -type StatsCallback = Ptr RdKafkaT -> String -> IO () +type StatsCallback = Ptr RdKafkaT -> ByteString -> IO () foreign import ccall safe "wrapper" mkStatsCallback :: StatsCallback' -> IO (FunPtr StatsCallback') @@ -424,7 +426,7 @@ foreign import ccall safe "rd_kafka.h rd_kafka_conf_set_stats_cb" rdKafkaConfSetStatsCb :: RdKafkaConfTPtr -> StatsCallback -> IO () rdKafkaConfSetStatsCb conf cb = do - cb' <- mkStatsCallback $ \k j jl _ -> peekCAStringLen (j, cIntConv jl) >>= cb k + cb' <- mkStatsCallback $ \k j jl _ -> BS.packCStringLen (j, cIntConv jl) >>= cb k withForeignPtr conf $ \c -> rdKafkaConfSetStatsCb' c cb' return ()