diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java index 1ff4888de..b4caf91f6 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java @@ -45,6 +45,7 @@ public class CruiseControlMetricsReporterSampler implements MetricSampler { private static final long METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = 5000L; // Default configs private static final String DEFAULT_METRIC_REPORTER_SAMPLER_GROUP_ID = "CruiseControlMetricsReporterSampler"; + private static final Long DEFAULT_RECONNECT_BACKOFF_MS = 50L; // static metric processor for metrics aggregation. private static final CruiseControlMetricsProcessor METRICS_PROCESSOR = new CruiseControlMetricsProcessor(); // static random token to avoid group conflict. @@ -218,6 +219,10 @@ public void configure(Map configs) { if (groupId == null) { groupId = DEFAULT_METRIC_REPORTER_SAMPLER_GROUP_ID + "-" + RANDOM.nextLong(); } + Long reconnectBackoffMs = (Long) configs.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG); + if (reconnectBackoffMs == null) { + reconnectBackoffMs = DEFAULT_RECONNECT_BACKOFF_MS; + } Properties consumerProps = new Properties(); consumerProps.putAll(configs); @@ -230,8 +235,7 @@ public void configure(Map configs) { consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE)); consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName()); - consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, - (String) configs.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG)); + consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs.toString()); _metricConsumer = new KafkaConsumer<>(consumerProps); _currentPartitionAssignment = Collections.emptySet(); if (refreshPartitionAssignment()) {