Skip to content

Commit

Permalink
Fix Cruise Control initialization issue if reconnect.backoff.ms is no…
Browse files Browse the repository at this point in the history
…t explicitly set in config. (#732)
  • Loading branch information
kun du authored May 31, 2019
1 parent 266623a commit 7b2d581
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class CruiseControlMetricsReporterSampler implements MetricSampler {
private static final long METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = 5000L;
// Default configs
private static final String DEFAULT_METRIC_REPORTER_SAMPLER_GROUP_ID = "CruiseControlMetricsReporterSampler";
private static final Long DEFAULT_RECONNECT_BACKOFF_MS = 50L;
// static metric processor for metrics aggregation.
private static final CruiseControlMetricsProcessor METRICS_PROCESSOR = new CruiseControlMetricsProcessor();
// static random token to avoid group conflict.
Expand Down Expand Up @@ -218,6 +219,10 @@ public void configure(Map<String, ?> configs) {
if (groupId == null) {
groupId = DEFAULT_METRIC_REPORTER_SAMPLER_GROUP_ID + "-" + RANDOM.nextLong();
}
Long reconnectBackoffMs = (Long) configs.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG);
if (reconnectBackoffMs == null) {
reconnectBackoffMs = DEFAULT_RECONNECT_BACKOFF_MS;
}

Properties consumerProps = new Properties();
consumerProps.putAll(configs);
Expand All @@ -230,8 +235,7 @@ public void configure(Map<String, ?> configs) {
consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE));
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG,
(String) configs.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG));
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs.toString());
_metricConsumer = new KafkaConsumer<>(consumerProps);
_currentPartitionAssignment = Collections.emptySet();
if (refreshPartitionAssignment()) {
Expand Down

0 comments on commit 7b2d581

Please sign in to comment.