Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update hw-kafka-client #172

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion freckle-app.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ library
, http-conduit >=2.3.5
, http-link-header
, http-types
, hw-kafka-client <5.0.0
, hw-kafka-client
, immortal
, lens
, memcache
Expand Down
44 changes: 24 additions & 20 deletions library/Freckle/App/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ module Freckle.App.Kafka.Producer
import Freckle.App.Prelude

import Blammo.Logging
import Conduit ((.|))
import qualified Conduit
import Control.Lens (Lens', view)
import Data.Aeson (ToJSON, encode)
import Data.ByteString.Lazy (toStrict)
Expand Down Expand Up @@ -91,13 +93,13 @@ createKafkaProducerPool
-> KafkaProducerPoolConfig
-> IO (Pool KafkaProducer)
createKafkaProducerPool addresses KafkaProducerPoolConfig {..} =
Pool.newPool $
Pool.setNumStripes (Just kafkaProducerPoolConfigStripes) $
Pool.defaultPoolConfig
mkProducer
closeProducer
(realToFrac kafkaProducerPoolConfigIdleTimeout)
kafkaProducerPoolConfigSize
Pool.newPool
$ Pool.setNumStripes (Just kafkaProducerPoolConfigStripes)
$ Pool.defaultPoolConfig
mkProducer
closeProducer
(realToFrac kafkaProducerPoolConfigIdleTimeout)
kafkaProducerPoolConfigSize
chris-martin marked this conversation as resolved.
Show resolved Hide resolved
where
mkProducer =
either
Expand All @@ -123,25 +125,27 @@ produceKeyedOn prTopic values keyF = traced $ do
view kafkaProducerPoolL >>= \case
NullKafkaProducerPool -> pure ()
KafkaProducerPool producerPool -> do
errors <-
liftIO $
Pool.withResource producerPool $ \producer ->
produceMessageBatch producer $
toList $
mkProducerRecord <$> values
unless (null errors) $
logErrorNS "kafka" $
"Failed to send events" :# ["errors" .= fmap (tshow . snd) errors]
errors <- liftIO $ Pool.withResource producerPool $ \producer ->
Conduit.runConduit
chris-martin marked this conversation as resolved.
Show resolved Hide resolved
$ Conduit.yieldMany values
.| Conduit.awaitForever
( \value -> do
mError <- liftIO $ produceMessage producer $ mkProducerRecord value
for_ @Maybe mError Conduit.yield
)
.| Conduit.sinkList
unless (null errors)
$ logErrorNS "kafka"
$ "Failed to send events"
:# ["errors" .= fmap tshow errors]
where
mkProducerRecord value =
ProducerRecord
{ prTopic
, prPartition = UnassignedPartition
, prKey = Just $ toStrict $ encode $ keyF value
, prValue =
Just $
toStrict $
encode value
, prValue = Just $ toStrict $ encode value
, prHeaders = mempty
}

traced =
Expand Down
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ library:
- http-conduit >= 2.3.5 # addToRequestQueryString
- http-link-header
- http-types
- hw-kafka-client < 5.0.0
- hw-kafka-client
- immortal
- lens
- memcache
Expand Down
1 change: 1 addition & 0 deletions stack-lts-20.26.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ extra-deps:
- hs-opentelemetry-instrumentation-persistent-0.1.0.0
- hs-opentelemetry-instrumentation-wai-0.1.0.0
- hs-opentelemetry-sdk-0.0.3.6
- hw-kafka-client-5.0.0
- resource-pool-0.4.0.0

# for hs-opentelemetry-sdk
Expand Down
21 changes: 21 additions & 0 deletions stack-lts-20.26.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,27 @@
# https://docs.haskellstack.org/en/stable/lock_files

packages:
- completed:
hackage: Blammo-1.2.0.0@sha256:727b02bff2ef0363501778bfc01a13d4c7b2912843b9e1a2538081f0b44484d4,4836
pantry-tree:
sha256: 35a51b2053f5b56a16458e8316bb1cd5d13490eab87b0ba30cd1dbbbdef99c0c
size: 1724
original:
hackage: Blammo-1.2.0.0
- completed:
hackage: bugsnag-1.1.0.0@sha256:9723af13b09e7aed7e5855fcbcd7f89a904df80324f21d32480f01aabd8d035b,4565
pantry-tree:
sha256: 1c23c30271a5b136a165f817c6db2828b95b5d37399fe052f02d80049a464575
size: 1664
original:
hackage: bugsnag-1.1.0.0
- completed:
hackage: fast-logger-3.2.3@sha256:41b4f1c07d5ee4a7cc785689eb7772554d29ddbbcced3cc184fe50fc63ece3f7,2176
pantry-tree:
sha256: c4a8dcfa5f5bc3bd77cfe86d904e96f90607adc1e4f3f1cf082e722673ee7230
size: 1302
original:
hackage: fast-logger-3.2.3
- completed:
hackage: monad-validate-1.3.0.0@sha256:eb6ddd5c9cf72ff0563cba604fa00291376e96138fdb4932d00ff3a99d66706e,2605
pantry-tree:
Expand Down Expand Up @@ -46,6 +60,13 @@ packages:
size: 1430
original:
hackage: hs-opentelemetry-sdk-0.0.3.6
- completed:
hackage: hw-kafka-client-5.0.0@sha256:99ce4899a39e3be37acf013a2f8f12761e868bf9777c0135547fcfb4c6fbf882,4861
pantry-tree:
sha256: d87af59eabad9c90fdf2d778740374911dd3b15b767c8e9fe5ba33244f8952f7
size: 2056
original:
hackage: hw-kafka-client-5.0.0
- completed:
hackage: resource-pool-0.4.0.0@sha256:9c1e448a159875e21a7e68697feee2b61a4e584720974fa465a2fa1bc0776c73,1342
pantry-tree:
Expand Down
1 change: 1 addition & 0 deletions stack-lts-21.25.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extra-deps:
- hs-opentelemetry-instrumentation-wai-0.1.0.0
- hs-opentelemetry-propagator-datadog-0.0.0.0
- hs-opentelemetry-sdk-0.0.3.6
- hw-kafka-client-5.0.0

# for hs-opentelemetry-sdk
- hs-opentelemetry-exporter-otlp-0.0.1.5
Expand Down
7 changes: 7 additions & 0 deletions stack-lts-21.25.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ packages:
size: 1430
original:
hackage: hs-opentelemetry-sdk-0.0.3.6
- completed:
hackage: hw-kafka-client-5.0.0@sha256:99ce4899a39e3be37acf013a2f8f12761e868bf9777c0135547fcfb4c6fbf882,4861
pantry-tree:
sha256: d87af59eabad9c90fdf2d778740374911dd3b15b767c8e9fe5ba33244f8952f7
size: 2056
original:
hackage: hw-kafka-client-5.0.0
- completed:
hackage: hs-opentelemetry-exporter-otlp-0.0.1.5@sha256:89b0a6481096a338fa6383fbdf08ccaa0eb7bb009c4cbb340894eac33e55c5de,2214
pantry-tree:
Expand Down
4 changes: 0 additions & 4 deletions stack-nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ extra-deps:
- thread-utils-context-0.3.0.4
- thread-utils-finalizers-0.1.1.0

# We're not ready to take on the breaking changes in v5
# @sled --exclude hw-kafka-client
- hw-kafka-client-4.0.3

# These are me just adding what Stack told me to
- Cabal-3.10.2.1
- Cabal-syntax-3.10.2.0
Expand Down
21 changes: 14 additions & 7 deletions stack-nightly.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,27 @@
# https://docs.haskellstack.org/en/stable/lock_files

packages:
- completed:
hackage: Blammo-1.2.1.0@sha256:d319b91109e14762f06c340bbf955efcfccd6cc853a182e4c384113e236ddcb5,4836
pantry-tree:
sha256: ea00c0835cbbcfa749803647d2c0415d1991d2bd325ecb13d8de4186b6dd2c4f
size: 1725
original:
hackage: Blammo-1.2.1.0
- completed:
hackage: bcp47-0.2.0.6@sha256:9071d1f97ef249ae62e4554e3cba892cd6059ac263271fd72635157c83743a30,2949
pantry-tree:
sha256: 3cd17d04bc9d13c8ba7e8e390973fce7d79003b18379533c8ce702f9ad3f82b3
size: 1498
original:
hackage: bcp47-0.2.0.6
- completed:
hackage: fast-logger-3.2.3@sha256:41b4f1c07d5ee4a7cc785689eb7772554d29ddbbcced3cc184fe50fc63ece3f7,2176
pantry-tree:
sha256: c4a8dcfa5f5bc3bd77cfe86d904e96f90607adc1e4f3f1cf082e722673ee7230
size: 1302
original:
hackage: fast-logger-3.2.3
- completed:
hackage: monad-validate-1.3.0.0@sha256:eb6ddd5c9cf72ff0563cba604fa00291376e96138fdb4932d00ff3a99d66706e,2605
pantry-tree:
Expand Down Expand Up @@ -95,13 +109,6 @@ packages:
size: 400
original:
hackage: thread-utils-finalizers-0.1.1.0
- completed:
hackage: hw-kafka-client-4.0.3@sha256:20e3614454d5afd2a6acfc0709b9bf00bd6ee9a9bc646c2e2e4ccec1d16831a1,4861
pantry-tree:
sha256: 709ea6818b3b3beb8eed6f79ea76b8dc13cce9ae165870c17986d5de2d695447
size: 2057
original:
hackage: hw-kafka-client-4.0.3
- completed:
hackage: Cabal-3.10.2.1@sha256:0f7cc73c7c0c18464ce249c97267a5188d796690a926d73b6e084a4612a66e32,12693
pantry-tree:
Expand Down
4 changes: 0 additions & 4 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ extra-deps:
# for thread-utils-context
- thread-utils-finalizers-0.1.1.0

# We're not ready to take on the breaking changes in v5
# @sled --exclude hw-kafka-client
- hw-kafka-client-4.0.3

allow-newer: true
allow-newer-deps:
- hs-opentelemetry-propagator-datadog
Loading