Skip to content

Commit

Permalink
Merge pull request #179 from mlevashov/record_headers
Browse files Browse the repository at this point in the history
Add headers to consumed/produced records
  • Loading branch information
AlexeyRaga authored Oct 16, 2021
2 parents 1bfc027 + 72e6f6d commit 5e61094
Show file tree
Hide file tree
Showing 16 changed files with 281 additions and 145 deletions.
9 changes: 3 additions & 6 deletions example/ProducerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mkMessage k v = ProducerRecord
, prPartition = UnassignedPartition
, prKey = k
, prValue = v
, prHeaders = mempty
}

-- Run an example
Expand Down Expand Up @@ -61,12 +62,8 @@ sendMessages prod = do
putStrLn "And the last one..."
msg3 <- getLine
err3 <- produceMessage prod (mkMessage (Just "key3") (Just $ pack msg3))

-- errs <- produceMessageBatch prod
-- [ mkMessage (Just "b-1") (Just "batch-1")
-- , mkMessage (Just "b-2") (Just "batch-2")
-- , mkMessage Nothing (Just "batch-3")
-- ]

err4 <- produceMessage prod ((mkMessage (Just "key4") (Just $ pack msg3)) { prHeaders = headersFromList [("fancy", "header")]})

-- forM_ errs (print . snd)

Expand Down
14 changes: 7 additions & 7 deletions nix/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
"homepage": "https://github.com/nmattia/niv",
"owner": "nmattia",
"repo": "niv",
"rev": "af958e8057f345ee1aca714c1247ef3ba1c15f5e",
"sha256": "1qjavxabbrsh73yck5dcq8jggvh3r2jkbr6b5nlz5d9yrqm9255n",
"rev": "65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e",
"sha256": "17mirpsx5wyw262fpsd6n6m47jcgw8k2bwcp1iwdnrlzy4dhcgqh",
"type": "tarball",
"url": "https://github.com/nmattia/niv/archive/af958e8057f345ee1aca714c1247ef3ba1c15f5e.tar.gz",
"url": "https://github.com/nmattia/niv/archive/65a61b147f307d24bfd0a5cd56ce7d7b7cc61d2e.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
},
"nixpkgs": {
"branch": "release-19.03",
"branch": "nixos-21.05",
"description": "Nix Packages collection",
"homepage": "",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "da0c385a691d38b56b17eb18b852c4cec2050c24",
"sha256": "0svhqn139cy2nlgv4kqv1bsxza2dcm0yylrhnmanw4p73gv85caf",
"rev": "ce7a1190a0fa4ba3465b5f5471b08567060ca14c",
"sha256": "1zr1s9gp0h5g4arlba1bpb9yqfaaby5195ydm6a2psaxhm748li9",
"type": "tarball",
"url": "https://github.com/NixOS/nixpkgs/archive/da0c385a691d38b56b17eb18b852c4cec2050c24.tar.gz",
"url": "https://github.com/NixOS/nixpkgs/archive/ce7a1190a0fa4ba3465b5f5471b08567060ca14c.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
}
}
2 changes: 1 addition & 1 deletion scripts/build-librdkafka
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

RDKAFKA_VER="849c066b559950b02e37a69256f0cb7b04381d0e"
RDKAFKA_VER="1a722553638bba85dbda5050455f7b9a5ef302de"

PRJ=$PWD
DST="$PRJ/.librdkafka"
Expand Down
1 change: 1 addition & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pkgs.mkShell {
rdkafka
nettools
niv
gmp
];

shellHook = ''
Expand Down
9 changes: 6 additions & 3 deletions src/Kafka/Consumer/Convert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ where

