Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler #2211

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

k0b3rIT
Copy link
Contributor

@k0b3rIT k0b3rIT commented Oct 23, 2024

Summary

  1. Why: During the first startup of Kafka brokers and Cruise Control, it could happen that one of the broker is lagging behind, or the cluster is not ready (not enough broker is available) to create the metrics topic (cruise.control.metrics.topic.auto.create=true) with the desired guaranties (replica count). If we (startup script/automation) did not wait enough with the CC start command, then it could fail.
    There is now retry logic in the CruiseControlMetricsReporterSampler initialization to handle this edge case.
    2 things could happen depending on the Kafka's auto.create.topics.enable config

auto.create.topics.enable=false

CC startup fails with the following error after the unsuccessful refreshPartitionAssignment() as the topic does not exist:

java.lang.IllegalStateException: Cruise Control cannot find partitions for the metrics reporter that topic matches __CruiseControlMetrics in the target cluster.
	at com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler.configure(CruiseControlMetricsReporterSampler.java:195) ~[classes/:?]
	at com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils.getConfiguredInstance(KafkaCruiseControlConfigUtils.java:49) ~[classes/:?]
	at com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig.getConfiguredInstance(KafkaCruiseControlConfig.java:98) ~[classes/:?]

auto.create.topics.enable=true

During the refreshPartitionAssignment() the Kafka Consumer send a topicMetadata request to the brokers. This request triggers the auto topic creation mechanism in Kafka. Based on which broker actually creates the topic, the topic creation can be sync or async, meaning the refreshPartitionAssignment() could fail or succeed. In either case, the topic creation won't use the desired topic properties we defined in the CruiseControlMetricsReporter config cruise.control.metrics.topic.num.partitions, cruise.control.metrics.topic.replication.factor. It will use a default topic configuration silently.
If the refreshPartitionAssignment() succeed, we did not even notice anything.
If the refreshPartitionAssignment() fails, the CC won't start. However, the topic auto creation got triggered, and the brokers will create the topic. If we check the logs, we will see the error above, but we will have a hard time to figure out why the error happened since the topic exist in Kafka (with the default topic config). If we start the CC again, then it starts without any issue, and will use the topic created by the topic auto creation mechanism.

  1. What:
  • Changed the Kafka consumer creation to disable the topic auto creation during the metadata request.
  • Introduced a mechanism to explicitly wait for the metrics topic to appear during the CruiseControlMetricsReporterSampler configuration.

Expected Behavior

Metrics topic got created with the defined configuration.
CC tolerate lagging, slow broker startup

Actual Behavior

Metrics topic got created with the default topic config
CC fails instantly if the topic does not exist during the startup

Steps to Reproduce

  1. Set the following broker configs
          metric.reporters: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
          cruise.control.metrics.topic: __CruiseControlMetrics
          cruise.control.metrics.topic.auto.create: true
          cruise.control.metrics.topic.num.partitions: 1
          cruise.control.metrics.topic.replication.factor: 3
          cruise.control.metrics.topic.auto.create.retries: 12
          cruise.control.metrics.topic.auto.create.timeout.ms: 15000
          auto.create.topics.enable: true
  1. Start a 2 node Kafka cluster (this simulates the lagging 3rd broker)
  2. Start CC

Known Workarounds

No

Categorization

  • documentation
  • bugfix
  • new feature
  • refactor
  • security/CVE
  • other

@@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler {
// Configurations
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.topic.assert.attempts";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should call the config metric.reporter.sampler.topic.assert.attempts as it is a config of the sampler and not the reporter.

Also please update the proper section of the Configurations.md file with your configuration. I think you should also mention that this uses exponential backoff and too big numbers will cause infrequent and long backoff between retries after a while.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx, I have fixed it!

@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: calling the cluster "target" is a bit misleading as it is rather a source cluster (source of the metrics), but I think we should just say "...to be available in the Kafka cluster".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I did not change that, this was the original form (with "target") as you can see:


I reused that in the new exception message.
I am happy to change it if you think it is better to use the mentioned form.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Kafka cluster

LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic);
if (!CruiseControlMetricsUtils.retry(()->!this.isMetricsTopicExists(), metricTopicAssertAttempts)) {
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
+ "] in the target cluster.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing about "target" as above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Kafka cluster

throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches "
+ _metricReporterTopic + " in the target cluster.");
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches ["
+ _metricReporterTopic + "] in the target cluster.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another "target".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Kafka cluster

@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic);
if (!CruiseControlMetricsUtils.retry(()->!this.isMetricsTopicExists(), metricTopicAssertAttempts)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see it uses exponential backoff periods. At the 7th retry it is already bit more than 10 minutes which I think may be unreasonably long. And for the 8th retry it waits 21 minutes which may be too long between retries. I'm not sure if this is the good approach, at least not with the current parameters.
I think we should either use 1 as a base, so we retry every 5 second for n times, or something closer to 1, like 1.25. With this parameter we can try 18 times until we get to 22 minutes total. To round it up, 20 times will get you a 35 minute total retry time. I think it's more reasonable to try 20 times in 35 minutes than 6 as it allows quicker startup.
Another approach is to use constant retry intervals with a given timeout. I think that is a more user-friendly approach as it's easier to calculate with that compared to exponents, especially that listTopics() is just a metadata call which is OK to do every 10 seconds or so with a single consumer. Overall I'm OK with the exponent approach if you or others agree on this but I favor the interval+timeout one for usability reasons. Hopefully if we set a good default, users won't have to change it too often and then it may matter less.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, I have applied your suggestion and changed the base to 1 and scale to 5 to retry every 5s.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the exponential growth is ok but with a cap. i.e. 30 secs. or 1 minute maybe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that I think could also work. It looks like a good middle ground between the two extremes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@k0b3rIT - Would you be able to update it with this recommendation?

Copy link
Contributor Author

@k0b3rIT k0b3rIT Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I don't want to expose everything as a config (base, scaleMs, maxAttempt, maxSleep) to make it easier for the users. Can we agree on this setup:
scaleMs = 2000
base = 2
maxSleep = 30
maxAttempt = configurable
delay = scaleMs * base ^ attempt

2*2^0 = 2s
2*2^1 = 4s
2*2^2 = 8s
2*2^3 = 16s
2*2^4 = 30s (capped to 30)
2*2^5 = 30s (capped to 30)
.....

@viktorsomogyi
Copy link
Contributor

@mhratson would you please review this?

@viktorsomogyi
Copy link
Contributor

@mhratson would you please review this or suggest someone who can?

@viktorsomogyi
Copy link
Contributor

@CCisGG would you please review it if you have some time?

@bgrishinko
Copy link

Made a comment on a previous discussion about the pros/cons of exponential backoff vs. timed retry. I think there's a happy medium where you use exponential backoff up to a cap'd time, i.e. 30 secs. or 1 minute.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants