Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IsString instance #163

Merged
merged 1 commit into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList [BrokerAddress "localhost:9092"]
<> groupId (ConsumerGroupId "consumer_example_group")
consumerProps = brokersList ["localhost:9092"]
<> groupId "consumer_example_group"
<> noAutoCommit
<> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
consumerSub = topics ["kafka-client-example-topic"]
<> offsetReset Earliest

-- Running an example
Expand All @@ -58,7 +58,7 @@ runConsumerExample = do
where
mkConsumer = newConsumer consumerProps consumerSub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = processMessages kc

Expand Down Expand Up @@ -97,7 +97,7 @@ never dropped from the queue:

```haskell
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> sendTimeout (Timeout 0) -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)
```

Expand All @@ -108,7 +108,7 @@ Currently `hw-kafka-client` only supports delivery error callbacks:

```haskell
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> setCallback (deliveryCallback print)
```

Expand All @@ -126,12 +126,12 @@ import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"
targetTopic = "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
Expand Down
8 changes: 4 additions & 4 deletions example/ConsumerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import Data.Text (Text)

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList [BrokerAddress "localhost:9092"]
<> groupId (ConsumerGroupId "consumer_example_group")
consumerProps = brokersList ["localhost:9092"]
<> groupId "consumer_example_group"
<> noAutoCommit
<> setCallback (rebalanceCallback printingRebalanceCallback)
<> setCallback (offsetCommitCallback printingOffsetCallback)
<> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
consumerSub = topics ["kafka-client-example-topic"]
<> offsetReset Earliest

-- Running an example
Expand All @@ -32,7 +32,7 @@ runConsumerExample = do
where
mkConsumer = newConsumer consumerProps consumerSub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = processMessages kc

Expand Down
4 changes: 2 additions & 2 deletions example/ProducerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import Data.Text (Text)

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> sendTimeout (Timeout 10000)
<> setCallback (deliveryCallback print)
<> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"
targetTopic = "kafka-client-example-topic"

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
--
-- -- Global consumer properties
-- consumerProps :: 'ConsumerProperties'
-- consumerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
-- consumerProps = 'brokersList' ["localhost:9092"]
-- <> 'groupId' ('ConsumerGroupId' "consumer_example_group")
-- <> 'noAutoCommit'
-- <> 'logLevel' 'KafkaLogInfo'
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ instance Monoid ConsumerProperties where
-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]

-- | Set the <https://kafka.apache.org/documentation/#auto.commit.interval.ms auto commit interval> and enables <https://kafka.apache.org/documentation/#enable.auto.commit auto commit>.
Expand Down
10 changes: 7 additions & 3 deletions src/Kafka/Consumer/Types.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

-----------------------------------------------------------------------------
-- |
Expand Down Expand Up @@ -37,6 +38,7 @@ import Data.Bifoldable (Bifoldable (..))
import Data.Bifunctor (Bifunctor (..))
import Data.Bitraversable (Bitraversable (..), bimapM, bisequence)
import Data.Int (Int64)
import Data.String (IsString)
import Data.Text (Text)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
Expand All @@ -62,7 +64,9 @@ instance HasKafkaConf KafkaConsumer where
-- | Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
--
-- See <https://kafka.apache.org/documentation/#group.id Kafka documentation on consumer group>
newtype ConsumerGroupId = ConsumerGroupId { unConsumerGroupId :: Text } deriving (Show, Ord, Eq, Generic)
newtype ConsumerGroupId = ConsumerGroupId
{ unConsumerGroupId :: Text
} deriving (Show, Ord, Eq, IsString, Generic)

