Skip to content

Commit

Permalink
Merge pull request #174 from haskell-works/fully-upgrade-to-cabal-3.4…
Browse files Browse the repository at this point in the history
….0.0

Fully upgrade to cabal 3.4.0.0
  • Loading branch information
newhoggy authored Apr 19, 2021
2 parents f0d1d1b + 36381a0 commit f9760fc
Show file tree
Hide file tree
Showing 17 changed files with 52 additions and 50 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,11 @@ jobs:
- name: Build librdkafka
run: ./scripts/build-librdkafka

- name: Select optimal cabal version
run: |
case "$OS" in
Windows_NT) echo "CABAL_VERSION=3.4.0.0-rc5" >> $GITHUB_ENV;;
*) echo "CABAL_VERSION=3.4.0.0" >> $GITHUB_ENV;;
esac
- uses: haskell/actions/setup@v1
id: setup-haskell
with:
ghc-version: ${{ matrix.ghc }}
cabal-version: ${{ env.CABAL_VERSION }}
cabal-version: 3.4.0.0

- name: Set some window specific things
if: matrix.os == 'windows-latest'
Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ import Kafka.Consumer

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList [BrokerAddress "localhost:9092"]
<> groupId (ConsumerGroupId "consumer_example_group")
consumerProps = brokersList ["localhost:9092"]
<> groupId "consumer_example_group"
<> noAutoCommit
<> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
consumerSub = topics ["kafka-client-example-topic"]
<> offsetReset Earliest

-- Running an example
Expand All @@ -58,7 +58,7 @@ runConsumerExample = do
where
mkConsumer = newConsumer consumerProps consumerSub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = processMessages kc

Expand Down Expand Up @@ -97,7 +97,7 @@ never dropped from the queue:

```haskell
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> sendTimeout (Timeout 0) -- for librdkafka "0" means "infinite" (see https://github.com/edenhill/librdkafka/issues/2015)
```

Expand All @@ -108,7 +108,7 @@ Currently `hw-kafka-client` only supports delivery error callbacks:

```haskell
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> setCallback (deliveryCallback print)
```

Expand All @@ -126,12 +126,12 @@ import Kafka.Producer

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"
targetTopic = "kafka-client-example-topic"

-- Run an example
runProducerExample :: IO ()
Expand Down
8 changes: 4 additions & 4 deletions example/ConsumerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import Data.Text (Text)

-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = brokersList [BrokerAddress "localhost:9092"]
<> groupId (ConsumerGroupId "consumer_example_group")
consumerProps = brokersList ["localhost:9092"]
<> groupId "consumer_example_group"
<> noAutoCommit
<> setCallback (rebalanceCallback printingRebalanceCallback)
<> setCallback (offsetCommitCallback printingOffsetCallback)
<> logLevel KafkaLogInfo

-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
consumerSub = topics ["kafka-client-example-topic"]
<> offsetReset Earliest

-- Running an example
Expand All @@ -32,7 +32,7 @@ runConsumerExample = do
where
mkConsumer = newConsumer consumerProps consumerSub
clConsumer (Left err) = return (Left err)
clConsumer (Right kc) = (maybe (Right ()) Left) <$> closeConsumer kc
clConsumer (Right kc) = maybe (Right ()) Left <$> closeConsumer kc
runHandler (Left err) = return (Left err)
runHandler (Right kc) = processMessages kc

Expand Down
4 changes: 2 additions & 2 deletions example/ProducerExample.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import Data.Text (Text)

-- Global producer properties
producerProps :: ProducerProperties
producerProps = brokersList [BrokerAddress "localhost:9092"]
producerProps = brokersList ["localhost:9092"]
<> sendTimeout (Timeout 10000)
<> setCallback (deliveryCallback print)
<> logLevel KafkaLogDebug

-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"
targetTopic = "kafka-client-example-topic"

