diff --git a/README.md b/README.md
index 52c86b3..9ff3c46 100644
--- a/README.md
+++ b/README.md
@@ -40,14 +40,14 @@ import Kafka.Consumer
 
 -- Global consumer properties
 consumerProps :: ConsumerProperties
-consumerProps = brokersList [BrokerAddress "localhost:9092"]
-             <> groupId (ConsumerGroupId "consumer_example_group")
+consumerProps = brokersList ["localhost:9092"]
+             <> groupId "consumer_example_group"
              <> noAutoCommit
              <> logLevel KafkaLogInfo
 
 -- Subscription to topics
 consumerSub :: Subscription
-consumerSub = topics [TopicName "kafka-client-example-topic"]
+consumerSub = topics ["kafka-client-example-topic"]
            <> offsetReset Earliest
 
 -- Running an example
@@ -58,7 +58,7 @@ runConsumerExample = do
   where
     mkConsumer = newConsumer consumerProps consumerSub
     clConsumer (Left err) = return (Left err)
-    clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
+    clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
     runHandler (Left err) = return (Left err)
     runHandler (Right kc) = processMessages kc
 
@@ -97,7 +97,7 @@ never dropped from the queue:
 
 ```haskell
 producerProps :: ProducerProperties
-producerProps = brokersList [BrokerAddress "localhost:9092"]
+producerProps = brokersList ["localhost:9092"]
              <> sendTimeout (Timeout 0) -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)
 ```
 
@@ -108,7 +108,7 @@ Currently `hw-kafka-client` only supports delivery error callbacks:
 
 ```haskell
 producerProps :: ProducerProperties
-producerProps = brokersList [BrokerAddress "localhost:9092"]
+producerProps = brokersList ["localhost:9092"]
              <> setCallback (deliveryCallback print)
 ```
 
@@ -126,12 +126,12 @@ import Kafka.Producer
 
 -- Global producer properties
 producerProps :: ProducerProperties
-producerProps = brokersList [BrokerAddress "localhost:9092"]
+producerProps = brokersList ["localhost:9092"]
              <> logLevel KafkaLogDebug
 
 -- Topic to send messages to
 targetTopic :: TopicName
-targetTopic = TopicName "kafka-client-example-topic"
+targetTopic = "kafka-client-example-topic"
 
 -- Run an example
 runProducerExample :: IO ()
diff --git a/example/ConsumerExample.hs b/example/ConsumerExample.hs
index d1a97d2..df272eb 100644
--- a/example/ConsumerExample.hs
+++ b/example/ConsumerExample.hs
@@ -11,8 +11,8 @@ import Data.Text (Text)
 
 -- Global consumer properties
 consumerProps :: ConsumerProperties
-consumerProps = brokersList [BrokerAddress "localhost:9092"]
-             <> groupId (ConsumerGroupId "consumer_example_group")
+consumerProps = brokersList ["localhost:9092"]
+             <> groupId "consumer_example_group"
              <> noAutoCommit
              <> setCallback (rebalanceCallback printingRebalanceCallback)
              <> setCallback (offsetCommitCallback printingOffsetCallback)
@@ -20,7 +20,7 @@ consumerProps = brokersList [BrokerAddress "localhost:9092"]
 
 -- Subscription to topics
 consumerSub :: Subscription
-consumerSub = topics [TopicName "kafka-client-example-topic"]
+consumerSub = topics ["kafka-client-example-topic"]
            <> offsetReset Earliest
 
 -- Running an example
@@ -32,7 +32,7 @@ runConsumerExample = do
   where
     mkConsumer = newConsumer consumerProps consumerSub
     clConsumer (Left err) = return (Left err)
-    clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
+    clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
     runHandler (Left err) = return (Left err)
     runHandler (Right kc) = processMessages kc
 
diff --git a/example/ProducerExample.hs b/example/ProducerExample.hs
index ce86c26..a56775e 100644
--- a/example/ProducerExample.hs
+++ b/example/ProducerExample.hs
@@ -16,14 +16,14 @@ import Data.Text (Text)
 
 -- Global producer properties
 producerProps :: ProducerProperties
-producerProps = brokersList [BrokerAddress "localhost:9092"]
+producerProps = brokersList ["localhost:9092"]
              <> sendTimeout (Timeout 10000)
              <> setCallback (deliveryCallback print)
              <> logLevel KafkaLogDebug
 
 -- Topic to send messages to
 targetTopic :: TopicName
-targetTopic = TopicName "kafka-client-example-topic"
+targetTopic = "kafka-client-example-topic"
 
 mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
 mkMessage k v = ProducerRecord
diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs
index 41cbe66..0b17fd0 100644
--- a/src/Kafka/Consumer.hs
+++ b/src/Kafka/Consumer.hs
@@ -15,7 +15,7 @@
 --
 -- -- Global consumer properties
 -- consumerProps :: 'ConsumerProperties'
--- consumerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
+-- consumerProps = 'brokersList' ["localhost:9092"]
 --              <> 'groupId' ('ConsumerGroupId' "consumer_example_group")
 --              <> 'noAutoCommit'
 --              <> 'logLevel' 'KafkaLogInfo'
diff --git a/src/Kafka/Consumer/ConsumerProperties.hs b/src/Kafka/Consumer/ConsumerProperties.hs
index 2ebe198..689ce62 100644
--- a/src/Kafka/Consumer/ConsumerProperties.hs
+++ b/src/Kafka/Consumer/ConsumerProperties.hs
@@ -76,7 +76,7 @@ instance Monoid ConsumerProperties where
 -- | Set the to contact to connect to the Kafka cluster.
 brokersList :: [BrokerAddress] -> ConsumerProperties
 brokersList bs =
-  let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
+  let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
   in extraProps $ M.fromList [("bootstrap.servers", bs')]
 
 -- | Set the and enables .
diff --git a/src/Kafka/Consumer/Types.hs b/src/Kafka/Consumer/Types.hs
index de34821..e0da633 100644
--- a/src/Kafka/Consumer/Types.hs
+++ b/src/Kafka/Consumer/Types.hs
@@ -1,5 +1,6 @@
-{-# LANGUAGE DeriveDataTypeable #-}
-{-# LANGUAGE DeriveGeneric      #-}
+{-# LANGUAGE DeriveDataTypeable         #-}
+{-# LANGUAGE DeriveGeneric              #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
 
 -----------------------------------------------------------------------------
 -- |
@@ -37,6 +38,7 @@ import Data.Bifoldable (Bifoldable (..))
 import Data.Bifunctor (Bifunctor (..))
 import Data.Bitraversable (Bitraversable (..), bimapM, bisequence)
 import Data.Int (Int64)
+import Data.String (IsString)
 import Data.Text (Text)
 import Data.Typeable (Typeable)
 import GHC.Generics (Generic)
@@ -62,7 +64,9 @@ instance HasKafkaConf KafkaConsumer where
 -- | Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
 --
 -- See
-newtype ConsumerGroupId = ConsumerGroupId { unConsumerGroupId :: Text } deriving (Show, Ord, Eq, Generic)
+newtype ConsumerGroupId = ConsumerGroupId
+  { unConsumerGroupId :: Text
+  } deriving (Show, Ord, Eq, IsString, Generic)
 
 -- | A message offset in a partition
 newtype Offset = Offset { unOffset :: Int64 } deriving (Show, Eq, Ord, Read, Generic)
diff --git a/src/Kafka/Internal/Shared.hs b/src/Kafka/Internal/Shared.hs
index 916b7f4..d84ddbf 100644
--- a/src/Kafka/Internal/Shared.hs
+++ b/src/Kafka/Internal/Shared.hs
@@ -35,8 +35,8 @@ import Kafka.Types (KafkaError (..), Millis (..), Timeout
 
 pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
 pollEvents a tm =
-  let timeout = maybe 0 (\(Timeout ms) -> ms) tm
-      (Kafka k) = getKafka a
+  let timeout = maybe 0 unTimeout tm
+      Kafka k = getKafka a
   in void (rdKafkaPoll k timeout)
 
 word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
diff --git a/src/Kafka/Metadata.hs b/src/Kafka/Metadata.hs
index bb4b8cf..b44749d 100644
--- a/src/Kafka/Metadata.hs
+++ b/src/Kafka/Metadata.hs
@@ -260,8 +260,8 @@ fromPartitionMetadataPtr pm = do
     { pmPartitionId = PartitionId (id'RdKafkaMetadataPartitionT pm)
     , pmError = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
     , pmLeader = BrokerId (leader'RdKafkaMetadataPartitionT pm)
-    , pmReplicas = (BrokerId . fromIntegral) <$> reps
-    , pmInSyncReplicas = (BrokerId . fromIntegral) <$> isrs
+    , pmReplicas = BrokerId . fromIntegral <$> reps
+    , pmInSyncReplicas = BrokerId . fromIntegral <$> isrs
     }
 
 
@@ -296,4 +296,4 @@ groupStateFromKafkaString s = case s of
   "Stable" -> GroupStable
   "Dead" -> GroupDead
   "Empty" -> GroupEmpty
-  _ -> error $ "Unknown group state: " <> (Text.unpack s)
+  _ -> error $ "Unknown group state: " <> Text.unpack s
diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs
index 63f878d..139088e 100644
--- a/src/Kafka/Producer.hs
+++ b/src/Kafka/Producer.hs
@@ -15,7 +15,7 @@
 --
 -- -- Global producer properties
 -- producerProps :: 'ProducerProperties'
--- producerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
+-- producerProps = 'brokersList' ["localhost:9092"]
 --              <> 'logLevel' 'KafkaLogDebug'
 --
 -- -- Topic to send messages to
diff --git a/src/Kafka/Producer/Convert.hs b/src/Kafka/Producer/Convert.hs
index 5e46521..e1135e6 100644
--- a/src/Kafka/Producer/Convert.hs
+++ b/src/Kafka/Producer/Convert.hs
@@ -28,13 +28,13 @@ producePartitionCInt = fromIntegral . producePartitionInt
 {-# INLINE producePartitionCInt #-}
 
 handleProduceErr :: Int -> IO (Maybe KafkaError)
-handleProduceErr (- 1) = (Just . kafkaRespErr) <$> getErrno
+handleProduceErr (- 1) = Just . kafkaRespErr <$> getErrno
 handleProduceErr 0 = return Nothing
 handleProduceErr _ = return $ Just KafkaInvalidReturnValue
 {-# INLINE handleProduceErr #-}
 
 handleProduceErr' :: Int -> IO (Either KafkaError ())
-handleProduceErr' (- 1) = (Left . kafkaRespErr) <$> getErrno
+handleProduceErr' (- 1) = Left . kafkaRespErr <$> getErrno
 handleProduceErr' 0 = return (Right ())
 handleProduceErr' _ = return $ Left KafkaInvalidReturnValue
 {-# INLINE handleProduceErr' #-}
diff --git a/src/Kafka/Producer/ProducerProperties.hs b/src/Kafka/Producer/ProducerProperties.hs
index a89d8b7..d58d205 100644
--- a/src/Kafka/Producer/ProducerProperties.hs
+++ b/src/Kafka/Producer/ProducerProperties.hs
@@ -59,7 +59,7 @@ instance Monoid ProducerProperties where
 -- | Set the to contact to connect to the Kafka cluster.
 brokersList :: [BrokerAddress] -> ProducerProperties
 brokersList bs =
-  let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
+  let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
   in extraProps $ M.fromList [("bootstrap.servers", bs')]
 
 -- | Set the producer callback.
diff --git a/src/Kafka/Types.hs b/src/Kafka/Types.hs
index 6d2397f..7cbcd82 100644
--- a/src/Kafka/Types.hs
+++ b/src/Kafka/Types.hs
@@ -29,6 +29,7 @@ where
 
 import Control.Exception (Exception (..))
 import Data.Int (Int64)
+import Data.String (IsString)
 import Data.Text (Text, isPrefixOf)
 import Data.Typeable (Typeable)
 import GHC.Generics (Generic)
@@ -46,7 +47,9 @@ newtype Millis = Millis { unMillis :: Int64 } deriving (Show, Read, Eq, Ord
 -- | Client ID used by Kafka to better track requests
 --
 -- See
-newtype ClientId = ClientId { unClientId :: Text } deriving (Show, Eq, Ord, Generic)
+newtype ClientId = ClientId
+  { unClientId :: Text
+  } deriving (Show, Eq, IsString, Ord, Generic)
 
 -- | Batch size used for polling
 newtype BatchSize = BatchSize { unBatchSize :: Int } deriving (Show, Read, Eq, Ord, Num, Generic)
@@ -63,9 +66,9 @@ data TopicType =
 -- any topic name in the topics list that is prefixed with @^@ will
 -- be regex-matched to the full list of topics in the cluster and matching
 -- topics will be added to the subscription list.
-newtype TopicName =
-  TopicName { unTopicName :: Text } -- ^ a simple topic name or a regex if started with @^@
-  deriving (Show, Eq, Ord, Read, Generic)
+newtype TopicName = TopicName
+  { unTopicName :: Text -- ^ a simple topic name or a regex if started with @^@
+  } deriving (Show, Eq, Ord, IsString, Read, Generic)
 
 -- | Deduce the type of a topic from its name, by checking if it starts with a double underscore "\__"
 topicType :: TopicName -> TopicType
@@ -74,7 +77,9 @@ topicType (TopicName tn) =
 {-# INLINE topicType #-}
 
 -- | Kafka broker address string (e.g. @broker1:9092@)
-newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text } deriving (Show, Eq, Generic)
+newtype BrokerAddress = BrokerAddress
+  { unBrokerAddress :: Text
+  } deriving (Show, Eq, IsString, Generic)
 
 -- | Timeout in milliseconds
 newtype Timeout = Timeout { unTimeout :: Int } deriving (Show, Eq, Read, Generic)
diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs
index 685572a..d8c2a04 100644
--- a/tests-it/Kafka/IntegrationSpec.hs
+++ b/tests-it/Kafka/IntegrationSpec.hs
@@ -118,7 +118,7 @@ spec = do
 
       var <- newEmptyMVar
       let msg = ProducerRecord
-                  { prTopic = TopicName "callback-topic"
+                  { prTopic = "callback-topic"
                   , prPartition = UnassignedPartition
                   , prKey = Nothing
                   , prValue = Just "test from producer"
@@ -136,7 +136,7 @@ spec = do
     specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackPollModeSync) runConsumerSpec
 
   describe "Kafka.Consumer.BatchSpec" $ do
-    specWithConsumer "Batch consumer" (consumerProps <> groupId (ConsumerGroupId "batch-consumer")) $ do
+    specWithConsumer "Batch consumer" (consumerProps <> groupId "batch-consumer") $ do
       it "should consume first batch" $ \k -> do
         res <- pollMessageBatch k (Timeout 1000) (BatchSize 5)
         length res `shouldBe` 5
@@ -232,8 +232,8 @@ runConsumerSpec = do
   it "should return topic metadata" $ \k -> do
     res <- topicMetadata k (Timeout 1000) testTopic
     res `shouldSatisfy` isRight
-    (length . kmBrokers) <$> res `shouldBe` Right 1
-    (length . kmTopics) <$> res `shouldBe` Right 1
+    length . kmBrokers <$> res `shouldBe` Right 1
+    length . kmTopics <$> res `shouldBe` Right 1
 
   it "should describe all consumer groups" $ \k -> do
     res <- allConsumerGroupsInfo k (Timeout 1000)
@@ -248,7 +248,7 @@ runConsumerSpec = do
     fmap giGroup <$> res `shouldBe` Right [testGroupId]
 
   it "should describe non-existent consumer group" $ \k -> do
-    res <- consumerGroupInfo k (Timeout 1000) (ConsumerGroupId "does-not-exist")
+    res <- consumerGroupInfo k (Timeout 1000) "does-not-exist"
     res `shouldBe` Right []
 
   it "should read topic offsets for time" $ \k -> do
diff --git a/tests-it/Kafka/TestEnv.hs b/tests-it/Kafka/TestEnv.hs
index 3c90977..d31b405 100644
--- a/tests-it/Kafka/TestEnv.hs
+++ b/tests-it/Kafka/TestEnv.hs
@@ -24,12 +24,12 @@ testPrefix = unsafePerformIO $ take 10 . Rnd.randomRs ('a','z') <$> Rnd.newStdGen
 
 brokerAddress :: BrokerAddress
 brokerAddress = unsafePerformIO $
-  (BrokerAddress . Text.pack) <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
+  BrokerAddress . Text.pack <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
 {-# NOINLINE brokerAddress #-}
 
 testTopic :: TopicName
 testTopic = unsafePerformIO $
-  (TopicName . Text.pack) <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
+  TopicName . Text.pack <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
 {-# NOINLINE testTopic #-}
 
 testGroupId :: ConsumerGroupId
diff --git a/tests/Kafka/Consumer/ConsumerRecordMapSpec.hs b/tests/Kafka/Consumer/ConsumerRecordMapSpec.hs
index 792a72e..2c458a0 100644
--- a/tests/Kafka/Consumer/ConsumerRecordMapSpec.hs
+++ b/tests/Kafka/Consumer/ConsumerRecordMapSpec.hs
@@ -15,7 +15,7 @@ testValue = "some-value"
 
 testRecord :: ConsumerRecord (Maybe Text) (Maybe Text)
 testRecord = ConsumerRecord
-  { crTopic = TopicName "some-topic"
+  { crTopic = "some-topic"
   , crPartition = PartitionId 0
   , crOffset = Offset 5
   , crTimestamp = NoTimestamp
diff --git a/tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs b/tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs
index d6766cb..031f756 100644
--- a/tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs
+++ b/tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs
@@ -17,7 +17,7 @@ testValue = "some-value"
 
 testRecord :: ConsumerRecord Text Text
 testRecord = ConsumerRecord
-  { crTopic = TopicName "some-topic"
+  { crTopic = "some-topic"
   , crPartition = PartitionId 0
   , crOffset = Offset 5
   , crTimestamp = NoTimestamp
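
Note: the `IsString` instances derived above via `GeneralizedNewtypeDeriving` are what let callers drop the explicit `TopicName`, `BrokerAddress`, `ConsumerGroupId`, and `ClientId` wrappers. A minimal caller-side sketch, mirroring the updated README example; the module name is illustrative only, and `OverloadedStrings` must be enabled in the calling module:

```haskell
{-# LANGUAGE OverloadedStrings #-}

-- Illustrative module name; not part of this patch.
module ConsumerPropsSketch where

import Kafka.Consumer

-- String literals are lifted through the derived IsString instances,
-- so no explicit newtype constructors are needed.
consumerProps :: ConsumerProperties
consumerProps = brokersList ["localhost:9092"]       -- each literal :: BrokerAddress
             <> groupId "consumer_example_group"     -- literal :: ConsumerGroupId
             <> noAutoCommit
             <> logLevel KafkaLogInfo

consumerSub :: Subscription
consumerSub = topics ["kafka-client-example-topic"]  -- each literal :: TopicName
           <> offsetReset Earliest
```

The explicit constructors remain available, so existing code such as `brokersList [BrokerAddress "localhost:9092"]` keeps compiling unchanged.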