import Control.Monad ((>=>))
import qualified Data.ByteString as BS
import Data.Either (fromRight)
import Data.Int (Int64)
import Data.Map.Strict (Map, fromListWith)
import qualified Data.Set as S
Expand All @@ -41,7 +42,7 @@ import Kafka.Internal.RdKafka
, rdKafkaTopicPartitionListNew
, peekCText
)
import Kafka.Internal.Shared (kafkaRespErr, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Internal.Shared (kafkaRespErr, readHeaders, readTopic, readKey, readPayload, readTimestamp)
import Kafka.Types (KafkaError(..), PartitionId(..), TopicName(..))

-- | Converts offsets sync policy to integer (the way Kafka understands it):
Expand Down Expand Up @@ -158,20 +159,22 @@ fromMessagePtr ptr =
s <- peek realPtr
msg <- if err'RdKafkaMessageT s /= RdKafkaRespErrNoError
then return . Left . KafkaResponseError $ err'RdKafkaMessageT s
else Right <$> mkRecord s
else Right <$> mkRecord s realPtr
rdKafkaMessageDestroy realPtr
return msg
where
mkRecord msg = do
mkRecord msg rptr = do
topic <- readTopic msg
key <- readKey msg
payload <- readPayload msg
timestamp <- readTimestamp ptr
headers <- fromRight mempty <$> readHeaders rptr
return ConsumerRecord
{ crTopic = TopicName topic
, crPartition = PartitionId $ partition'RdKafkaMessageT msg
, crOffset = Offset $ offset'RdKafkaMessageT msg
, crTimestamp = timestamp
, crHeaders = headers
, crKey = key
, crValue = payload
}
Expand Down
5 changes: 3 additions & 2 deletions src/Kafka/Consumer/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Data.Text (Text)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..), Headers)

-- | The main type for Kafka consumption, used e.g. to poll and commit messages.
--
Expand Down Expand Up @@ -143,13 +143,14 @@ data ConsumerRecord k v = ConsumerRecord
, crPartition :: !PartitionId -- ^ Kafka partition this message was received from
, crOffset :: !Offset -- ^ Offset within the 'crPartition' Kafka partition
, crTimestamp :: !Timestamp -- ^ Message timestamp
, crHeaders :: !Headers -- ^ Message headers
, crKey :: !k -- ^ Message key
, crValue :: !v -- ^ Message value
}
deriving (Eq, Show, Read, Typeable, Generic)

