Patch rdkafka #101 (Draft)

Wants to merge 6 commits into `trunk`.
14 changes: 8 additions & 6 deletions nix/mk-shell.nix
@@ -1,16 +1,17 @@
{ pkgs, haskellPackages }:
{ pkgs, haskellPackages, rdkafka }:

# Fix from https://github.com/srid/haskell-template
let
workaround140774 = hpkg:
with pkgs.haskell.lib;
overrideCabal hpkg (drv: { enableSeparateBinOutput = false; });
# It is still necessary to run `hpack --force` in each package's directory
haskell-language-server = pkgs.haskellPackages.haskell-language-server.override {
hls-ormolu-plugin = pkgs.haskellPackages.hls-ormolu-plugin.override {
ormolu = (workaround140774 pkgs.haskellPackages.ormolu);
haskell-language-server =
pkgs.haskellPackages.haskell-language-server.override {
hls-ormolu-plugin = pkgs.haskellPackages.hls-ormolu-plugin.override {
ormolu = (workaround140774 pkgs.haskellPackages.ormolu);
};
};
};

in pkgs.mkShell {
buildInputs = [
@@ -36,7 +37,8 @@ in pkgs.mkShell {
hostname
http-client
http-client-tls
hw-kafka-client
(pkgs.haskell.lib.overrideCabal hw-kafka-client
(orig: { librarySystemDepends = [ rdkafka ]; }))
io-streams
junit-xml
microlens
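The hw-kafka-client change above is the core of the patch: `pkgs.haskell.lib.overrideCabal` rewrites the generated Cabal derivation, and `librarySystemDepends` is where nixpkgs' Haskell builder takes the C libraries to link against. A minimal self-contained sketch of the same pattern (not part of this diff; it assumes `rdkafka` is the pinned derivation built in `shell-ghc-8-10.nix`):

```nix
# Sketch only: link hw-kafka-client against a custom rdkafka build instead of
# the librdkafka that ships with nixpkgs.
{ pkgs, haskellPackages, rdkafka }:

haskellPackages.ghcWithPackages (hp: [
  (pkgs.haskell.lib.overrideCabal hp.hw-kafka-client
    # librarySystemDepends is the generic-builder attribute for C link-time deps
    (orig: { librarySystemDepends = [ rdkafka ]; }))
])
```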
12 changes: 12 additions & 0 deletions nix/sources.json
@@ -34,5 +34,17 @@
"type": "tarball",
"url": "https://github.com/NixOS/nixpkgs/archive/4c0834929cafb7478a5e82616d484578a80a3e41.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
},
"rdkafka": {
"branch": "master",
"description": "The Apache Kafka C/C++ library",
"homepage": "",
"owner": "confluentinc",
"repo": "librdkafka",
"rev": "a6d85bdbc1023b1a5477b8befe516242c3e182f6",
"sha256": "0ybyk5jsfpym5wjkqvz7amv746ahcn73rahv4zi40ipv7yk0fwbr",
"type": "tarball",
"url": "https://github.com/confluentinc/librdkafka/archive/a6d85bdbc1023b1a5477b8befe516242c3e182f6.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
}
}
51 changes: 19 additions & 32 deletions nri-kafka/scripts/pause-resume-bug/Consumer.hs
@@ -1,14 +1,14 @@
module Consumer where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, tryTakeMVar, withMVar)
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Monad (void)
import qualified Environment
import qualified Kafka.Worker as Kafka
import Message
import System.Environment (setEnv)
import System.IO (Handle, hPutStrLn, stderr, stdout)
import Prelude (IO, String, show)
import System.Environment (lookupEnv, setEnv)
import System.IO (Handle, hPutStrLn, stdout)
import Prelude (IO, String, mod, show, fromIntegral, pure)

main :: IO ()
main = do
@@ -17,42 +17,20 @@ main = do
setEnv "LOG_FILE" "/dev/null"

-- Reduce buffer and batch sizes to make it fail faster
setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "20"
setEnv "KAFKA_POLL_BATCH_SIZE" "5"
setEnv "KAFKA_MAX_MSGS_PER_PARTITION_BUFFERED_LOCALLY" "1"
setEnv "KAFKA_POLL_BATCH_SIZE" "1"

settings <- Environment.decode Kafka.decoder
doAnythingHandler <- Platform.doAnythingHandler
lastId <- newEmptyMVar

lock <- newMVar ()

let processMsg (msg :: Message) =
( do
let msgId = ("ID(" ++ show (id msg) ++ ")")
prevId <- tryTakeMVar lastId

case (prevId, id msg) of
(Nothing, _) ->
printAtomic lock stdout (msgId ++ " First message has been received")
(_, 1) ->
printAtomic lock stdout (msgId ++ " Producer has been restarted")
(Just prev, curr)
| prev + 1 == curr ->
-- This is the expected behavior
printAtomic lock stdout (msgId ++ " OK")
(Just prev, curr) ->
-- This is the bug
printAtomic
lock
stderr
( "ERROR: Expected ID "
++ show (prev + 1)
++ " but got "
++ show curr
)

putMVar lastId (id msg)
threadDelay 200000
let msgId = id msg
let msgIdStr = "ID(" ++ show msgId ++ ")"
printAtomic lock stdout ("✅ " ++ msgIdStr ++ " Done")
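-- simulate slow processing: hold each message for 10 seconds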
threadDelay (10 * 1000000)
)
|> fmap Ok
|> Platform.doAnything doAnythingHandler
@@ -66,3 +44,12 @@ printAtomic lock handle msg = do
|> withMVar lock
|> forkIO
|> void

readIntEnvVar :: String -> Int -> IO Int
readIntEnvVar name defaultVal = do
-- lookupEnv never throws on a missing variable (getEnv would), so defaultVal
-- covers both an unset and an unparseable value
maybeValueStr <- lookupEnv name
maybeValueStr
|> Maybe.andThen (\valueStr -> Text.toInt (Text.fromList valueStr))
|> Maybe.withDefault defaultVal
|> pure
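The call site for `readIntEnvVar` falls outside the visible hunks. A hypothetical usage sketch, e.g. making the 10-second per-message delay configurable — the variable name `CONSUMER_DELAY_MICROS` is invented for illustration, not taken from the PR:

```haskell
-- Hypothetical call site (not in this diff); env var name is illustrative.
example :: IO ()
example = do
  delayMicros <- readIntEnvVar "CONSUMER_DELAY_MICROS" (10 * 1000000)
  threadDelay delayMicros
```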
16 changes: 14 additions & 2 deletions nri-kafka/src/Kafka/Worker/Fetcher.hs
@@ -11,6 +11,7 @@ import qualified Kafka.Worker.Analytics as Analytics
import qualified Kafka.Worker.Partition as Partition
import qualified Kafka.Worker.Settings as Settings
import qualified Prelude
import qualified Data.ByteString as ByteString
import qualified Data.Either

type EnqueueRecord = (ConsumerRecord -> Prelude.IO Partition.SeekCmd)

@@ -76,8 +77,12 @@ pollingLoop'
-- See https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/CHANGELOG.md?plain=1#L90-L95
--
-- We have a small app to reproduce the bug. Check out scripts/pause-resume-bug/README.md
MVar.withMVar consumerLock
<| \_ -> Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
MVar.withMVar consumerLock <| \_ -> do
Prelude.putStrLn "Polling for messages..."
em <- Consumer.pollMessageBatch consumer pollingTimeout pollBatchSize
let digits = List.filterMap recordContents em |> ByteString.intercalate ", "
Prelude.putStrLn <| "Polling done. Found messages: " ++ Prelude.show digits
Prelude.pure em
msgs <- Prelude.traverse handleKafkaError eitherMsgs
assignment <-
Consumer.assignment consumer
@@ -103,6 +108,13 @@
throttle maxMsgsPerSecondPerPartition maxPollIntervalMs (List.length appendResults) analytics now lastPollTimestamp
pollingLoop' settings enqueueRecord analytics consumer consumerLock (pollTimeIsOld now)

recordContents :: Data.Either.Either x ConsumerRecord -> Maybe ByteString.ByteString
recordContents (Data.Either.Left _) = Nothing
recordContents (Data.Either.Right record) = do
val <- Consumer.crValue record
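-- keep only the ASCII digit bytes (48–57, i.e. '0'–'9') of the payload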
let digits = ByteString.filter (\c -> c >= 48 && c <= 57) val
Just digits

getPartitionKey :: Consumer.ConsumerRecord k v -> (Consumer.TopicName, Consumer.PartitionId)
getPartitionKey record =
( Consumer.crTopic record,
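The `consumerLock` threaded through `pollingLoop'` serializes polling against the pause/resume calls that trigger the librdkafka bug referenced in the comment above. A standalone sketch of that locking pattern in plain Haskell, with `putStrLn` stand-ins instead of the real Kafka calls:

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newMVar, withMVar)
import Control.Monad (forever, void)

main :: IO ()
main = do
  consumerLock <- newMVar ()
  -- The poller and the pause/resume path contend for the same lock, so a
  -- pause can never interleave with an in-flight poll.
  void $ forkIO $ forever $ withMVar consumerLock $ \_ -> do
    putStrLn "poll batch" -- stand-in for Consumer.pollMessageBatch
    threadDelay 100000
  forever $ withMVar consumerLock $ \_ -> do
    putStrLn "pause/resume partitions" -- stand-in for the pause/resume calls
    threadDelay 100000
```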
30 changes: 29 additions & 1 deletion shell-ghc-8-10.nix
@@ -1,7 +1,34 @@
let
sources = import ./nix/sources.nix { };

pkgs = import sources.nixpkgs { };
pkgs = import sources.nixpkgs {
overlays = [ (final: prev: { }) ];
};
# rdkafka = pkgs.rdkafka.overrideAttrs (old: { src = sources.rdkafka; });
rdkafka = pkgs.stdenv.mkDerivation rec {
pname = "rdkafka";
version = "2.2.0";

# git clone https://github.com/confluentinc/librdkafka ../librdkafka
src = ../librdkafka;

nativeBuildInputs = with pkgs; [ pkg-config python3 which ];

buildInputs = with pkgs; [ zlib zstd openssl ];

env.NIX_CFLAGS_COMPILE = "-Wno-error=strict-overflow";

postPatch = ''
patchShebangs .
'';

enableParallelBuilding = true;
};
in import nix/mk-shell.nix {
pkgs = pkgs;
haskellPackages = pkgs.haskell.packages.ghc8107.extend (self: super: {
@@ -12,4 +39,5 @@ in import nix/mk-shell.nix {
# todo: resolve breaking changes in brick >= 0.72
brick = self.callHackage "brick" "0.71.1" { };
});
rdkafka = rdkafka;
}
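The commented-out line near the top of this file hints at the alternative when no local `../librdkafka` checkout exists: reuse nixpkgs' stock `rdkafka` build recipe but point it at the niv-pinned source from `nix/sources.json`. A minimal sketch, assuming that recipe builds the pinned rev cleanly:

```nix
# Sketch of the pinned-source alternative (mirrors the commented-out line above)
rdkafka = pkgs.rdkafka.overrideAttrs (old: {
  version = "2.2.0"; # assumption: the pinned rev corresponds to 2.2.0
  src = sources.rdkafka;
});
```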