Skip to content

Commit

Permalink
add impl for reconfiguration of ssl properties for metrics reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
countableSet committed Oct 15, 2024
1 parent 85d6ca7 commit 6b1232d
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,66 +142,91 @@ static String getBootstrapServers(Map<String, ?> configs) {

@Override
public void configure(Map<String, ?> configs) {
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

_metricsReporterCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG);

Properties producerProps = buildProducerProperties(configs, reporterConfig);
createCruiseControlMetricsProducer(producerProps);
if (_producer == null) {
this.close();
}

_brokerId = Integer.parseInt((String) configs.get("broker.id"));

_cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
_reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG);
_kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG);

if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) {
try {
_metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig);
Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig);
_adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs);
_metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG);
_metricsTopicAutoCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG);
} catch (CruiseControlMetricsReporterException e) {
LOG.warn("Cruise Control metrics topic auto creation was disabled", e);
}
}
}

private Properties buildProducerProperties(Map<String, ?> configs, CruiseControlMetricsReporterConfig reporterConfig) {
Properties producerProps = CruiseControlMetricsReporterConfig.parseProducerConfigs(configs);

//Add BootstrapServers if not set
if (!producerProps.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
String bootstrapServers = getBootstrapServers(configs);
producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
LOG.info("Using default value of {} for {}", bootstrapServers,
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
}

//Add SecurityProtocol if not set
if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) {
String securityProtocol = "PLAINTEXT";
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
LOG.info("Using default value of {} for {}", securityProtocol,
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);

setIfAbsent(producerProps,
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG)));
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.CLIENT_ID_CONFIG)));
setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG,
reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG,
reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString());
reporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.RETRIES_CONFIG, "5");
setIfAbsent(producerProps, ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
setIfAbsent(producerProps, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setIfAbsent(producerProps, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
setIfAbsent(producerProps, ProducerConfig.ACKS_CONFIG, "all");

_metricsReporterCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES_CONFIG);
return producerProps;
}

@Override
public Set<String> reconfigurableConfigs() {
return CruiseControlMetricsReporterConfig.RECONFIGURABLE_CONFIGS;
}

@Override
public void reconfigure(Map<String, ?> configs) {
if (_producer != null) {
_producer.close();
}

LOG.info("Reconfiguring Cruise Control metrics producer");
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
Properties producerProps = buildProducerProperties(configs, reporterConfig);
createCruiseControlMetricsProducer(producerProps);
if (_producer == null) {
this.close();
}

_brokerId = Integer.parseInt((String) configs.get("broker.id"));

_cruiseControlMetricsTopic = reporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
_reportingIntervalMs = reporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG);
_kubernetesMode = reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG);