mkMessage :: Maybe ByteString -> Maybe ByteString -> ProducerRecord
mkMessage k v = ProducerRecord
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
--
-- -- Global consumer properties
-- consumerProps :: 'ConsumerProperties'
-- consumerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
-- consumerProps = 'brokersList' ["localhost:9092"]
-- <> 'groupId' ('ConsumerGroupId' "consumer_example_group")
-- <> 'noAutoCommit'
-- <> 'logLevel' 'KafkaLogInfo'
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Consumer/ConsumerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ instance Monoid ConsumerProperties where
-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
brokersList :: [BrokerAddress] -> ConsumerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]

-- | Set the <https://kafka.apache.org/documentation/#auto.commit.interval.ms auto commit interval> and enables <https://kafka.apache.org/documentation/#enable.auto.commit auto commit>.
Expand Down
10 changes: 7 additions & 3 deletions src/Kafka/Consumer/Types.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

-----------------------------------------------------------------------------
-- |
Expand Down Expand Up @@ -37,6 +38,7 @@ import Data.Bifoldable (Bifoldable (..))
import Data.Bifunctor (Bifunctor (..))
import Data.Bitraversable (Bitraversable (..), bimapM, bisequence)
import Data.Int (Int64)
import Data.String (IsString)
import Data.Text (Text)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
Expand All @@ -62,7 +64,9 @@ instance HasKafkaConf KafkaConsumer where
-- | Consumer group ID. Different consumers with the same consumer group ID will get assigned different partitions of each subscribed topic.
--
-- See <https://kafka.apache.org/documentation/#group.id Kafka documentation on consumer group>
newtype ConsumerGroupId = ConsumerGroupId { unConsumerGroupId :: Text } deriving (Show, Ord, Eq, Generic)
newtype ConsumerGroupId = ConsumerGroupId
{ unConsumerGroupId :: Text
} deriving (Show, Ord, Eq, IsString, Generic)

