From 6b1232da1b52fc94141ce2df54eb0f90176b8b01 Mon Sep 17 00:00:00 2001 From: Rachel Aurand Date: Tue, 15 Oct 2024 15:28:19 -0700 Subject: [PATCH] add impl for reconfiguration of ssl properties for metrics reporter --- .../CruiseControlMetricsReporter.java | 85 ++++++++++++------- .../CruiseControlMetricsReporterConfig.java | 23 ++++- .../CruiseControlMetricsReporterSslTest.java | 44 ++++++++++ .../utils/CCEmbeddedBroker.java | 7 ++ 4 files changed, 127 insertions(+), 32 deletions(-) diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java index 58ef293281..b654933553 100644 --- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java +++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java @@ -142,6 +142,39 @@ static String getBootstrapServers(Map configs) { @Override public void configure(Map configs) { + CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false); + + _metricsReporterCreateRetries = reporterConfig.getInt( + CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG); + + Properties producerProps = buildProducerProperties(configs, reporterConfig); + createCruiseControlMetricsProducer(producerProps); + if (_producer == null) { + this.close(); + } + + _brokerId = Integer.parseInt((String) configs.get("broker.id")); + + _cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG); + _reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG); + _kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG); + + if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) { + try { + _metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig); + Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig); + _adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs); + _metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong( + CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG); + _metricsTopicAutoCreateRetries = reporterConfig.getInt( + CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG); + } catch (CruiseControlMetricsReporterException e) { + LOG.warn("Cruise Control metrics topic auto creation was disabled", e); + } + } + } + + private Properties buildProducerProperties(Map configs, CruiseControlMetricsReporterConfig reporterConfig) { Properties producerProps = CruiseControlMetricsReporterConfig.parseProducerConfigs(configs); //Add BootstrapServers if not set @@ -149,7 +182,7 @@ public void configure(Map configs) { String bootstrapServers = getBootstrapServers(configs); producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); LOG.info("Using default value of {} for {}", bootstrapServers, - CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); } //Add SecurityProtocol if not set @@ -157,51 +190,43 @@ public void configure(Map configs) { String securityProtocol = "PLAINTEXT"; producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); LOG.info("Using default value of {} for {}", securityProtocol, - CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } - CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false); - setIfAbsent(producerProps, - ProducerConfig.CLIENT_ID_CONFIG, - reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG))); + ProducerConfig.CLIENT_ID_CONFIG, + reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG))); setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG, - reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString()); + reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString()); setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG, - reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString()); + reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString()); setIfAbsent(producerProps, ProducerConfig.RETRIES_CONFIG, "5"); setIfAbsent(producerProps, ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName()); setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all"); - _metricsReporterCreateRetries = reporterConfig.getInt( - CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG); + return producerProps; + } + + @Override + public Set reconfigurableConfigs() { + return CruiseControlMetricsReporterConfig.RECONFIGURABLE_CONFIGS; + } + + @Override + public void reconfigure(Map configs) { + if (_producer != null) { + _producer.close(); + } + LOG.info("Reconfiguring Cruise Control metrics producer"); + CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false); + Properties producerProps = buildProducerProperties(configs, reporterConfig); createCruiseControlMetricsProducer(producerProps); if (_producer == null) { this.close(); } - - _brokerId = Integer.parseInt((String) configs.get("broker.id")); - - _cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG); - _reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG); - _kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG); - - if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) { - try { - _metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig); - Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig); - _adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs); - _metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong( - CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG); - _metricsTopicAutoCreateRetries = reporterConfig.getInt( - CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG); - } catch (CruiseControlMetricsReporterException e) { - LOG.warn("Cruise Control metrics topic auto creation was disabled", e); - } - } } /** diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java index e3d11946fc..b2198b3e22 100644 --- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java +++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterConfig.java @@ -9,15 +9,19 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.utils.Utils; public class CruiseControlMetricsReporterConfig extends AbstractConfig { private static final ConfigDef CONFIG; private static final Set CONFIGS = new HashSet<>(); public static final String PREFIX = "cruise.control.metrics.reporter."; + static final Set RECONFIGURABLE_CONFIGS = new HashSet<>(); // Configurations public static final String CRUISE_CONTROL_METRICS_TOPIC_CONFIG = "cruise.control.metrics.topic"; private static final String CRUISE_CONTROL_METRICS_TOPIC_DOC = "The topic to which Cruise Control metrics reporter " @@ -57,6 +61,9 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig { public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG = PREFIX + "kubernetes.mode"; public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_DOC = "Cruise Control metrics reporter will report " + "metrics using methods that are aware of container boundaries."; + public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG = PREFIX + "force.reconfigure"; + public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC = "Cruise Control metrics reporter force reconfigure " + + "the flag. Set it a different value (like the current date) to trigger the reconfiguration."; // Default values public static final String DEFAULT_CRUISE_CONTROL_METRICS_TOPIC = "__CruiseControlMetrics"; public static final Integer DEFAULT_CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS = -1; @@ -74,6 +81,10 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig { public static final boolean DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE = false; public static final int DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES = 2; + public static final Set EXCLUDED_PRODUCER_CONFIGS = Utils.mkSet( + CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG + ); + public CruiseControlMetricsReporterConfig(Map originals, boolean doLog) { super(CONFIG, originals, doLog); } @@ -155,7 +166,15 @@ public CruiseControlMetricsReporterConfig(Map originals, boolean doLog) { ConfigDef.Type.INT, DEFAULT_CRUISE_CONTROL_METRICS_BATCH_SIZE, ConfigDef.Importance.LOW, - CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC); + CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC) + .define(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC); + RECONFIGURABLE_CONFIGS.addAll(SslConfigs.RECONFIGURABLE_CONFIGS.stream() + .map(config -> PREFIX + config).collect(Collectors.toSet())); + RECONFIGURABLE_CONFIGS.add(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG); } /** @@ -173,7 +192,7 @@ public static String config(String baseConfigName) { static Properties parseProducerConfigs(Map configMap) { Properties props = new Properties(); for (Map.Entry entry : configMap.entrySet()) { - if (entry.getKey().startsWith(PREFIX)) { + if (entry.getKey().startsWith(PREFIX) && !EXCLUDED_PRODUCER_CONFIGS.contains(entry.getKey())) { props.put(entry.getKey().replace(PREFIX, ""), entry.getValue()); } } diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java index b93036f343..82ade23bae 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java @@ -6,16 +6,33 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils; import kafka.server.KafkaConfig; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsResult; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.junit.Assert; +import org.junit.Test; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.PREFIX; public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest { @@ -53,6 +70,7 @@ public Properties overridingProps() { props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1"); props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2"); + props.setProperty(KafkaConfig.PasswordEncoderSecretProp(), "test"); return props; } @@ -75,4 +93,30 @@ private String appendPrefix(Object key) { return CruiseControlMetricsReporterConfig.config((String) key); } + @Test + public void testAlterConfig() throws Exception { + Properties props = new Properties(); + setSecurityConfigs(props, "admin"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + AdminClient adminClient = AdminClient.create(props); + + String brokerId = String.valueOf(_brokers.get(0).id()); + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId); + List ops = new ArrayList<>(); + + Map sslProps = _brokers.get(0).config().entrySet().stream() + .filter(entry -> SslConfigs.RECONFIGURABLE_CONFIGS.contains(entry.getKey().toString())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + sslProps.forEach((k, v) -> { + String value = v instanceof Password ? ((Password) v).value() : v.toString(); + ops.add(new AlterConfigOp(new ConfigEntry("listener.name.ssl." + k, value), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry(PREFIX + k, value), AlterConfigOp.OpType.SET)); + }); + ops.add(new AlterConfigOp(new ConfigEntry(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG, + UUID.randomUUID().toString()), AlterConfigOp.OpType.SET)); + AlterConfigsResult result = adminClient.incrementalAlterConfigs(Collections.singletonMap(brokerResource, ops)); + + result.values().get(brokerResource).get(); + Thread.sleep(5000); + } } diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java index bb12e923b2..16a3813ca3 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java @@ -8,6 +8,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; @@ -28,11 +29,13 @@ public class CCEmbeddedBroker implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class); private final Map _ports; private final Map _hosts; + private final Map _config; private final KafkaServer _kafkaServer; private int _id; private File _logDir; public CCEmbeddedBroker(Map config) { + _config = Collections.unmodifiableMap(config); _ports = new HashMap<>(); _hosts = new HashMap<>(); @@ -116,6 +119,10 @@ public int id() { return _id; } + public Map config() { + return _config; + } + /** * @param protocol Security protocol. * @return Address containing host and port.