if (reporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG)) {
try {
_metricsTopic = createMetricsTopicFromReporterConfig(reporterConfig);
Properties adminClientConfigs = CruiseControlMetricsUtils.addSslConfigs(producerProps, reporterConfig);
_adminClient = CruiseControlMetricsUtils.createAdminClient(adminClientConfigs);
_metricsTopicAutoCreateTimeoutMs = reporterConfig.getLong(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG);
_metricsTopicAutoCreateRetries = reporterConfig.getInt(
CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG);
} catch (CruiseControlMetricsReporterException e) {
LOG.warn("Cruise Control metrics topic auto creation was disabled", e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.Utils;

public class CruiseControlMetricsReporterConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
private static final Set<String> CONFIGS = new HashSet<>();
public static final String PREFIX = "cruise.control.metrics.reporter.";
static final Set<String> RECONFIGURABLE_CONFIGS = new HashSet<>();
// Configurations
public static final String CRUISE_CONTROL_METRICS_TOPIC_CONFIG = "cruise.control.metrics.topic";
private static final String CRUISE_CONTROL_METRICS_TOPIC_DOC = "The topic to which Cruise Control metrics reporter "
Expand Down Expand Up @@ -57,6 +61,9 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig {
public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG = PREFIX + "kubernetes.mode";
public static final String CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_DOC = "Cruise Control metrics reporter will report "
+ "metrics using methods that are aware of container boundaries.";
public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG = PREFIX + "force.reconfigure";
public static final String CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC = "Cruise Control metrics reporter force reconfigure "
+ "the flag. Set it a different value (like the current date) to trigger the reconfiguration.";
// Default values
public static final String DEFAULT_CRUISE_CONTROL_METRICS_TOPIC = "__CruiseControlMetrics";
public static final Integer DEFAULT_CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS = -1;
Expand All @@ -74,6 +81,10 @@ public class CruiseControlMetricsReporterConfig extends AbstractConfig {
public static final boolean DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE = false;
public static final int DEFAULT_CRUISE_CONTROL_METRICS_REPORTER_CREATE_RETRIES = 2;

public static final Set<String> EXCLUDED_PRODUCER_CONFIGS = Utils.mkSet(
CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG
);

public CruiseControlMetricsReporterConfig(Map<?, ?> originals, boolean doLog) {
super(CONFIG, originals, doLog);
}
Expand Down Expand Up @@ -155,7 +166,15 @@ public CruiseControlMetricsReporterConfig(Map<?, ?> originals, boolean doLog) {
ConfigDef.Type.INT,
DEFAULT_CRUISE_CONTROL_METRICS_BATCH_SIZE,
ConfigDef.Importance.LOW,
CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC);
CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_DOC)
.define(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.LOW,
CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG_DOC);
RECONFIGURABLE_CONFIGS.addAll(SslConfigs.RECONFIGURABLE_CONFIGS.stream()
.map(config -> PREFIX + config).collect(Collectors.toSet()));
RECONFIGURABLE_CONFIGS.add(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG);
}

/**
Expand All @@ -173,7 +192,7 @@ public static String config(String baseConfigName) {
static Properties parseProducerConfigs(Map<String, ?> configMap) {
Properties props = new Properties();
for (Map.Entry<String, ?> entry : configMap.entrySet()) {
if (entry.getKey().startsWith(PREFIX)) {
if (entry.getKey().startsWith(PREFIX) && !EXCLUDED_PRODUCER_CONFIGS.contains(entry.getKey())) {
props.put(entry.getKey().replace(PREFIX, ""), entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Assert;
import org.junit.Test;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.PREFIX;


public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest {
Expand Down Expand Up @@ -53,6 +70,7 @@ public Properties overridingProps() {
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
props.setProperty(KafkaConfig.PasswordEncoderSecretProp(), "test");
return props;
}

Expand All @@ -75,4 +93,30 @@ private String appendPrefix(Object key) {
return CruiseControlMetricsReporterConfig.config((String) key);
}

@Test
public void testAlterConfig() throws Exception {
Properties props = new Properties();
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);

String brokerId = String.valueOf(_brokers.get(0).id());
ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
List<AlterConfigOp> ops = new ArrayList<>();

Map<Object, Object> sslProps = _brokers.get(0).config().entrySet().stream()
.filter(entry -> SslConfigs.RECONFIGURABLE_CONFIGS.contains(entry.getKey().toString()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
sslProps.forEach((k, v) -> {
String value = v instanceof Password ? ((Password) v).value() : v.toString();
ops.add(new AlterConfigOp(new ConfigEntry("listener.name.ssl." + k, value), AlterConfigOp.OpType.SET));
ops.add(new AlterConfigOp(new ConfigEntry(PREFIX + k, value), AlterConfigOp.OpType.SET));
});
ops.add(new AlterConfigOp(new ConfigEntry(CRUISE_CONTROL_METRICS_REPORTER_FORCE_RECONFIGURE_CONFIG,
UUID.randomUUID().toString()), AlterConfigOp.OpType.SET));
AlterConfigsResult result = adminClient.incrementalAlterConfigs(Collections.singletonMap(brokerResource, ops));

result.values().get(brokerResource).get();
Thread.sleep(5000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -28,11 +29,13 @@ public class CCEmbeddedBroker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
private final Map<SecurityProtocol, Integer> _ports;
private final Map<SecurityProtocol, String> _hosts;
private final Map<Object, Object> _config;
private final KafkaServer _kafkaServer;
private int _id;
private File _logDir;

public CCEmbeddedBroker(Map<Object, Object> config) {
_config = Collections.unmodifiableMap(config);
_ports = new HashMap<>();
_hosts = new HashMap<>();

Expand Down Expand Up @@ -116,6 +119,10 @@ public int id() {
return _id;
}

public Map<Object, Object> config() {
return _config;
}

/**
* @param protocol Security protocol.
* @return Address containing host and port.
Expand Down

0 comments on commit 6b1232d

Please sign in to comment.