instance Bifunctor ConsumerRecord where
bimap f g (ConsumerRecord t p o ts k v) = ConsumerRecord t p o ts (f k) (g v)
bimap f g (ConsumerRecord t p o ts hds k v) = ConsumerRecord t p o ts hds (f k) (g v)
{-# INLINE bimap #-}

instance Functor (ConsumerRecord k) where
Expand Down
118 changes: 115 additions & 3 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import Data.Word (Word8)
import Foreign.Concurrent (newForeignPtr)
import qualified Foreign.Concurrent as Concurrent
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Marshal.Array (peekArray, allocaArray)
import Foreign.Marshal.Array (peekArray, allocaArray, withArrayLen)
import Foreign.Storable (Storable(..))
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr)
import Foreign.C.Error (Errno(..), getErrno)
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar)
import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong)
import System.IO (Handle, stdin, stdout, stderr)
import System.Posix.IO (handleToFd)
import System.Posix.Types (Fd(..))
Expand Down Expand Up @@ -972,6 +972,118 @@ newRdKafkaTopicT kafkaPtr topic topicConfPtr = do
_ <- traverse (addForeignPtrFinalizer rdKafkaTopicDestroy') res
return res

-------------------------------------------------------------------------------------------------
---- Errors

data RdKafkaErrorT
{#pointer *rd_kafka_error_t as RdKafkaErrorTPtr -> RdKafkaErrorT #}

{#fun rd_kafka_error_code as ^
{`RdKafkaErrorTPtr'} -> `RdKafkaRespErrT' cIntToEnum #}

{#fun rd_kafka_error_destroy as ^
{`RdKafkaErrorTPtr'} -> `()' #}
-------------------------------------------------------------------------------------------------
---- Headers

data RdKafkaHeadersT
{#pointer *rd_kafka_headers_t as RdKafkaHeadersTPtr -> RdKafkaHeadersT #}

{#fun rd_kafka_header_get_all as ^
{`RdKafkaHeadersTPtr', cIntConv `CSize', castPtr `Ptr CString', castPtr `Ptr Word8Ptr', `CSizePtr'} -> `RdKafkaRespErrT' cIntToEnum #}

{#fun rd_kafka_message_headers as ^
{castPtr `Ptr RdKafkaMessageT', alloca- `RdKafkaHeadersTPtr' peekPtr*} -> `RdKafkaRespErrT' cIntToEnum #}

--- Produceva api

{#enum rd_kafka_vtype_t as ^ {underscoreToCase} deriving (Show, Eq) #}

data RdKafkaVuT
= Topic'RdKafkaVu CString
| TopicHandle'RdKafkaVu (Ptr RdKafkaTopicT)
| Partition'RdKafkaVu CInt32T
| Value'RdKafkaVu Word8Ptr CSize
| Key'RdKafkaVu Word8Ptr CSize
| MsgFlags'RdKafkaVu CInt
| Timestamp'RdKafkaVu CInt64T
| Opaque'RdKafkaVu (Ptr ())
| Header'RdKafkaVu CString Word8Ptr CSize
| Headers'RdKafkaVu (Ptr RdKafkaHeadersT) -- The message object will assume ownership of the headers (unless produceva() fails)
| End'RdKafkaVu

{#pointer *rd_kafka_vu_t as RdKafkaVuTPtr foreign -> RdKafkaVuT #}

instance Storable RdKafkaVuT where
alignment _ = {#alignof rd_kafka_vu_t #}
sizeOf _ = {#sizeof rd_kafka_vu_t #}
peek p = {#get rd_kafka_vu_t->vtype #} p >>= \a -> case cIntToEnum a of
RdKafkaVtypeEnd -> return End'RdKafkaVu
RdKafkaVtypeTopic -> Topic'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.cstr #} p)
RdKafkaVtypeMsgflags -> MsgFlags'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i #} p)
RdKafkaVtypeTimestamp -> Timestamp'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i64 #} p)
RdKafkaVtypePartition -> Partition'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.i32 #} p)
RdKafkaVtypeHeaders -> Headers'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.headers #} p)
RdKafkaVtypeValue -> do
nm <- liftM castPtr ({#get rd_kafka_vu_t->u.mem.ptr #} p)
sz <- ({#get rd_kafka_vu_t->u.mem.size #} p)
return $ Value'RdKafkaVu nm (cIntConv sz)
RdKafkaVtypeKey -> do
nm <- liftM castPtr ({#get rd_kafka_vu_t->u.mem.ptr #} p)
sz <- ({#get rd_kafka_vu_t->u.mem.size #} p)
return $ Key'RdKafkaVu nm (cIntConv sz)
RdKafkaVtypeRkt -> TopicHandle'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.rkt #} p)
RdKafkaVtypeOpaque -> Opaque'RdKafkaVu <$> ({#get rd_kafka_vu_t->u.ptr #} p)
RdKafkaVtypeHeader -> do
nm <- ({#get rd_kafka_vu_t->u.header.name #} p)
val' <- liftM castPtr ({#get rd_kafka_vu_t->u.header.val #} p)
sz <- ({#get rd_kafka_vu_t->u.header.size #} p)
return $ Header'RdKafkaVu nm val' (cIntConv sz)
poke p End'RdKafkaVu =
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeEnd)
poke p (Topic'RdKafkaVu str) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeTopic)
{#set rd_kafka_vu_t.u.cstr #} p str
poke p (Timestamp'RdKafkaVu tms) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeTimestamp)
{#set rd_kafka_vu_t.u.i64 #} p tms
poke p (Partition'RdKafkaVu prt) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypePartition)
{#set rd_kafka_vu_t.u.i32 #} p prt
poke p (MsgFlags'RdKafkaVu flags) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeMsgflags)
{#set rd_kafka_vu_t.u.i #} p flags
poke p (Headers'RdKafkaVu headers) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeHeaders)
{#set rd_kafka_vu_t.u.headers #} p headers
poke p (TopicHandle'RdKafkaVu tphandle) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeRkt)
{#set rd_kafka_vu_t.u.rkt #} p tphandle
poke p (Value'RdKafkaVu pl sz) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeValue)
{#set rd_kafka_vu_t.u.mem.size #} p (cIntConv sz)
{#set rd_kafka_vu_t.u.mem.ptr #} p (castPtr pl)
poke p (Key'RdKafkaVu pl sz) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeKey)
{#set rd_kafka_vu_t.u.mem.size #} p (cIntConv sz)
{#set rd_kafka_vu_t.u.mem.ptr #} p (castPtr pl)
poke p (Opaque'RdKafkaVu ptr') = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeOpaque)
{#set rd_kafka_vu_t.u.ptr #} p ptr'
poke p (Header'RdKafkaVu nm val' sz) = do
{#set rd_kafka_vu_t.vtype #} p (enumToCInt RdKafkaVtypeHeader)
{#set rd_kafka_vu_t.u.header.size #} p (cIntConv sz)
{#set rd_kafka_vu_t.u.header.name #} p nm
{#set rd_kafka_vu_t.u.header.val #} p (castPtr val')

{#fun rd_kafka_produceva as rdKafkaMessageProduceVa'
{`RdKafkaTPtr', `RdKafkaVuTPtr', `CLong'} -> `RdKafkaErrorTPtr' #}

rdKafkaMessageProduceVa :: RdKafkaTPtr -> [RdKafkaVuT] -> IO RdKafkaErrorTPtr
rdKafkaMessageProduceVa kafkaPtr vts = withArrayLen vts $ \i arrPtr -> do
fptr <- newForeignPtr_ arrPtr
rdKafkaMessageProduceVa' kafkaPtr fptr (cIntConv i)

-- Marshall / Unmarshall
enumToCInt :: Enum a => a -> CInt
enumToCInt = fromIntegral . fromEnum
Expand Down Expand Up @@ -1013,4 +1125,4 @@ c_stdin = handleToCFile stdin "r"
c_stdout :: IO CFilePtr
c_stdout = handleToCFile stdout "w"
c_stderr :: IO CFilePtr
c_stderr = handleToCFile stderr "w"
c_stderr = handleToCFile stderr "w"
31 changes: 29 additions & 2 deletions src/Kafka/Internal/Shared.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE LambdaCase #-}

module Kafka.Internal.Shared
( pollEvents
, word8PtrToBS
Expand All @@ -8,6 +10,7 @@ module Kafka.Internal.Shared
, kafkaErrorToEither
, kafkaErrorToMaybe
, maybeToLeft
, readHeaders
, readPayload
, readTopic
, readKey
Expand All @@ -29,9 +32,9 @@ import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (Ptr, nullPtr)
import Foreign.Storable (Storable (peek))
import Kafka.Consumer.Types (Timestamp (..))
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName)
import Kafka.Internal.RdKafka (RdKafkaMessageT (..), RdKafkaMessageTPtr, RdKafkaRespErrT (..), RdKafkaTimestampTypeT (..), Word8Ptr, rdKafkaErrno2err, rdKafkaMessageTimestamp, rdKafkaPoll, rdKafkaTopicName, rdKafkaHeaderGetAll, rdKafkaMessageHeaders)
import Kafka.Internal.Setup (HasKafka (..), Kafka (..))
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..))
import Kafka.Types (KafkaError (..), Millis (..), Timeout (..), Headers, headersFromList)

pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents a tm =
Expand Down Expand Up @@ -102,6 +105,30 @@ readTimestamp msg =
RdKafkaTimestampLogAppendTime -> LogAppendTime (Millis ts)
RdKafkaTimestampNotAvailable -> NoTimestamp


readHeaders :: Ptr RdKafkaMessageT -> IO (Either RdKafkaRespErrT Headers)
readHeaders msg = do
(err, headersPtr) <- rdKafkaMessageHeaders msg
case err of
RdKafkaRespErrNoent -> return $ Right mempty
RdKafkaRespErrNoError -> fmap headersFromList <$> extractHeaders headersPtr
e -> return . Left $ e
where extractHeaders ptHeaders =
alloca $ \nptr ->
alloca $ \vptr ->
alloca $ \szptr ->
let go acc idx = rdKafkaHeaderGetAll ptHeaders idx nptr vptr szptr >>= \case
RdKafkaRespErrNoent -> return $ Right acc
RdKafkaRespErrNoError -> do
cstr <- peek nptr
wptr <- peek vptr
csize <- peek szptr
hn <- BS.packCString cstr
hv <- word8PtrToBS (fromIntegral csize) wptr
go ((hn, hv) : acc) (idx + 1)
_ -> error "Unexpected error code while extracting headers"
in go [] 0

readBS :: (t -> Int) -> (t -> Ptr Word8) -> t -> IO (Maybe BS.ByteString)
readBS flen fdata s = if fdata s == nullPtr
then return Nothing
Expand Down
Loading

0 comments on commit 5e61094

Please sign in to comment.