From aa678ee3b3a593ad01106392c422fd74b7e67207 Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 10:51:25 +0000 Subject: [PATCH 1/6] Modify pause-resume bug example to be more like the kafka fire PHX-1253 --- .../scripts/pause-resume-bug/Consumer.hs | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index 96a45ad6..f8900204 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -1,14 +1,14 @@ module Consumer where import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, tryTakeMVar, withMVar) -import Control.Monad (void) +import Control.Concurrent.MVar (MVar, newMVar, withMVar) +import Control.Monad (void, when) import qualified Environment import qualified Kafka.Worker as Kafka import Message -import System.Environment (setEnv) -import System.IO (Handle, hPutStrLn, stderr, stdout) -import Prelude (IO, String, show) +import System.Environment (getEnv, setEnv) +import System.IO (Handle, hPutStrLn, stdout) +import Prelude (IO, String, mod, show, fromIntegral, pure) main :: IO () main = do @@ -20,39 +20,26 @@ main = do setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" setEnv "KAFKA_POLL_BATCH_SIZE" "5" + fireDelay <- readIntEnvVar "FIRE_DELAY" + fireModulo <- readIntEnvVar "FIRE_MODULO" + settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler - lastId <- newEmptyMVar lock <- newMVar () let processMsg (msg :: Message) = ( do - let msgId = ("ID(" ++ show (id msg) ++ ")") - prevId <- tryTakeMVar lastId - - case (prevId, id msg) of - (Nothing, _) -> - printAtomic lock stdout (msgId ++ " First message has been received") - (_, 1) -> - printAtomic lock stdout (msgId ++ " Producer has been restarted") - (Just prev, curr) - | prev + 1 == curr -> - -- This is the expected behavior - printAtomic lock stdout (msgId ++ " OK") - (Just prev, curr) -> - -- This is the bug - printAtomic - lock - stderr - ( "ERROR: Expected ID " - ++ show (prev + 1) - ++ " but got " - ++ show curr - ) - - putMVar lastId (id msg) - threadDelay 200000 + let msgId = id msg + let msgIdStr = "ID(" ++ show msgId ++ ")" + when + (msgId `mod` fireModulo == 0) + ( do + printAtomic lock stdout (msgIdStr ++ " Pausing consumer (simulating stuck MySQL)") + threadDelay (fromIntegral fireDelay * 1000000) + ) + printAtomic lock stdout (msgIdStr ++ " Done") + threadDelay 2000 ) |> fmap Ok |> Platform.doAnything doAnythingHandler @@ -66,3 +53,12 @@ printAtomic lock handle msg = do |> withMVar lock |> forkIO |> void + +readIntEnvVar :: String -> IO Int +readIntEnvVar name = do + valueStr <- getEnv name + valueStr + |> Text.fromList + |> Text.toInt + |> Maybe.withDefault (Debug.todo (Text.fromList name ++ " must be a number")) + |> pure From a597b359d6aae53ffa6ecfb02e4c11ee4d94d570 Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 15:10:37 +0000 Subject: [PATCH 2/6] Print a message on polling --- nri-kafka/src/Kafka/Worker/Fetcher.hs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker/Fetcher.hs b/nri-kafka/src/Kafka/Worker/Fetcher.hs index 273b6b99..1623aceb 100644 --- a/nri-kafka/src/Kafka/Worker/Fetcher.hs +++ b/nri-kafka/src/Kafka/Worker/Fetcher.hs @@ -11,6 +11,7 @@ import qualified Kafka.Worker.Analytics as Analytics import qualified Kafka.Worker.Partition as Partition import qualified Kafka.Worker.Settings as Settings import qualified Prelude +import qualified Data.Either type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd) @@ -76,8 +77,12 @@ pollingLoop' -- See https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/CHANGELOG.md?plain=1#L90-L95 -- -- We have a small app to reproduce the bug. Check out scripts/pause-resume-bug/README.md - MVar.withMVar consumerLock - <| \_ -> Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + MVar.withMVar consumerLock <| \_ -> do + Prelude.putStrLn "Polling for messages..." + em <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize + let digits = List.filterMap recordContents em |> ByteString.intercalate ", " + Prelude.putStrLn <| "Polling done. Found messages: " ++ Prelude.show digits + Prelude.pure em msgs <- Prelude.traverse handleKafkaError eitherMsgs assignment <- Consumer.assignment consumer @@ -103,6 +108,13 @@ pollingLoop' throttle maxMsgsPerSecondPerPartition maxPollIntervalMs (List.length appendResults) analytics now lastPollTimestamp pollingLoop' settings enqueueRecord analytics consumer consumerLock (pollTimeIsOld now) +recordContents :: Data.Either.Either x ConsumerRecord -> Maybe ByteString.ByteString +recordContents (Data.Either.Left _) = Nothing +recordContents (Data.Either.Right record) = do + val <- Consumer.crValue record + let digits = ByteString.filter (\c -> c >= 48 && c <= 57) val + Just digits + getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId) getPartitionKey record = ( Consumer.crTopic record, From 13b4c5d1a439101f4512b4445c79b20a3f3966b7 Mon Sep 17 00:00:00 2001 From: Brian Carroll Date: Wed, 21 Feb 2024 15:13:55 +0000 Subject: [PATCH 3/6] Set default values for fire env vars --- nri-kafka/scripts/pause-resume-bug/Consumer.hs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index f8900204..02929822 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -20,8 +20,8 @@ main = do setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" setEnv "KAFKA_POLL_BATCH_SIZE" "5" - fireDelay <- readIntEnvVar "FIRE_DELAY" - fireModulo <- readIntEnvVar "FIRE_MODULO" + fireDelay <- readIntEnvVar "FIRE_DELAY" 31 -- seconds + fireModulo <- readIntEnvVar "FIRE_MODULO" 5 -- sleep on every Nth message settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler @@ -54,11 +54,11 @@ printAtomic lock handle msg = do |> forkIO |> void -readIntEnvVar :: String -> IO Int -readIntEnvVar name = do +readIntEnvVar :: String -> Int -> IO Int +readIntEnvVar name defaultVal = do valueStr <- getEnv name valueStr |> Text.fromList |> Text.toInt - |> Maybe.withDefault (Debug.todo (Text.fromList name ++ " must be a number")) + |> Maybe.withDefault defaultVal |> pure From 82ac691eec8c0f601e53320740ab6b821c95e9eb Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Mon, 11 Mar 2024 12:39:13 -0700 Subject: [PATCH 4/6] failed attempt at patching rdkafka --- shell-ghc-8-10.nix | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix index 67beb07b..45339ef9 100644 --- a/shell-ghc-8-10.nix +++ b/shell-ghc-8-10.nix @@ -1,7 +1,31 @@ let sources = import ./nix/sources.nix { }; - pkgs = import sources.nixpkgs { }; + pkgs = import sources.nixpkgs { + ovelays = [ + (self: super: { + rdkafka = self.stdenv.mkDerivation rec { + pname = "rdkafka"; + version = "2.2.0"; + + # git clone https://github.com/confluentinc/librdkafka ../librdkafka + src = ../librdkafka; + + nativeBuildInputs = with self.pkgs; [ pkg-config python3 which ]; + + buildInputs = with self.pkgs; [ zlib zstd openssl ]; + + env.NIX_CFLAGS_COMPILE = "-Wno-error=strict-overflow"; + + postPatch = '' + patchShebangs . + ''; + + enableParallelBuilding = true; + }; + }) + ]; + }; in import nix/mk-shell.nix { pkgs = pkgs; haskellPackages = pkgs.haskell.packages.ghc8107.extend (self: super: { From 1c5a367de97f616f1b8c8d5b806c10a4f9dc2f46 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Tue, 12 Mar 2024 07:53:13 -0700 Subject: [PATCH 5/6] successful attempt at patching rdkafka --- nix/mk-shell.nix | 14 ++++++++------ nix/sources.json | 12 ++++++++++++ shell-ghc-8-10.nix | 25 +++++-------------------- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/nix/mk-shell.nix b/nix/mk-shell.nix index e351fb05..fcb69430 100644 --- a/nix/mk-shell.nix +++ b/nix/mk-shell.nix @@ -1,4 +1,4 @@ -{ pkgs, haskellPackages }: +{ pkgs, haskellPackages, rdkafka }: # Fix from https://github.com/srid/haskell-template let @@ -6,11 +6,12 @@ let with pkgs.haskell.lib; overrideCabal hpkg (drv: { enableSeparateBinOutput = false; }); # It is still necessary to run `hpack --force` into packages home dirs - haskell-language-server = pkgs.haskellPackages.haskell-language-server.override { - hls-ormolu-plugin = pkgs.haskellPackages.hls-ormolu-plugin.override { - ormolu = (workaround140774 pkgs.haskellPackages.ormolu); + haskell-language-server = + pkgs.haskellPackages.haskell-language-server.override { + hls-ormolu-plugin = pkgs.haskellPackages.hls-ormolu-plugin.override { + ormolu = (workaround140774 pkgs.haskellPackages.ormolu); + }; }; - }; in pkgs.mkShell { buildInputs = [ @@ -36,7 +37,8 @@ in pkgs.mkShell { hostname http-client http-client-tls - hw-kafka-client + (pkgs.haskell.lib.overrideCabal hw-kafka-client + (orig: { librarySystemDepends = [ rdkafka ]; })) io-streams junit-xml microlens diff --git a/nix/sources.json b/nix/sources.json index d6f72230..0ac5e776 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -34,5 +34,17 @@ "type": "tarball", "url": "https://github.com/NixOS/nixpkgs/archive/4c0834929cafb7478a5e82616d484578a80a3e41.tar.gz", "url_template": "https://github.com///archive/.tar.gz" + }, + "rdkafka": { + "branch": "master", + "description": "The Apache Kafka C/C++ library", + "homepage": "", + "owner": "confluentinc", + "repo": "librdkafka", + "rev": "a6d85bdbc1023b1a5477b8befe516242c3e182f6", + "sha256": "0ybyk5jsfpym5wjkqvz7amv746ahcn73rahv4zi40ipv7yk0fwbr", + "type": "tarball", + "url": "https://github.com/confluentinc/librdkafka/archive/a6d85bdbc1023b1a5477b8befe516242c3e182f6.tar.gz", + "url_template": "https://github.com///archive/.tar.gz" } } diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix index 45339ef9..603d5211 100644 --- a/shell-ghc-8-10.nix +++ b/shell-ghc-8-10.nix @@ -3,29 +3,13 @@ let pkgs = import sources.nixpkgs { ovelays = [ - (self: super: { - rdkafka = self.stdenv.mkDerivation rec { - pname = "rdkafka"; - version = "2.2.0"; + (final: prev: + { - # git clone https://github.com/confluentinc/librdkafka ../librdkafka - src = ../librdkafka; - - nativeBuildInputs = with self.pkgs; [ pkg-config python3 which ]; - - buildInputs = with self.pkgs; [ zlib zstd openssl ]; - - env.NIX_CFLAGS_COMPILE = "-Wno-error=strict-overflow"; - - postPatch = '' - patchShebangs . - ''; - - enableParallelBuilding = true; - }; - }) + }) ]; }; + rdkafka = pkgs.rdkafka.overrideAttrs (old: { src = sources.rdkafka; }); in import nix/mk-shell.nix { pkgs = pkgs; haskellPackages = pkgs.haskell.packages.ghc8107.extend (self: super: { @@ -36,4 +20,5 @@ in import nix/mk-shell.nix { # todo: resolve breaking changes in brick >= 0.72 brick = self.callHackage "brick" "0.71.1" { }; }); + rdkafka = rdkafka; } From 3cdc411bc47436c7f9176177e8333b6994124896 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Tue, 12 Mar 2024 17:40:25 -0700 Subject: [PATCH 6/6] Point straight to the local dir I was relying on NIV_OVERRIDE_rdkafka=../librdkafka before, but it's easy to forget to do that --- .../scripts/pause-resume-bug/Consumer.hs | 19 +++++------------ shell-ghc-8-10.nix | 21 ++++++++++++++++++- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/nri-kafka/scripts/pause-resume-bug/Consumer.hs b/nri-kafka/scripts/pause-resume-bug/Consumer.hs index 02929822..b5ba4458 100644 --- a/nri-kafka/scripts/pause-resume-bug/Consumer.hs +++ b/nri-kafka/scripts/pause-resume-bug/Consumer.hs @@ -2,7 +2,7 @@ module Consumer where import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar (MVar, newMVar, withMVar) -import Control.Monad (void, when) +import Control.Monad (void) import qualified Environment import qualified Kafka.Worker as Kafka import Message @@ -17,11 +17,8 @@ main = do setEnv "LOG_FILE" "/dev/null" -- Reduce buffer and batch sizes to make it fail faster - setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20" - setEnv "KAFKA_POLL_BATCH_SIZE" "5" - - fireDelay <- readIntEnvVar "FIRE_DELAY" 31 -- seconds - fireModulo <- readIntEnvVar "FIRE_MODULO" 5 -- sleep on every Nth message + setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "1" + setEnv "KAFKA_POLL_BATCH_SIZE" "1" settings <- Environment.decode Kafka.decoder doAnythingHandler <- Platform.doAnythingHandler @@ -32,14 +29,8 @@ main = do ( do let msgId = id msg let msgIdStr = "ID(" ++ show msgId ++ ")" - when - (msgId `mod` fireModulo == 0) - ( do - printAtomic lock stdout (msgIdStr ++ " Pausing consumer (simulating stuck MySQL)") - threadDelay (fromIntegral fireDelay * 1000000) - ) - printAtomic lock stdout (msgIdStr ++ " Done") - threadDelay 2000 + printAtomic lock stdout ("✅ " ++ msgIdStr ++ " Done") + threadDelay (10 * 1000000) ) |> fmap Ok |> Platform.doAnything doAnythingHandler diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix index 603d5211..80a4c727 100644 --- a/shell-ghc-8-10.nix +++ b/shell-ghc-8-10.nix @@ -9,7 +9,26 @@ let }) ]; }; - rdkafka = pkgs.rdkafka.overrideAttrs (old: { src = sources.rdkafka; }); + # rdkafka = pkgs.rdkafka.overrideAttrs (old: { src = sources.rdkafka; }); + rdkafka = pkgs.stdenv.mkDerivation rec { + pname = "rdkafka"; + version = "2.2.0"; + + # git clone https://github.com/confluentinc/librdkafka ../librdkafka + src = ../librdkafka; + + nativeBuildInputs = with pkgs; [ pkg-config python3 which ]; + + buildInputs = with pkgs; [ zlib zstd openssl ]; + + env.NIX_CFLAGS_COMPILE = "-Wno-error=strict-overflow"; + + postPatch = '' + patchShebangs . + ''; + + enableParallelBuilding = true; + }; in import nix/mk-shell.nix { pkgs = pkgs; haskellPackages = pkgs.haskell.packages.ghc8107.extend (self: super: {