-- | A message offset in a partition
newtype Offset = Offset { unOffset :: Int64 } deriving (Show, Eq, Ord, Read, Generic)
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Internal/Shared.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import Kafka.Types (KafkaError (..), Millis (..), Timeout

pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents a tm =
let timeout = maybe 0 (\(Timeout ms) -> ms) tm
(Kafka k) = getKafka a
let timeout = maybe 0 unTimeout tm
Kafka k = getKafka a
in void (rdKafkaPoll k timeout)

word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
Expand Down
6 changes: 3 additions & 3 deletions src/Kafka/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ fromPartitionMetadataPtr pm = do
{ pmPartitionId = PartitionId (id'RdKafkaMetadataPartitionT pm)
, pmError = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
, pmLeader = BrokerId (leader'RdKafkaMetadataPartitionT pm)
, pmReplicas = (BrokerId . fromIntegral) <$> reps
, pmInSyncReplicas = (BrokerId . fromIntegral) <$> isrs
, pmReplicas = BrokerId . fromIntegral <$> reps
, pmInSyncReplicas = BrokerId . fromIntegral <$> isrs
}


Expand Down Expand Up @@ -296,4 +296,4 @@ groupStateFromKafkaString s = case s of
"Stable" -> GroupStable
"Dead" -> GroupDead
"Empty" -> GroupEmpty
_ -> error $ "Unknown group state: " <> (Text.unpack s)
_ -> error $ "Unknown group state: " <> Text.unpack s
2 changes: 1 addition & 1 deletion src/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
--
-- -- Global producer properties
-- producerProps :: 'ProducerProperties'
-- producerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
-- producerProps = 'brokersList' ["localhost:9092"]
-- <> 'logLevel' 'KafkaLogDebug'
--
-- -- Topic to send messages to
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Producer/Convert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ producePartitionCInt = fromIntegral . producePartitionInt
{-# INLINE producePartitionCInt #-}

handleProduceErr :: Int -> IO (Maybe KafkaError)
handleProduceErr (- 1) = (Just . kafkaRespErr) <$> getErrno
handleProduceErr (- 1) = Just . kafkaRespErr <$> getErrno
handleProduceErr 0 = return Nothing
handleProduceErr _ = return $ Just KafkaInvalidReturnValue
{-# INLINE handleProduceErr #-}

handleProduceErr' :: Int -> IO (Either KafkaError ())
handleProduceErr' (- 1) = (Left . kafkaRespErr) <$> getErrno
handleProduceErr' (- 1) = Left . kafkaRespErr <$> getErrno
handleProduceErr' 0 = return (Right ())
handleProduceErr' _ = return $ Left KafkaInvalidReturnValue
{-# INLINE handleProduceErr' #-}
2 changes: 1 addition & 1 deletion src/Kafka/Producer/ProducerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ instance Monoid ProducerProperties where
-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]

-- | Set the producer callback.
Expand Down
15 changes: 10 additions & 5 deletions src/Kafka/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ where

import Control.Exception (Exception (..))
import Data.Int (Int64)
import Data.String (IsString)
import Data.Text (Text, isPrefixOf)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
Expand All @@ -46,7 +47,9 @@ newtype Millis = Millis { unMillis :: Int64 } deriving (Show, Read, Eq, Ord
-- | Client ID used by Kafka to better track requests
--
-- See <https://kafka.apache.org/documentation/#client.id Kafka documentation on client ID>
newtype ClientId = ClientId { unClientId :: Text } deriving (Show, Eq, Ord, Generic)
newtype ClientId = ClientId
{ unClientId :: Text
} deriving (Show, Eq, IsString, Ord, Generic)

-- | Batch size used for polling
newtype BatchSize = BatchSize { unBatchSize :: Int } deriving (Show, Read, Eq, Ord, Num, Generic)
Expand All @@ -63,9 +66,9 @@ data TopicType =
-- any topic name in the topics list that is prefixed with @^@ will
-- be regex-matched to the full list of topics in the cluster and matching
-- topics will be added to the subscription list.
newtype TopicName =
TopicName { unTopicName :: Text } -- ^ a simple topic name or a regex if started with @^@
deriving (Show, Eq, Ord, Read, Generic)
newtype TopicName = TopicName
{ unTopicName :: Text -- ^ a simple topic name or a regex if started with @^@
} deriving (Show, Eq, Ord, IsString, Read, Generic)

-- | Deduce the type of a topic from its name, by checking if it starts with a double underscore "\__"
topicType :: TopicName -> TopicType
Expand All @@ -74,7 +77,9 @@ topicType (TopicName tn) =
{-# INLINE topicType #-}

-- | Kafka broker address string (e.g. @broker1:9092@)
newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text } deriving (Show, Eq, Generic)
newtype BrokerAddress = BrokerAddress
{ unBrokerAddress :: Text
} deriving (Show, Eq, IsString, Generic)

-- | Timeout in milliseconds
newtype Timeout = Timeout { unTimeout :: Int } deriving (Show, Eq, Read, Generic)
Expand Down
10 changes: 5 additions & 5 deletions tests-it/Kafka/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ spec = do
var <- newEmptyMVar
let
msg = ProducerRecord
{ prTopic = TopicName "callback-topic"
{ prTopic = "callback-topic"
, prPartition = UnassignedPartition
, prKey = Nothing
, prValue = Just "test from producer"
Expand All @@ -136,7 +136,7 @@ spec = do
specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackPollModeSync) runConsumerSpec

describe "Kafka.Consumer.BatchSpec" $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId (ConsumerGroupId "batch-consumer")) $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId "batch-consumer") $ do
it "should consume first batch" $ \k -> do
res <- pollMessageBatch k (Timeout 1000) (BatchSize 5)
length res `shouldBe` 5
Expand Down Expand Up @@ -232,8 +232,8 @@ runConsumerSpec = do
it "should return topic metadata" $ \k -> do
res <- topicMetadata k (Timeout 1000) testTopic
res `shouldSatisfy` isRight
(length . kmBrokers) <$> res `shouldBe` Right 1
(length . kmTopics) <$> res `shouldBe` Right 1
length . kmBrokers <$> res `shouldBe` Right 1
length . kmTopics <$> res `shouldBe` Right 1

it "should describe all consumer groups" $ \k -> do
res <- allConsumerGroupsInfo k (Timeout 1000)
Expand All @@ -248,7 +248,7 @@ runConsumerSpec = do
fmap giGroup <$> res `shouldBe` Right [testGroupId]

it "should describe non-existent consumer group" $ \k -> do
res <- consumerGroupInfo k (Timeout 1000) (ConsumerGroupId "does-not-exist")
res <- consumerGroupInfo k (Timeout 1000) "does-not-exist"
res `shouldBe` Right []

it "should read topic offsets for time" $ \k -> do
Expand Down
4 changes: 2 additions & 2 deletions tests-it/Kafka/TestEnv.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ testPrefix = unsafePerformIO $ take 10 . Rnd.randomRs ('a','z') <$> Rnd.newStdGe

brokerAddress :: BrokerAddress
brokerAddress = unsafePerformIO $
(BrokerAddress . Text.pack) <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
BrokerAddress . Text.pack <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
{-# NOINLINE brokerAddress #-}

testTopic :: TopicName
testTopic = unsafePerformIO $
(TopicName . Text.pack) <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
TopicName . Text.pack <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
{-# NOINLINE testTopic #-}

testGroupId :: ConsumerGroupId
Expand Down
2 changes: 1 addition & 1 deletion tests/Kafka/Consumer/ConsumerRecordMapSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ testValue = "some-value"

testRecord :: ConsumerRecord (Maybe Text) (Maybe Text)
testRecord = ConsumerRecord
{ crTopic = TopicName "some-topic"
{ crTopic = "some-topic"
, crPartition = PartitionId 0
, crOffset = Offset 5
, crTimestamp = NoTimestamp
Expand Down
2 changes: 1 addition & 1 deletion tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ testValue = "some-value"

testRecord :: ConsumerRecord Text Text
testRecord = ConsumerRecord
{ crTopic = TopicName "some-topic"
{ crTopic = "some-topic"
, crPartition = PartitionId 0
, crOffset = Offset 5
, crTimestamp = NoTimestamp
Expand Down