Skip to content

Commit

Permalink
Avoid duplicate values for listeners and listener_security_protocol_m…
Browse files Browse the repository at this point in the history
…ap in KafkaContainer (#8850)

Currently, when KafkaContainer is started more than one time then
`KAFKA_LISTENERS` and `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` registers
an additional entry, which is duplicated.

Fixes #8619
  • Loading branch information
eddumelendez authored Jul 5, 2024
1 parent b4b1c20 commit bd919df
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,8 @@ void withClusterId(String clusterId) {
void withRaft() {
this.envVars.computeIfAbsent("CLUSTER_ID", key -> clusterId);
this.envVars.computeIfAbsent("KAFKA_NODE_ID", key -> getEnvVars().get("KAFKA_BROKER_ID"));
addEnvVar(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
addEnvVar(
"KAFKA_LISTENERS",
String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvVars().get("KAFKA_LISTENERS"))
);
addEnvVar("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap());
addEnvVar("KAFKA_LISTENERS", kafkaListeners());
addEnvVar("KAFKA_PROCESS_ROLES", "broker,controller");

String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
Expand All @@ -345,5 +339,24 @@ void withRaft() {

setWaitStrategy(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
}

private String kafkaListenerSecurityProtocolMap() {
String kafkaListenerSecurityProtocolMapEnvVar = getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP");
String kafkaListenerSecurityProtocolMap = String.format(
"%s,CONTROLLER:PLAINTEXT",
kafkaListenerSecurityProtocolMapEnvVar
);
Set<String> listenerSecurityProtocolMap = new HashSet<>(
Arrays.asList(kafkaListenerSecurityProtocolMap.split(","))
);
return String.join(",", listenerSecurityProtocolMap);
}

private String kafkaListeners() {
String kafkaListenersEnvVar = getEnvVars().get("KAFKA_LISTENERS");
String kafkaListeners = String.format("%s,CONTROLLER://0.0.0.0:9094", kafkaListenersEnvVar);
Set<String> listeners = new HashSet<>(Arrays.asList(kafkaListeners.split(",")));
return String.join(",", listeners);
}
}
}

0 comments on commit bd919df

Please sign in to comment.