Skip to content

Commit

Permalink
KAFKA-18156 VerifiableConsumer should ignore "--session-timeout" when…
Browse files Browse the repository at this point in the history
… using CONSUMER protocol (#18036)

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
brandboat authored Dec 11, 2024
1 parent 4e7e351 commit d2ad418
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public class CommonClientConfigs {
+ "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
+ "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
+ "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
+ "and <code>group.max.session.timeout.ms</code>.";
+ "and <code>group.max.session.timeout.ms</code>. Note that this configuration is not supported when <code>group.protocol</code> "
+ "is set to \"consumer\".";

public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/verifiable_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* `--group-id <group-id>`
* `--topic <topic>`
* `--broker-list <brokers>`
* `--session-timeout <n>`
* `--session-timeout <n>` - note that this configuration is not supported when group protocol is consumer
* `--enable-autocommit`
* `--max-messages <n>`
* `--assignment-strategy <s>`
Expand Down
14 changes: 7 additions & 7 deletions tests/kafkatest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
}

def __init__(self, context, num_nodes, kafka, topic, group_id,
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
static_membership=False, max_messages=-1, session_timeout_sec=0, enable_autocommit=False,
assignment_strategy=None, group_protocol=None, group_remote_assignor=None,
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
Expand All @@ -251,8 +251,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id,
self.session_timeout_sec = session_timeout_sec
self.enable_autocommit = enable_autocommit
self.assignment_strategy = assignment_strategy
self.group_protocol = group_protocol
self.group_remote_assignor = group_remote_assignor
self.prop_file = ""
self.stop_timeout_sec = stop_timeout_sec
self.on_record_consumed = on_record_consumed
Expand Down Expand Up @@ -417,10 +415,12 @@ def start_cmd(self, node):
else:
cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)

cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout %s" % \
(self.reset_policy, self.group_id, self.topic,
self.session_timeout_sec*1000)

cmd += " --reset-policy %s --group-id %s --topic %s" % \
(self.reset_policy, self.group_id, self.topic)

if self.session_timeout_sec > 0:
cmd += " --session-timeout %s" % self.session_timeout_sec*1000

if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
consumer.stop_node(node, clean_shutdown)

wait_until(lambda: len(consumer.dead_nodes()) == 1,
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")

consumer.start_node(node)
Expand Down
21 changes: 5 additions & 16 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def rolling_bounce_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_
consumer.stop_node(node, clean_shutdown)

wait_until(lambda: len(consumer.dead_nodes()) == 1,
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")

consumer.start_node(node)
Expand Down Expand Up @@ -101,14 +101,6 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina
partition = TopicPartition(self.TOPIC, 0)

producer = self.setup_producer(self.TOPIC)
# The consumers' session timeouts must exceed the time it takes for a broker to roll. Consumers are likely
# to see cluster metadata consisting of just a single alive broker in the case where the cluster has just 2
# brokers and the cluster is rolling (which is what is happening here). When the consumer sees a single alive
# broker, and then that broker rolls, the consumer will be unable to connect to the cluster until that broker
# completes its roll. In the meantime, the consumer group will move to the group coordinator on the other
# broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out.
# This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case.
self.session_timeout_sec = 30
consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol)

producer.start()
Expand Down Expand Up @@ -229,7 +221,6 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat
producer.start()
self.await_produced_messages(producer)

self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership, group_protocol=group_protocol,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor")

Expand Down Expand Up @@ -295,7 +286,6 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor
producer = self.setup_producer(self.TOPIC)
producer.start()
self.await_produced_messages(producer)
self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)
consumer.start()
self.await_all_members(consumer)
Expand Down Expand Up @@ -340,7 +330,6 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
producer.start()
self.await_produced_messages(producer)

self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)

self.num_consumers = num_conflict_consumers
Expand Down Expand Up @@ -372,7 +361,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)
Expand All @@ -383,13 +372,13 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
conflict_consumer.start()

wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from "
"normal consumer group and %d from conflict consumer group" % \
(len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
)
wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in "
"normal consumer group and %d dead in conflict consumer group" % \
(len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
Expand Down Expand Up @@ -427,7 +416,7 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
# stop the partition owner and await its shutdown
consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) is not None,
timeout_sec=self.session_timeout_sec*2+5,
timeout_sec=60,
err_msg="Timed out waiting for consumer to close")

# ensure that the remaining consumer does some work after rebalancing
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/pluggable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ def test_start_stop(self, metadata_quorum=quorum.zk):

self.logger.debug("Waiting for %d nodes to stop" % len(consumer.nodes))
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for consumers to shutdown")
11 changes: 5 additions & 6 deletions tests/kafkatest/tests/verifiable_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ class VerifiableConsumerTest(KafkaTest):
PRODUCER_REQUEST_TIMEOUT_SEC = 30

def __init__(self, test_context, num_consumers=1, num_producers=0,
group_id="test_group_id", session_timeout_sec=10, **kwargs):
group_id="test_group_id", **kwargs):
super(VerifiableConsumerTest, self).__init__(test_context, **kwargs)
self.num_consumers = num_consumers
self.num_producers = num_producers
self.group_id = group_id
self.session_timeout_sec = session_timeout_sec
self.consumption_timeout_sec = max(self.PRODUCER_REQUEST_TIMEOUT_SEC + 5, 2 * session_timeout_sec)
self.consumption_timeout_sec = self.PRODUCER_REQUEST_TIMEOUT_SEC + 5

def _all_partitions(self, topic, num_partitions):
partitions = set()
Expand All @@ -56,7 +55,7 @@ def min_cluster_size(self):
def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs):
return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,
topic, self.group_id, static_membership=static_membership,
assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit,
group_remote_assignor=group_remote_assignor,
log_level="TRACE", **kwargs)
Expand All @@ -81,9 +80,9 @@ def await_consumed_messages(self, consumer, min_messages=1):
def await_members(self, consumer, num_consumers):
# Wait until all members have joined the group
wait_until(lambda: len(consumer.joined_nodes()) == num_consumers,
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Consumers failed to join in a reasonable amount of time")

def await_all_members(self, consumer):
self.await_members(consumer, self.num_consumers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,10 @@ private static ArgumentParser argParser() {
parser.addArgument("--session-timeout")
.action(store())
.required(false)
.setDefault(30000)
.type(Integer.class)
.metavar("TIMEOUT_MS")
.dest("sessionTimeout")
.help("Set the consumer's session timeout");
.help("Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer");

parser.addArgument("--verbose")
.action(storeTrue())
Expand Down Expand Up @@ -649,10 +648,15 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
if (groupRemoteAssignor != null)
consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor);
} else {
// This means we're using the old consumer group protocol.
// This means we're using the CLASSIC consumer group protocol.
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
}

Integer sessionTimeout = res.getInt("sessionTimeout");
if (sessionTimeout != null) {
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout));
}

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));

String groupInstanceId = res.getString("groupInstanceId");
Expand All @@ -664,7 +668,6 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));

StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
Expand Down

0 comments on commit d2ad418

Please sign in to comment.