-- | A message offset in a partition
newtype Offset = Offset { unOffset :: Int64 } deriving (Show, Eq, Ord, Read, Generic)
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Internal/Shared.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import Kafka.Types (KafkaError (..), Millis (..), Timeout

pollEvents :: HasKafka a => a -> Maybe Timeout -> IO ()
pollEvents a tm =
let timeout = maybe 0 (\(Timeout ms) -> ms) tm
(Kafka k) = getKafka a
let timeout = maybe 0 unTimeout tm
Kafka k = getKafka a
in void (rdKafkaPoll k timeout)

word8PtrToBS :: Int -> Word8Ptr -> IO BS.ByteString
Expand Down
6 changes: 3 additions & 3 deletions src/Kafka/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ fromPartitionMetadataPtr pm = do
{ pmPartitionId = PartitionId (id'RdKafkaMetadataPartitionT pm)
, pmError = kafkaErrorToMaybe $ KafkaResponseError (err'RdKafkaMetadataPartitionT pm)
, pmLeader = BrokerId (leader'RdKafkaMetadataPartitionT pm)
, pmReplicas = (BrokerId . fromIntegral) <$> reps
, pmInSyncReplicas = (BrokerId . fromIntegral) <$> isrs
, pmReplicas = BrokerId . fromIntegral <$> reps
, pmInSyncReplicas = BrokerId . fromIntegral <$> isrs
}


Expand Down Expand Up @@ -296,4 +296,4 @@ groupStateFromKafkaString s = case s of
"Stable" -> GroupStable
"Dead" -> GroupDead
"Empty" -> GroupEmpty
_ -> error $ "Unknown group state: " <> (Text.unpack s)
_ -> error $ "Unknown group state: " <> Text.unpack s
2 changes: 1 addition & 1 deletion src/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
--
-- -- Global producer properties
-- producerProps :: 'ProducerProperties'
-- producerProps = 'brokersList' ['BrokerAddress' "localhost:9092"]
-- producerProps = 'brokersList' ["localhost:9092"]
-- <> 'logLevel' 'KafkaLogDebug'
--
-- -- Topic to send messages to
Expand Down
4 changes: 2 additions & 2 deletions src/Kafka/Producer/Convert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ producePartitionCInt = fromIntegral . producePartitionInt
{-# INLINE producePartitionCInt #-}

handleProduceErr :: Int -> IO (Maybe KafkaError)
handleProduceErr (- 1) = (Just . kafkaRespErr) <$> getErrno
handleProduceErr (- 1) = Just . kafkaRespErr <$> getErrno
handleProduceErr 0 = return Nothing
handleProduceErr _ = return $ Just KafkaInvalidReturnValue
{-# INLINE handleProduceErr #-}

handleProduceErr' :: Int -> IO (Either KafkaError ())
handleProduceErr' (- 1) = (Left . kafkaRespErr) <$> getErrno
handleProduceErr' (- 1) = Left . kafkaRespErr <$> getErrno
handleProduceErr' 0 = return (Right ())
handleProduceErr' _ = return $ Left KafkaInvalidReturnValue
{-# INLINE handleProduceErr' #-}
2 changes: 1 addition & 1 deletion src/Kafka/Producer/ProducerProperties.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ instance Monoid ProducerProperties where
-- | Set the <https://kafka.apache.org/documentation/#bootstrap.servers list of brokers> to contact to connect to the Kafka cluster.
brokersList :: [BrokerAddress] -> ProducerProperties
brokersList bs =
let bs' = Text.intercalate "," ((\(BrokerAddress x) -> x) <$> bs)
let bs' = Text.intercalate "," (unBrokerAddress <$> bs)
in extraProps $ M.fromList [("bootstrap.servers", bs')]

-- | Set the producer callback.
Expand Down
15 changes: 10 additions & 5 deletions src/Kafka/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ where

import Control.Exception (Exception (..))
import Data.Int (Int64)
import Data.String (IsString)
import Data.Text (Text, isPrefixOf)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
Expand All @@ -46,7 +47,9 @@ newtype Millis = Millis { unMillis :: Int64 } deriving (Show, Read, Eq, Ord
-- | Client ID used by Kafka to better track requests
--
-- See <https://kafka.apache.org/documentation/#client.id Kafka documentation on client ID>
newtype ClientId = ClientId { unClientId :: Text } deriving (Show, Eq, Ord, Generic)
newtype ClientId = ClientId
{ unClientId :: Text
} deriving (Show, Eq, IsString, Ord, Generic)

-- | Batch size used for polling
newtype BatchSize = BatchSize { unBatchSize :: Int } deriving (Show, Read, Eq, Ord, Num, Generic)
Expand All @@ -63,9 +66,9 @@ data TopicType =
-- any topic name in the topics list that is prefixed with @^@ will
-- be regex-matched to the full list of topics in the cluster and matching
-- topics will be added to the subscription list.
newtype TopicName =
TopicName { unTopicName :: Text } -- ^ a simple topic name or a regex if started with @^@
deriving (Show, Eq, Ord, Read, Generic)
newtype TopicName = TopicName
{ unTopicName :: Text -- ^ a simple topic name or a regex if started with @^@
} deriving (Show, Eq, Ord, IsString, Read, Generic)

-- | Deduce the type of a topic from its name, by checking if it starts with a double underscore "\__"
topicType :: TopicName -> TopicType
Expand All @@ -74,7 +77,9 @@ topicType (TopicName tn) =
{-# INLINE topicType #-}

-- | Kafka broker address string (e.g. @broker1:9092@)
newtype BrokerAddress = BrokerAddress { unBrokerAddress :: Text } deriving (Show, Eq, Generic)
newtype BrokerAddress = BrokerAddress
{ unBrokerAddress :: Text
} deriving (Show, Eq, IsString, Generic)

-- | Timeout in milliseconds
newtype Timeout = Timeout { unTimeout :: Int } deriving (Show, Eq, Read, Generic)
Expand Down
10 changes: 5 additions & 5 deletions tests-it/Kafka/IntegrationSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ spec = do
var <- newEmptyMVar
let
msg = ProducerRecord
{ prTopic = TopicName "callback-topic"
{ prTopic = "callback-topic"
, prPartition = UnassignedPartition
, prKey = Nothing
, prValue = Just "test from producer"
Expand All @@ -136,7 +136,7 @@ spec = do
specWithConsumer "Run consumer with sync polling" (consumerProps <> groupId (makeGroupId "sync") <> callbackPollMode CallbackPollModeSync) runConsumerSpec

describe "Kafka.Consumer.BatchSpec" $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId (ConsumerGroupId "batch-consumer")) $ do
specWithConsumer "Batch consumer" (consumerProps <> groupId "batch-consumer") $ do
it "should consume first batch" $ \k -> do
res <- pollMessageBatch k (Timeout 1000) (BatchSize 5)
length res `shouldBe` 5
Expand Down Expand Up @@ -232,8 +232,8 @@ runConsumerSpec = do
it "should return topic metadata" $ \k -> do
res <- topicMetadata k (Timeout 1000) testTopic
res `shouldSatisfy` isRight
(length . kmBrokers) <$> res `shouldBe` Right 1
(length . kmTopics) <$> res `shouldBe` Right 1
length . kmBrokers <$> res `shouldBe` Right 1
length . kmTopics <$> res `shouldBe` Right 1

it "should describe all consumer groups" $ \k -> do
res <- allConsumerGroupsInfo k (Timeout 1000)
Expand All @@ -248,7 +248,7 @@ runConsumerSpec = do
fmap giGroup <$> res `shouldBe` Right [testGroupId]

it "should describe non-existent consumer group" $ \k -> do
res <- consumerGroupInfo k (Timeout 1000) (ConsumerGroupId "does-not-exist")
res <- consumerGroupInfo k (Timeout 1000) "does-not-exist"
res `shouldBe` Right []

it "should read topic offsets for time" $ \k -> do
Expand Down
4 changes: 2 additions & 2 deletions tests-it/Kafka/TestEnv.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ testPrefix = unsafePerformIO $ take 10 . Rnd.randomRs ('a','z') <$> Rnd.newStdGe

brokerAddress :: BrokerAddress
brokerAddress = unsafePerformIO $
(BrokerAddress . Text.pack) <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
BrokerAddress . Text.pack <$> getEnv "KAFKA_TEST_BROKER" `catch` \(_ :: SomeException) -> return "localhost:9092"
{-# NOINLINE brokerAddress #-}

testTopic :: TopicName
testTopic = unsafePerformIO $
(TopicName . Text.pack) <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
TopicName . Text.pack <$> getEnv "KAFKA_TEST_TOPIC" `catch` \(_ :: SomeException) -> return $ testPrefix <> "-topic"
{-# NOINLINE testTopic #-}

testGroupId :: ConsumerGroupId
Expand Down
2 changes: 1 addition & 1 deletion tests/Kafka/Consumer/ConsumerRecordMapSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ testValue = "some-value"

testRecord :: ConsumerRecord (Maybe Text) (Maybe Text)
testRecord = ConsumerRecord
{ crTopic = TopicName "some-topic"
{ crTopic = "some-topic"
, crPartition = PartitionId 0
, crOffset = Offset 5
, crTimestamp = NoTimestamp
Expand Down
2 changes: 1 addition & 1 deletion tests/Kafka/Consumer/ConsumerRecordTraverseSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ testValue = "some-value"

testRecord :: ConsumerRecord Text Text
testRecord = ConsumerRecord
{ crTopic = TopicName "some-topic"
{ crTopic = "some-topic"
, crPartition = PartitionId 0
, crOffset = Offset 5
, crTimestamp = NoTimestamp
Expand Down

0 comments on commit f9760fc

Please sign in to comment.