From 1659026c19173930f0c69ae7bc1d9f832b633afc Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Fri, 3 Sep 2021 22:27:13 +0300 Subject: [PATCH 1/6] handle timeouts in client calls brod_client safe_gen_call implementation leaked timeouts --- src/brod_client.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/brod_client.erl b/src/brod_client.erl index 04245ff7..e74f969d 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -829,16 +829,20 @@ ensure_partition_workers(TopicName, State, F) -> end end). -%% Catch noproc exit exception when making gen_server:call. +%% Catches noproc and timeout exit exceptions when making gen_server:call. -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, client_down | term()}. + Return :: ok | {ok, term()} | {error, client_down + | client_timeout | term()}. safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) - catch exit : {noproc, _} -> - {error, client_down} + catch + exit : {noproc, _} -> + {error, client_down}; + exit : {timeout, _} -> + {error, client_timeout} end. -spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value(). From b44c4c246435ba6dbdd88f7189ea77d4c42bcfa0 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 12 Sep 2021 01:46:31 +0300 Subject: [PATCH 2/6] handle any exits in safe_gen_call --- src/brod_client.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/brod_client.erl b/src/brod_client.erl index e74f969d..f2fd5eb9 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -87,12 +87,14 @@ | {producer_not_found, topic()} | { producer_not_found , topic() - , partition()}. + , partition()} + | term(). -type get_consumer_error() :: client_down | {consumer_down, noproc} | {consumer_not_found, topic()} - | {consumer_not_found, topic(), partition()}. + | {consumer_not_found, topic(), partition()} + | term(). 
-type get_worker_error() :: get_producer_error() | get_consumer_error(). @@ -842,7 +844,9 @@ safe_gen_call(Server, Call, Timeout) -> exit : {noproc, _} -> {error, client_down}; exit : {timeout, _} -> - {error, client_timeout} + {error, client_timeout}; + exit : {reason, _} -> + {error, reason} end. -spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value(). From 9c63795167a3306a1e33c54cec87e6dffee16f7d Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 12 Sep 2021 01:50:26 +0300 Subject: [PATCH 3/6] remove client_timeout error all safe_gen_call calls have infinity timeout --- src/brod_client.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/brod_client.erl b/src/brod_client.erl index f2fd5eb9..38e7189b 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -835,16 +835,13 @@ ensure_partition_workers(TopicName, State, F) -> -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, client_down - | client_timeout | term()}. + Return :: ok | {ok, term()} | {error, client_down | term()}. safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) catch exit : {noproc, _} -> {error, client_down}; - exit : {timeout, _} -> - {error, client_timeout}; exit : {reason, _} -> {error, reason} end. 
From 5825b39b5890eae5f7f412831724d3289e3de56c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 5 Feb 2022 09:53:50 +0100 Subject: [PATCH 4/6] fix: catch gen_server:call exit exceptions --- src/brod.erl | 6 ++++-- src/brod_client.erl | 23 +++++++++++------------ src/brod_consumer.erl | 9 +++++---- src/brod_consumers_sup.erl | 6 +++--- src/brod_producers_sup.erl | 6 +++--- 5 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/brod.erl b/src/brod.erl index d8182f17..52ebc7da 100644 --- a/src/brod.erl +++ b/src/brod.erl @@ -455,7 +455,8 @@ get_partitions_count(Client, Topic) -> -spec get_consumer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down - | {consumer_down, noproc} + | {client_down, any()} + | {consumer_down, any()} | {consumer_not_found, topic()} | {consumer_not_found, topic(), partition()}. get_consumer(Client, Topic, Partition) -> @@ -465,7 +466,8 @@ get_consumer(Client, Topic, Partition) -> -spec get_producer(client(), topic(), partition()) -> {ok, pid()} | {error, Reason} when Reason :: client_down - | {producer_down, noproc} + | {client_down, any()} + | {producer_down, any()} | {producer_not_found, topic()} | {producer_not_found, topic(), partition()}. get_producer(Client, Topic, Partition) -> diff --git a/src/brod_client.erl b/src/brod_client.erl index 38e7189b..7b2e0ad6 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -83,18 +83,16 @@ | ?CONSUMER_KEY(topic(), partition()). -type get_producer_error() :: client_down - | {producer_down, noproc} + | {client_down, any()} + | {producer_down, any()} | {producer_not_found, topic()} - | { producer_not_found - , topic() - , partition()} - | term(). + | {producer_not_found, topic(), partition()}. -type get_consumer_error() :: client_down - | {consumer_down, noproc} + | {client_down, any()} + | {consumer_down, any()} | {consumer_not_found, topic()} - | {consumer_not_found, topic(), partition()} - | term(). 
+ | {consumer_not_found, topic(), partition()}. -type get_worker_error() :: get_producer_error() | get_consumer_error(). @@ -831,19 +829,20 @@ ensure_partition_workers(TopicName, State, F) -> end end). -%% Catches noproc and timeout exit exceptions when making gen_server:call. +%% Catches exit exceptions when making gen_server:call. -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, client_down | term()}. + Return :: ok | {ok, term()} | {error, Reason}, + Reason :: client_down | {client_down, any()} | any(). safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) catch exit : {noproc, _} -> {error, client_down}; - exit : {reason, _} -> - {error, reason} + exit : {Reason, _} -> + {error, {client_down, Reason}} end. -spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value(). diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 3c154920..2a045f73 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -794,16 +794,17 @@ reset_buffer(#state{ pending_acks = #pending_acks{queue = Queue} , last_req_ref = ?undef }. -%% Catch noproc exit exception when making gen_server:call. +%% Catch exit exceptions when making gen_server:call. -spec safe_gen_call(pid() | atom(), Call, Timeout) -> Return when Call :: term(), Timeout :: infinity | integer(), - Return :: ok | {ok, term()} | {error, consumer_down | term()}. + Return :: ok | {ok, term()} | {error, any()}. safe_gen_call(Server, Call, Timeout) -> try gen_server:call(Server, Call, Timeout) - catch exit : {noproc, _} -> - {error, consumer_down} + catch + exit : {Reason, _} -> + {error, Reason} end. %% Init payload connection regardless of subscriber state. 
diff --git a/src/brod_consumers_sup.erl b/src/brod_consumers_sup.erl index ba876ae4..f70ed51c 100644 --- a/src/brod_consumers_sup.erl +++ b/src/brod_consumers_sup.erl @@ -67,7 +67,7 @@ stop_consumer(SupPid, TopicName) -> {ok, pid()} | {error, Reason} when Reason :: {consumer_not_found, brod:topic()} | {consumer_not_found, brod:topic(), brod:partition()} - | {consumer_down, noproc}. + | {consumer_down, any()}. find_consumer(SupPid, Topic, Partition) -> case supervisor3:find_child(SupPid, Topic) of [] -> @@ -83,8 +83,8 @@ find_consumer(SupPid, Topic, Partition) -> [Pid] -> {ok, Pid} end - catch exit : {noproc, _} -> - {error, {consumer_down, noproc}} + catch exit : {Reason, _} -> + {error, {consumer_down, Reason}} end end. diff --git a/src/brod_producers_sup.erl b/src/brod_producers_sup.erl index 284282bf..02ee8531 100644 --- a/src/brod_producers_sup.erl +++ b/src/brod_producers_sup.erl @@ -71,7 +71,7 @@ stop_producer(SupPid, TopicName) -> {ok, pid()} | {error, Reason} when Reason :: {producer_not_found, brod:topic()} | {producer_not_found, brod:topic(), brod:partition()} - | {producer_down, noproc}. + | {producer_down, any()}. find_producer(SupPid, Topic, Partition) -> case supervisor3:find_child(SupPid, Topic) of [] -> @@ -87,8 +87,8 @@ find_producer(SupPid, Topic, Partition) -> [Pid] -> {ok, Pid} end - catch exit : {noproc, _} -> - {error, {producer_down, noproc}} + catch exit : {Reason, _} -> + {error, {producer_down, Reason}} end end. From 0c933e6158466b25bfa9536479587801dba524b9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 5 Feb 2022 11:01:46 +0100 Subject: [PATCH 5/6] fix: update kafka_protocol from 4.0.1 to 4.0.3 Prior to this change the actual time spent in establishing a Kafka connection might be longer than desired due to the timeout being used in SSL upgrade (if enabled), then API version query. This has been fixed by turning the given timeout config into a deadline, and the sub-steps will try to meet the deadline. 
see more details here: https://github.com/kafka4beam/kafka_protocol/pull/92 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 47fe6434..42755e24 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ {deps, [ {supervisor3, "1.1.11"} - , {kafka_protocol, "4.0.1"} + , {kafka_protocol, "4.0.3"} , {snappyer, "1.2.8"} ]}. {edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}. From 32d09d641b697f4357051e8640172d07d618a59d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 5 Feb 2022 10:18:33 +0100 Subject: [PATCH 6/6] chore: update changelog, prepare for 3.16.2 --- changelog.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index e0259de5..d5e14e40 100644 --- a/changelog.md +++ b/changelog.md @@ -1,7 +1,19 @@ * 3.16.2 + * Update kafka_protocol from 4.0.1 to 4.0.3. + Prior to this change the actual time spent in establishing a + Kafka connection might be longer than desired due to the timeout + being used in SSL upgrade (if enabled), then API version query. + This has been fixed by turning the given timeout config + into a deadline, and the sub-steps will try to meet the deadline. + see more details here: https://github.com/kafka4beam/kafka_protocol/pull/92 + * Catch `timeout` and other `DOWN` reasons when making `gen_server` call to + `brod_client`, `brod_consumer` and producer/consumer supervisor, + and return as `Reason` in `{error, Reason}`. + Previously only `noproc` reason was caught. (#492) * Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg affected APIs: connect_group_coordinator, create_topics, delete_topics, - resolve_offset, fetch, fold, fetch_committed_offsets + resolve_offset, fetch, fold, fetch_committed_offsets (#458) + * Fix bad field name in group describe request (#486) * 3.16.1 * Fix `brod` script in `brod-cli` in release. 
* Support `rebalance_timeout` consumer group option