diff --git a/.vib/kafka/goss/vars.yaml b/.vib/kafka/goss/vars.yaml
index fd106625b48c2..b358ba80d5404 100644
--- a/.vib/kafka/goss/vars.yaml
+++ b/.vib/kafka/goss/vars.yaml
@@ -16,7 +16,7 @@ directories:
   - /opt/bitnami/kafka/logs
 files:
   - paths:
-      - /opt/bitnami/kafka/config/server.properties
+      - /opt/bitnami/kafka/config/server.properties.original
 root_dir: /opt/bitnami
 version:
   bin_name: kafka-topics.sh
diff --git a/bitnami/kafka/3.2/debian-11/Dockerfile b/bitnami/kafka/3.2/debian-11/Dockerfile
index 6efc1a7a52f6d..bc148d73dfbca 100644
--- a/bitnami/kafka/3.2/debian-11/Dockerfile
+++ b/bitnami/kafka/3.2/debian-11/Dockerfile
@@ -8,10 +8,10 @@ ARG TARGETARCH
 LABEL com.vmware.cp.artifact.flavor="sha256:1e1b4657a77f0d47e9220f0c37b9bf7802581b93214fff7d1bd2364c8bf22e8e" \
       org.opencontainers.image.base.name="docker.io/bitnami/minideb:bullseye" \
-      org.opencontainers.image.created="2023-07-30T15:41:10Z" \
+      org.opencontainers.image.created="2023-07-31T14:25:58Z" \
       org.opencontainers.image.description="Application packaged by VMware, Inc" \
      org.opencontainers.image.licenses="Apache-2.0" \
-      org.opencontainers.image.ref.name="3.2.3-debian-11-r163" \
+      org.opencontainers.image.ref.name="3.2.3-debian-11-r164" \
       org.opencontainers.image.title="kafka" \
       org.opencontainers.image.vendor="VMware, Inc." \
       org.opencontainers.image.version="3.2.3"
diff --git a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka-env.sh b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka-env.sh
index bd335cd6d3390..1d31c9ceb3646 100644
--- a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka-env.sh
+++ b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka-env.sh
@@ -24,26 +24,31 @@ export BITNAMI_DEBUG="${BITNAMI_DEBUG:-false}"
 # By setting an environment variable matching *_FILE to a file path, the prefixed environment
 # variable will be overridden with the value specified in that file
 kafka_env_vars=(
-    ALLOW_PLAINTEXT_LISTENER
     KAFKA_INTER_BROKER_USER
     KAFKA_INTER_BROKER_PASSWORD
+    KAFKA_CONTROLLER_USER
+    KAFKA_CONTROLLER_PASSWORD
     KAFKA_CERTIFICATE_PASSWORD
     KAFKA_TLS_TRUSTSTORE_FILE
     KAFKA_TLS_TYPE
     KAFKA_TLS_CLIENT_AUTH
+    KAFKA_TLS_INTER_BROKER_AUTH
+    KAFKA_TLS_CONTROLLER_AUTH
     KAFKA_OPTS
     KAFKA_CFG_LISTENERS
     KAFKA_CFG_ADVERTISED_LISTENERS
     KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
     KAFKA_CFG_ZOOKEEPER_CONNECT
+    KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
     KAFKA_CFG_SASL_ENABLED_MECHANISMS
     KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
     KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
     KAFKA_CFG_INTER_BROKER_LISTENER_NAME
     KAFKA_CFG_MAX_REQUEST_SIZE
     KAFKA_CFG_MAX_PARTITION_FETCH_BYTES
-    KAFKA_ENABLE_KRAFT
     KAFKA_KRAFT_CLUSTER_ID
+    KAFKA_SKIP_KRAFT_STORAGE_INIT
+    KAFKA_CLIENT_LISTENER_NAME
     KAFKA_ZOOKEEPER_PROTOCOL
     KAFKA_ZOOKEEPER_PASSWORD
     KAFKA_ZOOKEEPER_USER
@@ -74,8 +79,7 @@ export KAFKA_BASE_DIR="${BITNAMI_ROOT_DIR}/kafka"
 export KAFKA_VOLUME_DIR="/bitnami/kafka"
 export KAFKA_DATA_DIR="${KAFKA_VOLUME_DIR}/data"
 export KAFKA_CONF_DIR="${KAFKA_BASE_DIR}/config"
-export KAFKA_CONF_FILE="${KAFKA_CONF_DIR}/kraft/server.properties"
-export KAFKA_ZK_CONF_FILE="${KAFKA_CONF_DIR}/server.properties"
+export KAFKA_CONF_FILE="${KAFKA_CONF_DIR}/server.properties"
 export KAFKA_MOUNTED_CONF_DIR="${KAFKA_VOLUME_DIR}/config"
 export KAFKA_CERTS_DIR="${KAFKA_CONF_DIR}/certs"
 export KAFKA_INITSCRIPTS_DIR="/docker-entrypoint-initdb.d"
@@ -88,28 +92,33 @@ export KAFKA_DAEMON_USER="kafka"
 export KAFKA_DAEMON_GROUP="kafka"
 
 # Kafka runtime settings
-export ALLOW_PLAINTEXT_LISTENER="${ALLOW_PLAINTEXT_LISTENER:-no}"
 export KAFKA_INTER_BROKER_USER="${KAFKA_INTER_BROKER_USER:-user}"
 export KAFKA_INTER_BROKER_PASSWORD="${KAFKA_INTER_BROKER_PASSWORD:-bitnami}"
+export KAFKA_CONTROLLER_USER="${KAFKA_CONTROLLER_USER:-controller_user}"
+export KAFKA_CONTROLLER_PASSWORD="${KAFKA_CONTROLLER_PASSWORD:-bitnami}"
 export KAFKA_CERTIFICATE_PASSWORD="${KAFKA_CERTIFICATE_PASSWORD:-}"
 export KAFKA_TLS_TRUSTSTORE_FILE="${KAFKA_TLS_TRUSTSTORE_FILE:-}"
 export KAFKA_TLS_TYPE="${KAFKA_TLS_TYPE:-JKS}"
 export KAFKA_TLS_CLIENT_AUTH="${KAFKA_TLS_CLIENT_AUTH:-required}"
+export KAFKA_TLS_INTER_BROKER_AUTH="${KAFKA_TLS_INTER_BROKER_AUTH:-$KAFKA_TLS_CLIENT_AUTH}"
+export KAFKA_TLS_CONTROLLER_AUTH="${KAFKA_TLS_CONTROLLER_AUTH:-$KAFKA_TLS_CLIENT_AUTH}"
 export KAFKA_OPTS="${KAFKA_OPTS:-}"
 
 # Kafka configuration overrides
-export KAFKA_CFG_LISTENERS="${KAFKA_CFG_LISTENERS:-PLAINTEXT://:9092,CONTROLLER://:9093}"
-export KAFKA_CFG_ADVERTISED_LISTENERS="${KAFKA_CFG_ADVERTISED_LISTENERS:-PLAINTEXT://:9092}"
+export KAFKA_CFG_LISTENERS="${KAFKA_CFG_LISTENERS:-}"
+export KAFKA_CFG_ADVERTISED_LISTENERS="${KAFKA_CFG_ADVERTISED_LISTENERS:-}"
 export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}"
 export KAFKA_CFG_ZOOKEEPER_CONNECT="${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"
+export KAFKA_CFG_CONTROLLER_QUORUM_VOTERS="${KAFKA_CFG_CONTROLLER_QUORUM_VOTERS:-}"
 export KAFKA_CFG_SASL_ENABLED_MECHANISMS="${KAFKA_CFG_SASL_ENABLED_MECHANISMS:-PLAIN,SCRAM-SHA-256,SCRAM-SHA-512}"
 export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="${KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL:-}"
 export KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL="${KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL:-}"
 export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:-}"
 export KAFKA_CFG_MAX_REQUEST_SIZE="${KAFKA_CFG_MAX_REQUEST_SIZE:-}"
 export KAFKA_CFG_MAX_PARTITION_FETCH_BYTES="${KAFKA_CFG_MAX_PARTITION_FETCH_BYTES:-}"
-export KAFKA_ENABLE_KRAFT="${KAFKA_ENABLE_KRAFT:-yes}"
 export KAFKA_KRAFT_CLUSTER_ID="${KAFKA_KRAFT_CLUSTER_ID:-}"
+export KAFKA_SKIP_KRAFT_STORAGE_INIT="${KAFKA_SKIP_KRAFT_STORAGE_INIT:-false}"
+export KAFKA_CLIENT_LISTENER_NAME="${KAFKA_CLIENT_LISTENER_NAME:-}"
 
 # ZooKeeper connection settings
 export KAFKA_ZOOKEEPER_PROTOCOL="${KAFKA_ZOOKEEPER_PROTOCOL:-PLAINTEXT}"
diff --git a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh
index 746821849b4cf..b6526959daf79 100755
--- a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh
+++ b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/postunpack.sh
@@ -30,6 +30,9 @@ for dir in "$KAFKA_LOG_DIR" "$KAFKA_CONF_DIR" "$KAFKA_MOUNTED_CONF_DIR" "$KAFKA_
 done
 chmod -R g+rwX "$KAFKA_BASE_DIR" "$KAFKA_VOLUME_DIR" "$KAFKA_DATA_DIR" "$KAFKA_INITSCRIPTS_DIR"
 
+# Move the original server.properties, so users can skip initialization logic by mounting their own server.properties directly instead of using the MOUNTED_CONF_DIR
+mv "${KAFKA_CONF_DIR}/server.properties" "${KAFKA_CONF_DIR}/server.properties.original"
+
 # Disable logging to stdout and garbage collection
 # Source: https://logging.apache.org/log4j/log4j-2.4/manual/appenders.html
 replace_in_file "${KAFKA_BASE_DIR}/bin/kafka-server-start.sh" " [-]loggc" " "
diff --git a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/run.sh b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/run.sh
index b7fff77ac9016..a82f26867e701 100755
--- a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/run.sh
+++ b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/run.sh
@@ -16,22 +16,17 @@ set -o pipefail
 
 # Load Kafka environment variables
 . /opt/bitnami/scripts/kafka-env.sh
 
-if [[ "${KAFKA_CFG_LISTENERS:-}" =~ SASL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SASL ]] || [[ "${KAFKA_ZOOKEEPER_PROTOCOL:-}" =~ SASL ]]; then
+if [[ -f "${KAFKA_CONF_DIR}/kafka_jaas.conf" ]]; then
     export KAFKA_OPTS="-Djava.security.auth.login.config=${KAFKA_CONF_DIR}/kafka_jaas.conf"
 fi
 
-if [[ "${KAFKA_ZOOKEEPER_PROTOCOL:-}" =~ SSL ]]; then
-    ZOOKEEPER_SSL_CONFIG=$(zookeeper_get_tls_config)
-    export KAFKA_OPTS="$KAFKA_OPTS $ZOOKEEPER_SSL_CONFIG"
-fi
-
-flags=("$(kafka_get_conf_file)")
-[[ -z "${KAFKA_EXTRA_FLAGS:-}" ]] || flags=("${flags[@]}" "${KAFKA_EXTRA_FLAGS[@]}")
-START_COMMAND=("$KAFKA_HOME/bin/kafka-server-start.sh" "${flags[@]}" "$@")
+cmd="$KAFKA_HOME/bin/kafka-server-start.sh"
+args=("$KAFKA_CONF_FILE")
+! is_empty_value "${KAFKA_EXTRA_FLAGS:-}" && args=("${args[@]}" "${KAFKA_EXTRA_FLAGS[@]}")
 
 info "** Starting Kafka **"
 if am_i_root; then
-    exec_as_user "$KAFKA_DAEMON_USER" "${START_COMMAND[@]}"
+    exec_as_user "$KAFKA_DAEMON_USER" "$cmd" "${args[@]}" "$@"
 else
-    exec "${START_COMMAND[@]}"
+    exec "$cmd" "${args[@]}" "$@"
 fi
diff --git a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/setup.sh b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/setup.sh
index 2721ad7f6e934..a1dcc1d2d1620 100755
--- a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/setup.sh
+++ b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/kafka/setup.sh
@@ -19,39 +19,42 @@ set -o pipefail
 
 # Map Kafka environment variables
 kafka_create_alias_environment_variables
 
-if [[ -z "${KAFKA_CFG_BROKER_ID:-}" ]]; then
-    if [[ -n "${BROKER_ID_COMMAND:-}" ]]; then
-        KAFKA_CFG_BROKER_ID="$(eval "${BROKER_ID_COMMAND:-}")"
-        export KAFKA_CFG_BROKER_ID
-    elif ! is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then
-        # By default auto allocate broker ID unless KRaft is enabled
-        export KAFKA_CFG_BROKER_ID=-1
-    fi
-fi
-# Set the default tuststore locations
+# Dynamically set node.id/broker.id/controller.quorum.voters if the matching _COMMAND environment variable is set
+kafka_dynamic_environment_variables
+
+# Set the default truststore locations before validation
 kafka_configure_default_truststore_locations
-# Ensure Kafka environment variables are valid
-kafka_validate
 
 # Ensure Kafka user and group exist when running as 'root'
-if am_i_root; then
-    ensure_user_exists "$KAFKA_DAEMON_USER" --group "$KAFKA_DAEMON_GROUP"
-    KAFKA_OWNERSHIP_USER="$KAFKA_DAEMON_USER"
-else
-    KAFKA_OWNERSHIP_USER=""
-fi
+am_i_root && ensure_user_exists "$KAFKA_DAEMON_USER" --group "$KAFKA_DAEMON_GROUP"
+
 # Ensure directories used by Kafka exist and have proper ownership and permissions
 for dir in "$KAFKA_LOG_DIR" "$KAFKA_CONF_DIR" "$KAFKA_MOUNTED_CONF_DIR" "$KAFKA_VOLUME_DIR" "$KAFKA_DATA_DIR"; do
-    ensure_dir_exists "$dir" "$KAFKA_OWNERSHIP_USER"
+    if am_i_root; then
+        ensure_dir_exists "$dir" "$KAFKA_DAEMON_USER" "$KAFKA_DAEMON_GROUP"
+    else
+        ensure_dir_exists "$dir"
+    fi
 done
-# shellcheck disable=SC2148
+
+# Kafka validation, skipped if server.properties was mounted at either $KAFKA_MOUNTED_CONF_DIR or $KAFKA_CONF_DIR
+[[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" && ! -f "$KAFKA_CONF_FILE" ]] && kafka_validate
+# Kafka initialization, skipped if server.properties was mounted at $KAFKA_CONF_DIR
+[[ ! -f "$KAFKA_CONF_FILE" ]] && kafka_initialize
 
-# Ensure Kafka is initialized
-kafka_initialize
-# If KRaft is enabled initialize
-if is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then
-    kraft_initialize
+# Initialize KRaft metadata storage if process.roles is configured
+if grep -q "^process.roles=" "$KAFKA_CONF_FILE" && ! is_boolean_yes "$KAFKA_SKIP_KRAFT_STORAGE_INIT" ; then
+    kafka_kraft_storage_initialize
+fi
+# Configure Zookeeper SCRAM users
+if is_boolean_yes "${KAFKA_ZOOKEEPER_BOOTSTRAP_SCRAM_USERS:-}"; then
+    kafka_zookeeper_create_sasl_scram_users
+fi
+# KRaft controllers may get stuck starting when the controller quorum voters are changed.
+# Workaround: Remove the quorum-state file when scaling controllers up/down (awaiting proposal KIP-853)
+# https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes
+if [[ -f "${KAFKA_DATA_DIR}/__cluster_metadata-0/quorum-state" ]] && grep -q "^controller.quorum.voters=" "$KAFKA_CONF_FILE" && kafka_kraft_quorum_voters_changed; then
+    warn "Detected inconsistencies between controller.quorum.voters and the quorum-state file, removing it..."
+    rm -f "${KAFKA_DATA_DIR}/__cluster_metadata-0/quorum-state"
 fi
 
 # Ensure custom initialization scripts are executed
 kafka_custom_init_scripts
diff --git a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/libkafka.sh b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/libkafka.sh
index 530f80bea419a..c38d07d1d41ad 100644
--- a/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/libkafka.sh
+++ b/bitnami/kafka/3.2/debian-11/rootfs/opt/bitnami/scripts/libkafka.sh
@@ -54,11 +54,110 @@ kafka_common_conf_set() {
     fi
 }
 
+########################
+# Returns true if at least one listener is configured using SSL
+# Globals:
+#   KAFKA_CFG_LISTENERS
+#   KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+# Arguments:
+#   None
+# Returns:
+#   true/false
+#########################
+kafka_has_ssl_listener(){
+    if ! is_empty_value "${KAFKA_CFG_LISTENERS:-}"; then
+        if is_empty_value "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}"; then
+            if [[ "$KAFKA_CFG_LISTENERS" =~ SSL: || "$KAFKA_CFG_LISTENERS" =~ SASL_SSL: ]]; then
+                return
+            fi
+        else
+            read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
+            for protocol_map in "${protocol_maps[@]}"; do
+                read -r -a map <<<"$(tr ':' ' ' <<<"$protocol_map")"
+                # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
+                listener="${map[0]}"
+                protocol="${map[1]}"
+                if [[ "$protocol" = "SSL" || "$protocol" = "SASL_SSL" ]]; then
+                    if [[ "$KAFKA_CFG_LISTENERS" =~ $listener ]]; then
+                        return
+                    fi
+                fi
+            done
+        fi
+    fi
+    return 1
+}
+
+########################
+# Returns true if at least one listener is configured using SASL
+# Globals:
+#   KAFKA_CFG_LISTENERS
+#   KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+# Arguments:
+#   None
+# Returns:
+#   true/false
+#########################
+kafka_has_sasl_listener(){
+    if ! is_empty_value "${KAFKA_CFG_LISTENERS:-}"; then
+        if is_empty_value "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}"; then
+            if [[ "$KAFKA_CFG_LISTENERS" =~ SASL_PLAINTEXT: ]] || [[ "$KAFKA_CFG_LISTENERS" =~ SASL_SSL: ]]; then
+                return
+            fi
+        else
+            read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
+            for protocol_map in "${protocol_maps[@]}"; do
+                read -r -a map <<<"$(tr ':' ' ' <<<"$protocol_map")"
+                # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
+                listener="${map[0]}"
+                protocol="${map[1]}"
+                if [[ "$protocol" = "SASL_PLAINTEXT" || "$protocol" = "SASL_SSL" ]]; then
+                    if [[ "$KAFKA_CFG_LISTENERS" =~ $listener ]]; then
+                        return
+                    fi
+                fi
+            done
+        fi
+    fi
+    return 1
+}
+
+########################
+# Returns true if at least one listener is configured using plaintext
+# Globals:
+#   KAFKA_CFG_LISTENERS
+#   KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+# Arguments:
+#   None
+# Returns:
+#   true/false
+#########################
+kafka_has_plaintext_listener(){
+    if ! is_empty_value "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}"; then
+        read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
+        for protocol_map in "${protocol_maps[@]}"; do
+            read -r -a map <<<"$(tr ':' ' ' <<<"$protocol_map")"
+            # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
+            listener="${map[0]}"
+            protocol="${map[1]}"
+            if [[ "$protocol" = "PLAINTEXT" ]]; then
+                if is_empty_value "${KAFKA_CFG_LISTENERS:-}" || [[ "$KAFKA_CFG_LISTENERS" =~ $listener ]]; then
+                    return
+                fi
+            fi
+        done
+    else
+        if is_empty_value "${KAFKA_CFG_LISTENERS:-}" || [[ "$KAFKA_CFG_LISTENERS" =~ PLAINTEXT: ]]; then
+            return
+        fi
+    fi
+    return 1
+}
+
 ########################
 # Backwards compatibility measure to configure the TLS truststore locations
 # Globals:
 #   KAFKA_CONF_FILE
-#   KAFKA_ZK_CONF_FILE
 # Arguments:
 #   None
 # Returns:
@@ -73,7 +172,7 @@ kafka_configure_default_truststore_locations() {
     # use this logic that sets the KAFKA_TLS_*_FILE variables to the previously assumed locations in case it is not set
 
     # Kafka truststore
-    if { [[ "${KAFKA_CFG_LISTENERS:-}" =~ SSL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SSL ]]; } && is_empty_value "$KAFKA_TLS_TRUSTSTORE_FILE"; then
+    if kafka_has_ssl_listener && is_empty_value "${KAFKA_TLS_TRUSTSTORE_FILE:-}"; then
         local kafka_truststore_filename="kafka.truststore.jks"
         [[ "$KAFKA_TLS_TYPE" = "PEM" ]] && kafka_truststore_filename="kafka.truststore.pem"
         if [[ -f "${KAFKA_CERTS_DIR}/${kafka_truststore_filename}" ]]; then
@@ -85,7 +184,7 @@ kafka_configure_default_truststore_locations() {
         fi
     fi
     # Zookeeper truststore
-    if [[ "${KAFKA_ZOOKEEPER_PROTOCOL:-}" =~ SSL ]] && is_empty_value "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE"; then
+    if [[ "${KAFKA_ZOOKEEPER_PROTOCOL:-}" =~ SSL ]] && is_empty_value "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE:-}"; then
         local zk_truststore_filename="zookeeper.truststore.jks"
         [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && zk_truststore_filename="zookeeper.truststore.pem"
         if [[ -f "${KAFKA_CERTS_DIR}/${zk_truststore_filename}" ]]; then
@@ -102,7 +201,6 @@
 # Set a configuration setting value to server.properties
 # Globals:
 #   KAFKA_CONF_FILE
-#   KAFKA_ZK_CONF_FILE
 # Arguments:
 #   $1 - key
 #   $2 - values (array)
@@ -110,7 +208,7 @@ kafka_configure_default_truststore_locations() {
 #   None
 #########################
 kafka_server_conf_set() {
-    kafka_common_conf_set "$(kafka_get_conf_file)" "$@"
+    kafka_common_conf_set "$KAFKA_CONF_FILE" "$@"
 }
 
@@ -159,6 +257,9 @@ kafka_create_alias_environment_variables() {
     suffixes=(
         "ADVERTISED_LISTENERS"
         "BROKER_ID"
+        "NODE_ID"
+        "CONTROLLER_QUORUM_VOTERS"
+        "PROCESS_ROLES"
         "DEFAULT_REPLICATION_FACTOR"
         "DELETE_TOPIC_ENABLE"
         "INTER_BROKER_LISTENER_NAME"
@@ -194,6 +295,7 @@ kafka_create_alias_environment_variables() {
     kafka_declare_alias_env "KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE" "KAFKA_AUTO_CREATE_TOPICS_ENABLE"
     kafka_declare_alias_env "KAFKA_CLIENT_USERS" "KAFKA_BROKER_USER"
     kafka_declare_alias_env "KAFKA_CLIENT_PASSWORDS" "KAFKA_BROKER_PASSWORD"
+    kafka_declare_alias_env "KAFKA_CLIENT_LISTENER_NAME" "KAFKA_CLIENT_LISTENER"
     for s in "${suffixes[@]}"; do
         kafka_declare_alias_env "KAFKA_CFG_${s}" "KAFKA_${s}"
     done
@@ -211,122 +313,176 @@ kafka_create_alias_environment_variables() {
 kafka_validate() {
     debug "Validating settings in KAFKA_* env vars..."
     local error_code=0
-    local internal_port
-    local client_port
 
     # Auxiliary functions
     print_validation_error() {
         error "$1"
         error_code=1
     }
-    check_allowed_listener_port() {
-        local -r total="$#"
-        for i in $(seq 1 "$((total - 1))"); do
-            for j in $(seq "$((i + 1))" "$total"); do
-                if (("${!i}" == "${!j}")); then
-                    print_validation_error "There are listeners bound to the same port"
-                fi
-            done
-        done
-    }
-    check_conflicting_listener_ports() {
-        local validate_port_args=()
-        ! am_i_root && validate_port_args+=("-unprivileged")
-        if ! err=$(validate_port "${validate_port_args[@]}" "$1"); then
-            print_validation_error "An invalid port was specified in the environment variable KAFKA_CFG_LISTENERS: $err"
-        fi
-    }
     check_multi_value() {
         if [[ " ${2} " != *" ${!1} "* ]]; then
             print_validation_error "The allowed values for ${1} are: ${2}"
         fi
     }
-
-    if is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then
-        if [[ -n "${KAFKA_CFG_NODE_ID:-}" ]] || [[ -n "${KAFKA_CFG_CONTROLLER_QUORUM_VOTERS:-}" ]]; then
-            if [[ -z "${KAFKA_CFG_NODE_ID:-}" ]]; then
-                print_validation_error "KRaft requires KAFKA_CFG_NODE_ID to be set for the quorum controller"
-            fi
-            if [[ -z "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS" ]]; then
-                print_validation_error "KRaft requires KAFKA_CFG_CONTROLLER_QUORUM_VOTERS to be set"
+    # If process.roles is configured, check its values are valid and perform additional checks for each
+    check_kraft_process_roles() {
+        read -r -a roles_list <<<"$(tr ',;' ' ' <<<"$KAFKA_CFG_PROCESS_ROLES")"
+        for role in "${roles_list[@]}"; do
+            case "$role" in
+            broker) ;;
+            controller)
+                if is_empty_value "${KAFKA_CFG_CONTROLLER_LISTENER_NAMES:-}"; then
+                    print_validation_error "Role 'controller' enabled but environment variable KAFKA_CFG_CONTROLLER_LISTENER_NAMES was not provided."
+                fi
+                if is_empty_value "${KAFKA_CFG_LISTENERS:-}" || [[ ! "$KAFKA_CFG_LISTENERS" =~ ${KAFKA_CFG_CONTROLLER_LISTENER_NAMES} ]]; then
+                    print_validation_error "Role 'controller' enabled but listener ${KAFKA_CFG_CONTROLLER_LISTENER_NAMES} not found in KAFKA_CFG_LISTENERS."
+                fi
+                ;;
+            *)
+                print_validation_error "Invalid KRaft process role '$role'. Supported roles are 'broker,controller'"
+                ;;
+            esac
+        done
+    }
+    # Check all listeners are using a unique and valid port
+    check_listener_ports(){
+        check_allowed_port() {
+            local port="${1:?missing port variable}"
+            local -a validate_port_args=()
+            ! am_i_root && validate_port_args+=("-unprivileged")
+            validate_port_args+=("$port")
+            if ! err=$(validate_port "${validate_port_args[@]}"); then
+                print_validation_error "An invalid port ${port} was specified in the environment variable KAFKA_CFG_LISTENERS: ${err}."
             fi
+        }
 
-        old_IFS=$IFS
-        IFS=','
-        read -r -a voters <<< "$KAFKA_CFG_CONTROLLER_QUORUM_VOTERS"
-        IFS=${old_IFS}
-        node_id_matched=false
-        for voter in "${voters[@]}"; do
-            if [[ "$voter" == *"$KAFKA_CFG_NODE_ID"* ]]; then
-                node_id_matched=true
-                break
+        read -r -a listeners <<<"$(tr ',' ' ' <<<"${KAFKA_CFG_LISTENERS:-}")"
+        local -a ports=()
+        for listener in "${listeners[@]}"; do
+            read -r -a arr <<<"$(tr ':' ' ' <<<"$listener")"
+            # Obtain the port from listener string, e.g. PLAINTEXT://:9092
+            port="${arr[2]}"
+            check_allowed_port "$port"
+            ports+=("$port")
+        done
+        # Check each listener is using a unique port
+        local -a unique_ports=()
+        read -r -a unique_ports <<< "$(echo "${ports[@]}" | tr ' ' '\n' | sort -u | tr '\n' ' ')"
+        if [[ "${#ports[@]}" != "${#unique_ports[@]}" ]]; then
+            print_validation_error "There are listeners bound to the same port"
+        fi
+    }
+    check_listener_protocols(){
+        local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL")
+        read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
+        for protocol_map in "${protocol_maps[@]}"; do
+            read -r -a map <<<"$(tr ':' ' ' <<<"$protocol_map")"
+            # Obtain the listener and protocol from protocol map string, e.g. CONTROLLER:PLAINTEXT
+            listener="${map[0]}"
+            protocol="${map[1]}"
+            # Check protocol in allowed list
+            if [[ ! "${allowed_protocols[*]}" =~ $protocol ]]; then
+                print_validation_error "Authentication protocol ${protocol} is not supported!"
+            fi
+            # If the inter-broker listener is configured with SASL, ensure KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL is set
+            if [[ "$listener" = "${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:-INTERNAL}" ]]; then
+                if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then
+                    if is_empty_value "${KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL:-}"; then
+                        print_validation_error "When using SASL for inter-broker communication the mechanism should be provided using KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL"
+                    fi
+                    if is_empty_value "${KAFKA_INTER_BROKER_USER:-}" || is_empty_value "${KAFKA_INTER_BROKER_PASSWORD:-}"; then
+                        print_validation_error "In order to configure SASL authentication for Kafka inter-broker communications, you must provide the SASL credentials. Set the environment variables KAFKA_INTER_BROKER_USER and KAFKA_INTER_BROKER_PASSWORD to configure the credentials for SASL authentication between brokers."
+                    fi
+                fi
+            # If the controller listener is configured with SASL, ensure KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL is set
+            elif [[ "${KAFKA_CFG_CONTROLLER_LISTENER_NAMES:-CONTROLLER}" =~ $listener ]]; then
+                if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then
+                    if is_empty_value "${KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL:-}"; then
+                        print_validation_error "When using SASL for controller communication the mechanism should be provided at KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL"
+                    fi
+                    if is_empty_value "${KAFKA_CONTROLLER_USER:-}" || is_empty_value "${KAFKA_CONTROLLER_PASSWORD:-}"; then
+                        print_validation_error "In order to configure SASL authentication for Kafka control plane communications, you must provide the SASL credentials. Set the environment variables KAFKA_CONTROLLER_USER and KAFKA_CONTROLLER_PASSWORD to configure the credentials for SASL authentication between controllers."
+                    fi
+                fi
+            else
+                if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then
+                    if is_empty_value "${KAFKA_CLIENT_USERS:-}" || is_empty_value "${KAFKA_CLIENT_PASSWORDS:-}"; then
+                        print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. Set the environment variables KAFKA_CLIENT_USERS and KAFKA_CLIENT_PASSWORDS to configure the credentials for SASL authentication with clients."
+                    fi
+                fi
+            fi
-        done
-        if [[ "$node_id_matched" == false ]]; then
-            warn "KAFKA_CFG_NODE_ID must match what is set in KAFKA_CFG_CONTROLLER_QUORUM_VOTERS"
-        fi
-    fi
+        done
+    }
 
-    if [[ -n "${KAFKA_CFG_PROCESS_ROLES:-}" ]]; then
-        old_IFS=$IFS
-        IFS=','
-        read -r -a roles <<< "$KAFKA_CFG_PROCESS_ROLES"
-        IFS=${old_IFS}
-        controller_exists=false
-        for val in "${roles[@]}";
-        do
-            if [[ "$val" == "controller" ]]; then
-                controller_exists=true
-                break
-            fi
-        done
-        if [[ "$controller_exists" == false ]]; then
-            warn "KAFKA_CFG_PROCESS_ROLES must include 'controller' for KRaft"
-        fi
+    if is_empty_value "${KAFKA_CFG_PROCESS_ROLES:-}" && is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"; then
+        print_validation_error "Kafka hasn't been configured to work in either KRaft or Zookeeper mode. Please make sure at least one of the modes is configured."
+    fi
+    # Check KRaft mode
+    if ! is_empty_value "${KAFKA_CFG_PROCESS_ROLES:-}"; then
+        # KRaft
+        if [[ "$(kafka_get_version)" =~ ^3\.2\. ]]; then
+            warn "KRaft mode is not production-ready in Kafka 3.2. For production environments, we recommend upgrading."
+        fi
-    if [[ -n "${KAFKA_CFG_LISTENERS:-}" ]]; then
-        old_IFS=$IFS
-        IFS=','
-        read -r -a listener <<< "$KAFKA_CFG_LISTENERS"
-        IFS=${old_IFS}
-        controller_exists=false
-        for val in "${listener[@]}";
-        do
-            if [[ $val == *"CONTROLLER"* ]]; then
-                controller_exists=true
-                break
+        # Only allow Zookeeper configuration if migration mode is enabled
+        if ! is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}" &&
+            { is_empty_value "${KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE:-}" || ! is_boolean_yes "$KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE"; }; then
+            print_validation_error "Both KRaft and Zookeeper modes are configured, but KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE is not enabled"
+        fi
+        if is_empty_value "${KAFKA_CFG_NODE_ID:-}"; then
+            print_validation_error "KRaft mode requires a unique node.id, please set the environment variable KAFKA_CFG_NODE_ID"
+        fi
+        if is_empty_value "${KAFKA_CFG_CONTROLLER_QUORUM_VOTERS:-}"; then
+            print_validation_error "KRaft mode requires KAFKA_CFG_CONTROLLER_QUORUM_VOTERS to be set"
+        fi
+        check_kraft_process_roles
+    fi
+    # Check Zookeeper mode
+    if ! is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"; then
+        # If SSL/SASL_SSL protocol configured, check certificates are provided
+        if [[ "$KAFKA_ZOOKEEPER_PROTOCOL" =~ SSL ]]; then
+            if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]]; then
+                # Fail if truststore is not provided
+                if [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then
+                    print_validation_error "In order to configure the TLS encryption for Zookeeper with JKS certs you must mount your zookeeper.truststore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory."
+                fi
+                # Warn if keystore is not provided, only required if Zookeeper mTLS is enabled (ZOO_TLS_CLIENT_AUTH)
+                if [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.jks" ]] && [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.jks" ]]; then
+                    warn "In order to configure the mTLS for Zookeeper with JKS certs you must mount your zookeeper.keystore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory."
+                fi
+            elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]]; then
+                # Fail if CA / validation cert is not provided
+                if [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then
+                    print_validation_error "In order to configure the TLS encryption for Zookeeper with PEM certs you must mount your zookeeper.truststore.pem cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory."
+                fi
+                # Warn if node key or cert are not provided, only required if Zookeeper mTLS is enabled (ZOO_TLS_CLIENT_AUTH)
+                if { [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.key" ]]; } &&
+                    { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.key" ]]; }; then
+                    warn "In order to configure the mTLS for Zookeeper with PEM certs you must mount your zookeeper.keystore.pem cert and zookeeper.keystore.key key to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory."
+                fi
+            fi
+        fi
-            fi
-        done
-        if [[ "$controller_exists" == false ]]; then
-            warn "KAFKA_CFG_LISTENERS must include a listener for CONTROLLER"
-        fi
-    fi
+        # If the SASL protocol is configured, check credentials are provided
+        if [[ "$KAFKA_ZOOKEEPER_PROTOCOL" =~ SASL ]]; then
+            if is_empty_value "${KAFKA_ZOOKEEPER_USER:-}" || is_empty_value "${KAFKA_ZOOKEEPER_PASSWORD:-}"; then
+                print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. Set the environment variables KAFKA_ZOOKEEPER_USER and KAFKA_ZOOKEEPER_PASSWORD, to configure the credentials for SASL authentication with Zookeeper."
+            fi
+        fi
+        # Warn if the plaintext protocol is being used
+        if [[ "$KAFKA_ZOOKEEPER_PROTOCOL" = "PLAINTEXT" ]]; then
+            warn "The KAFKA_ZOOKEEPER_PROTOCOL environment variable does not configure SASL and/or SSL; this setting is not recommended for production environments."
+        fi
+    fi
 
-    if [[ ${KAFKA_CFG_LISTENERS:-} =~ INTERNAL://:([0-9]*) ]]; then
-        internal_port="${BASH_REMATCH[1]}"
-        check_allowed_listener_port "$internal_port"
-    fi
-    if [[ ${KAFKA_CFG_LISTENERS:-} =~ CLIENT://:([0-9]*) ]]; then
-        client_port="${BASH_REMATCH[1]}"
-        check_allowed_listener_port "$client_port"
-    fi
-    [[ -n ${internal_port:-} && -n ${client_port:-} ]] && check_conflicting_listener_ports "$internal_port" "$client_port"
-    if [[ -n "${KAFKA_PORT_NUMBER:-}" ]] || [[ -n "${KAFKA_CFG_PORT:-}" ]]; then
-        warn "The environment variables KAFKA_PORT_NUMBER and KAFKA_CFG_PORT are deprecated, you can specify the port number to use for each listener using the KAFKA_CFG_LISTENERS environment variable instead."
-    fi
-
-    read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS}")"
-    read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS}")"
-    if [[ "${#users[@]}" -ne "${#passwords[@]}" ]]; then
-        print_validation_error "Specify the same number of passwords on KAFKA_CLIENT_PASSWORDS as the number of users on KAFKA_CLIENT_USERS!"
-    fi
-
-    if is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then
-        warn "You set the environment variable ALLOW_PLAINTEXT_LISTENER=$ALLOW_PLAINTEXT_LISTENER. For safety reasons, do not use this flag in a production environment."
+    # Check listener ports are unique and allowed
+    check_listener_ports
+    # Check listeners are mapped to a valid security protocol
+    check_listener_protocols
+    # Warn users if plaintext listeners are configured
+    if kafka_has_plaintext_listener; then
+        warn "Kafka has been configured with a PLAINTEXT listener; this setting is not recommended for production environments."
     fi
-    if [[ "${KAFKA_CFG_LISTENERS:-}" =~ SSL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SSL ]]; then
+    # If SSL/SASL_SSL listeners configured, check certificates are provided
+    if kafka_has_ssl_listener; then
         if [[ "$KAFKA_TLS_TYPE" = "JKS" ]] &&
            { [[ ! -f "${KAFKA_CERTS_DIR}/kafka.keystore.jks" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; } &&
            { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.jks" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; }; then
-f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.jks" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; }; then @@ -336,174 +492,173 @@ kafka_validate() { { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.pem" ]] || [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/kafka.keystore.key" ]] || [[ ! -f "$KAFKA_TLS_TRUSTSTORE_FILE" ]]; }; then print_validation_error "In order to configure the TLS encryption for Kafka with PEM certs you must mount your kafka.keystore.pem, kafka.keystore.key and kafka.truststore.pem certs to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." fi - elif [[ "${KAFKA_CFG_LISTENERS:-}" =~ SASL ]] || [[ "${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-}" =~ SASL ]]; then - if [[ -z "$KAFKA_CLIENT_PASSWORDS" && -z "$KAFKA_INTER_BROKER_PASSWORD" ]]; then - print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. Set the environment variables KAFKA_CLIENT_USERS and KAFKA_CLIENT_PASSWORDS, to configure the credentials for SASL authentication with clients, or set the environment variables KAFKA_INTER_BROKER_USER and KAFKA_INTER_BROKER_PASSWORD, to configure the credentials for SASL authentication between brokers." - fi - elif ! is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then - print_validation_error "The KAFKA_CFG_LISTENERS environment variable does not configure a secure listener. Set the environment variable ALLOW_PLAINTEXT_LISTENER=yes to allow the container to be started with a plaintext listener. This is only recommended for development." - fi - if ! is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then - if [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SSL ]]; then - if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]] && - [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then - print_validation_error "In order to configure the TLS encryption for Zookeeper with JKS certs you must mount your zookeeper.truststore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." - elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && - [[ ! -f "$KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_FILE" ]]; then - print_validation_error "In order to configure the TLS encryption for Zookeeper with PEM certs you must mount your zookeeper.truststore.pem cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." - fi - if [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "JKS" ]] && - [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.jks" ]] && [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.jks" ]]; then - warn "In order to configure the mTLS for Zookeeper with JKS certs you must mount your zookeeper.keystore.jks cert to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." - elif [[ "$KAFKA_ZOOKEEPER_TLS_TYPE" = "PEM" ]] && - { [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_CERTS_DIR}/zookeeper.keystore.key" ]]; } && - { [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.pem" ]] || [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/certs/zookeeper.keystore.key" ]]; }; then - warn "In order to configure the mTLS for Zookeeper with PEM certs you must mount your zookeeper.keystore.pem cert and zookeeper.keystore.key key to the ${KAFKA_MOUNTED_CONF_DIR}/certs directory." - fi - elif [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SASL ]]; then - if [[ -z "$KAFKA_ZOOKEEPER_USER" ]] || [[ -z "$KAFKA_ZOOKEEPER_PASSWORD" ]]; then - print_validation_error "In order to configure SASL authentication for Kafka, you must provide the SASL credentials. Set the environment variables KAFKA_ZOOKEEPER_USER and KAFKA_ZOOKEEPER_PASSWORD, to configure the credentials for SASL authentication with Zookeeper." 
- fi - elif ! is_boolean_yes "$ALLOW_PLAINTEXT_LISTENER"; then - print_validation_error "The KAFKA_ZOOKEEPER_PROTOCOL environment variable does not configure a secure protocol. Set the environment variable ALLOW_PLAINTEXT_LISTENER=yes to allow the container to be started with a plaintext listener. This is only recommended for development." + fi + # If SASL/SASL_SSL listeners configured, check passwords are provided + if kafka_has_sasl_listener; then + if is_empty_value "${KAFKA_CFG_SASL_ENABLED_MECHANISMS:-}"; then + print_validation_error "Specified SASL protocol but no SASL mechanisms provided in KAFKA_CFG_SASL_ENABLED_MECHANISMS" fi fi + # Check users and passwords lists are the same size + read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS:-}")" + read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS:-}")" + if [[ "${#users[@]}" -ne "${#passwords[@]}" ]]; then + print_validation_error "Specify the same number of passwords on KAFKA_CLIENT_PASSWORDS as the number of users on KAFKA_CLIENT_USERS!" + fi check_multi_value "KAFKA_TLS_TYPE" "JKS PEM" check_multi_value "KAFKA_ZOOKEEPER_TLS_TYPE" "JKS PEM" + check_multi_value "KAFKA_ZOOKEEPER_PROTOCOL" "PLAINTEXT SASL SSL SASL_SSL" check_multi_value "KAFKA_TLS_CLIENT_AUTH" "none requested required" [[ "$error_code" -eq 0 ]] || return "$error_code" } + ######################## -# Generate JAAS authentication file +# Get kafka version # Globals: # KAFKA_* # Arguments: -# $1 - Authentication protocol to use for the internal listener -# $2 - Authentication protocol to use for the client listener +# None +# Returns: +# version +######################### +kafka_get_version() { + local -a cmd=("kafka-topics.sh" "--version") + am_i_root && cmd=("run_as_user" "$KAFKA_DAEMON_USER" "${cmd[@]}") + + read -r -a ver_split <<< "$("${cmd[@]}")" + echo "${ver_split[0]}" +} + +######################### +# Configure JAAS for a given listener and SASL mechanisms +# Globals: +# KAFKA_* +# Arguments: +# $1 - Name of the listener JAAS will be configured for +# $2 - Comma-separated list of SASL mechanisms to configure +# $3 - Comma-separated list of usernames +# $4 - Comma-separated list of passwords # Returns: # None ######################### -kafka_generate_jaas_authentication_file() { - local -r internal_protocol="${1:-}" - local -r client_protocol="${2:-}" - - if [[ ! 
-f "${KAFKA_CONF_DIR}/kafka_jaas.conf" ]]; then - info "Generating JAAS authentication file" - - read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS:-}")" - read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS:-}")" - - if [[ "${client_protocol:-}" =~ SASL ]]; then - if [[ "${KAFKA_CFG_SASL_ENABLED_MECHANISMS:-}" =~ PLAIN ]]; then - cat >>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" <>"${KAFKA_CONF_DIR}/kafka_jaas.conf" < 1{print line"\\n\\"}{line=$0;}END{print $0" "}' <"${1:?missing file}" @@ -559,7 +722,7 @@ kafka_configure_ssl() { remove_previous_cert_value() { local key="${1:?missing key}" files=( - "$(kafka_get_conf_file)" + "${KAFKA_CONF_FILE}" "${KAFKA_CONF_DIR}/producer.properties" "${KAFKA_CONF_DIR}/consumer.properties" ) @@ -581,117 +744,8 @@ kafka_configure_ssl() { elif [[ "$KAFKA_TLS_TYPE" = "JKS" ]]; then configure_both ssl.keystore.location "$KAFKA_CERTS_DIR"/kafka.keystore.jks configure_both ssl.truststore.location "$kafka_truststore_location" - ! is_empty_value "$KAFKA_CERTIFICATE_PASSWORD" && configure_both ssl.keystore.password "$KAFKA_CERTIFICATE_PASSWORD" - ! is_empty_value "$KAFKA_CERTIFICATE_PASSWORD" && configure_both ssl.truststore.password "$KAFKA_CERTIFICATE_PASSWORD" - fi - SSL_CONFIGURED=true # prevents configuring SSL more than once -} - -######################## -# Configure Kafka for inter-broker communications -# Globals: -# None -# Arguments: -# $1 - Authentication protocol to use for the internal listener -# Returns: -# None -######################### -kafka_configure_internal_communications() { - local -r protocol="${1:?missing environment variable protocol}" - local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL") - info "Configuring Kafka for inter-broker communications with ${protocol} authentication." - - if [[ "${allowed_protocols[*]}" =~ $protocol ]]; then - kafka_server_conf_set security.inter.broker.protocol "$protocol" - if [[ "$protocol" = "PLAINTEXT" ]]; then - warn "Inter-broker communications are configured as PLAINTEXT. This is not safe for production environments." - fi - if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then - # IMPORTANT: Do not confuse SASL/PLAIN with PLAINTEXT - # For more information, see: https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html#sasl-plain-overview) - if [[ -n "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" ]]; then - kafka_server_conf_set sasl.mechanism.inter.broker.protocol "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" - else - error "When using SASL for inter broker comunication the mechanism should be provided at KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" - exit 1 - fi - fi - if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then - kafka_configure_ssl - # We need to enable 2 way authentication on SASL_SSL so brokers authenticate each other. - # It won't affect client communications unless the SSL protocol is for them. 
-            kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH"
-        fi
-    else
-        error "Authentication protocol ${protocol} is not supported!"
-        exit 1
-    fi
-}
-
-########################
-# Configure Kafka for client communications
-# Globals:
-#   None
-# Arguments:
-#   $1 - Authentication protocol to use for the client listener
-# Returns:
-#   None
-#########################
-kafka_configure_client_communications() {
-    local -r protocol="${1:?missing environment variable protocol}"
-    local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL")
-    info "Configuring Kafka for client communications with ${protocol} authentication."
-
-    if [[ "${allowed_protocols[*]}" =~ ${protocol} ]]; then
-        kafka_server_conf_set security.inter.broker.protocol "$protocol"
-        if [[ "$protocol" = "PLAINTEXT" ]]; then
-            warn "Client communications are configured using PLAINTEXT listeners. For safety reasons, do not use this in a production environment."
-        fi
-        if [[ "$protocol" = "SASL_PLAINTEXT" ]] || [[ "$protocol" = "SASL_SSL" ]]; then
-            # The below lines would need to be updated to support other SASL implementations (i.e. GSSAPI)
-            # IMPORTANT: Do not confuse SASL/PLAIN with PLAINTEXT
-            # For more information, see: https://docs.confluent.io/current/kafka/authentication_sasl/authentication_sasl_plain.html#sasl-plain-overview)
-            kafka_server_conf_set sasl.mechanism.inter.broker.protocol "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL"
-        fi
-        if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then
-            kafka_configure_ssl
-        fi
-        if [[ "$protocol" = "SSL" ]]; then
-            kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH"
-        fi
-    else
-        error "Authentication protocol ${protocol} is not supported!"
-        exit 1
-    fi
-}
-
-########################
-# Configure Kafka for external-client communications
-# Globals:
-#   None
-# Arguments:
-#   $1 - Authentication protocol to use for the external-client listener
-# Returns:
-#   None
-#########################
-kafka_configure_external_client_communications() {
-    local -r protocol="${1:?missing environment variable protocol}"
-    local -r allowed_protocols=("PLAINTEXT" "SASL_PLAINTEXT" "SASL_SSL" "SSL")
-    info "Configuring Kafka for external client communications with ${protocol} authentication."
-
-    if [[ "${allowed_protocols[*]}" =~ ${protocol} ]]; then
-        if [[ "$protocol" = "PLAINTEXT" ]]; then
-            warn "External client communications are configured using PLAINTEXT listeners. For safety reasons, do not use this in a production environment."
-        fi
-        if [[ "$protocol" = "SASL_SSL" ]] || [[ "$protocol" = "SSL" ]]; then
-            kafka_configure_ssl
-        fi
-        if [[ "$protocol" = "SSL" ]]; then
-            kafka_server_conf_set ssl.client.auth "$KAFKA_TLS_CLIENT_AUTH"
-        fi
-    else
-        error "Authentication protocol ${protocol} is not supported!"
-        exit 1
+    ! is_empty_value "${KAFKA_CERTIFICATE_PASSWORD:-}" && configure_both ssl.keystore.password "$KAFKA_CERTIFICATE_PASSWORD"
+    ! is_empty_value "${KAFKA_CERTIFICATE_PASSWORD:-}" && configure_both ssl.truststore.password "$KAFKA_CERTIFICATE_PASSWORD"
     fi
 }
 
 ########################
@@ -704,7 +758,7 @@ kafka_configure_external_client_communications()
 # Returns:
 #   String
 #########################
-zookeeper_get_tls_config() {
+kafka_zookeeper_configure_tls() {
     # Note that ZooKeeper does not support a key password different from the keystore password,
     # so be sure to set the key password in the keystore to be identical to the keystore password;
     # otherwise the connection attempt to Zookeeper will fail.
@@ -720,13 +774,13 @@
         keystore_location="${KAFKA_CERTS_DIR}/zookeeper.keystore.pem"
     fi
 
-    echo "-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty \
-        -Dzookeeper.client.secure=true \
-        -Dzookeeper.ssl.keyStore.location=${keystore_location} \
-        -Dzookeeper.ssl.keyStore.password=${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD} \
-        -Dzookeeper.ssl.trustStore.location=${kafka_zk_truststore_location} \
-        -Dzookeeper.ssl.trustStore.password=${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD} \
-        -Dzookeeper.ssl.hostnameVerification=${KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME}"
+    kafka_server_conf_set "zookeeper.clientCnxnSocket" "org.apache.zookeeper.ClientCnxnSocketNetty"
+    kafka_server_conf_set "zookeeper.client.secure" "true"
+    ! is_empty_value "${keystore_location:-}" && kafka_server_conf_set "zookeeper.ssl.keyStore.location" "${keystore_location}"
+    ! is_empty_value "${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD:-}" && kafka_server_conf_set "zookeeper.ssl.keyStore.password" "${KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD}"
+    ! is_empty_value "${kafka_zk_truststore_location:-}" && kafka_server_conf_set "zookeeper.ssl.trustStore.location" "${kafka_zk_truststore_location}"
+    ! is_empty_value "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD:-}" && kafka_server_conf_set "zookeeper.ssl.trustStore.password" "${KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD}"
+    ! is_empty_value "${KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME:-}" && kafka_server_conf_set "zookeeper.ssl.hostnameVerification" "${KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME}"
 }
 
 ########################
@@ -741,8 +795,8 @@ zookeeper_get_tls_config() {
 kafka_configure_from_environment_variables() {
     # List of special cases to apply to the variables
     local -r exception_regexps=(
-        "s/sasl.ssl/sasl_ssl/g"
-        "s/sasl.plaintext/sasl_plaintext/g"
+        "s/sasl\.ssl/sasl_ssl/g"
+        "s/sasl\.plaintext/sasl_plaintext/g"
     )
     # Map environment variables to config properties
     for var in "${!KAFKA_CFG_@}"; do
@@ -764,7 +818,7 @@ kafka_configure_from_environment_variables() {
 }
 
 ########################
-# Configure Kafka configuration files to set up message sizes
+# Initialize KRaft storage
 # Globals:
 #   KAFKA_*
 # Arguments:
@@ -772,17 +826,58 @@
 # Returns:
 #   None
 #########################
-kafka_configure_producer_consumer_message_sizes() {
-    if [[ -n "$KAFKA_CFG_MAX_REQUEST_SIZE" ]]; then
-        kafka_common_conf_set "$KAFKA_CONF_DIR/producer.properties" max.request.size "$KAFKA_CFG_MAX_REQUEST_SIZE"
+kafka_kraft_storage_initialize() {
+    local args=("--config" "$KAFKA_CONF_FILE" "--ignore-formatted")
+    info "Initializing KRaft storage metadata"
+
+    # If cluster.id found in meta.properties, use it
+    if [[ -f "${KAFKA_DATA_DIR}/meta.properties" ]]; then
+        KAFKA_KRAFT_CLUSTER_ID=$(grep "^cluster.id=" "${KAFKA_DATA_DIR}/meta.properties" | sed -E 's/^cluster\.id=(\S+)$/\1/')
     fi
-    if [[ -n "$KAFKA_CFG_MAX_PARTITION_FETCH_BYTES" ]]; then
-        kafka_common_conf_set "$KAFKA_CONF_DIR/consumer.properties" max.partition.fetch.bytes "$KAFKA_CFG_MAX_PARTITION_FETCH_BYTES"
+
+    if is_empty_value "${KAFKA_KRAFT_CLUSTER_ID:-}"; then
+        warn "KAFKA_KRAFT_CLUSTER_ID not set - If using multiple nodes then you must use the same Cluster ID for each one"
+        KAFKA_KRAFT_CLUSTER_ID="$("${KAFKA_HOME}/bin/kafka-storage.sh" random-uuid)"
+        info "Generated Kafka cluster ID '${KAFKA_KRAFT_CLUSTER_ID}'"
     fi
+    args+=("--cluster-id" "$KAFKA_KRAFT_CLUSTER_ID")
+
+    # SCRAM users are configured during the cluster bootstrapping process and can later be manually updated using kafka-configs.sh
+    if is_boolean_yes "${KAFKA_KRAFT_BOOTSTRAP_SCRAM_USERS:-}"; then
+        info "Adding KRaft SCRAM users at storage bootstrap"
+        read -r -a users <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_USERS}")"
+        read -r -a passwords <<<"$(tr ',;' ' ' <<<"${KAFKA_CLIENT_PASSWORDS}")"
+        # Configure SCRAM-SHA-256 if enabled
+        if grep -Eq "^sasl.enabled.mechanisms=.*SCRAM-SHA-256" "$KAFKA_CONF_FILE"; then
+            for ((i = 0; i < ${#users[@]}; i++)); do
+                args+=("--add-scram" "SCRAM-SHA-256=[name=${users[i]},password=${passwords[i]}]")
+            done
+        fi
+        # Configure SCRAM-SHA-512 if enabled
+        if grep -Eq "^sasl.enabled.mechanisms=.*SCRAM-SHA-512" "$KAFKA_CONF_FILE"; then
+            for ((i = 0; i < ${#users[@]}; i++)); do
+                args+=("--add-scram" "SCRAM-SHA-512=[name=${users[i]},password=${passwords[i]}]")
+            done
+        fi
+        # Add inter-broker credentials
+        if grep -Eq "^sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256" "$KAFKA_CONF_FILE"; then
+            args+=("--add-scram" "SCRAM-SHA-256=[name=${KAFKA_INTER_BROKER_USER},password=${KAFKA_INTER_BROKER_PASSWORD}]")
+        elif grep -Eq "^sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512" "$KAFKA_CONF_FILE"; then
+            args+=("--add-scram" "SCRAM-SHA-512=[name=${KAFKA_INTER_BROKER_USER},password=${KAFKA_INTER_BROKER_PASSWORD}]")
+        fi
+        # Add controller credentials
+        if grep -Eq "^sasl.mechanism.controller.protocol=SCRAM-SHA-256" "$KAFKA_CONF_FILE"; then
+            args+=("--add-scram" "SCRAM-SHA-256=[name=${KAFKA_CONTROLLER_USER},password=${KAFKA_CONTROLLER_PASSWORD}]")
+        elif grep -Eq "^sasl.mechanism.controller.protocol=SCRAM-SHA-512" "$KAFKA_CONF_FILE"; then
+            args+=("--add-scram" "SCRAM-SHA-512=[name=${KAFKA_CONTROLLER_USER},password=${KAFKA_CONTROLLER_PASSWORD}]")
+        fi
+    fi
+    info "Formatting storage directories to add metadata..."
+    "${KAFKA_HOME}/bin/kafka-storage.sh" format "${args[@]}"
 }
 
 ########################
-# Initialize KRaft
+# Detects inconsistencies between the configuration at KAFKA_CONF_FILE and the quorum-state file
 # Globals:
 #   KAFKA_*
 # Arguments:
@@ -790,17 +885,21 @@
 # Returns:
 #   None
 #########################
-kraft_initialize() {
-    info "Initializing KRaft..."
+kafka_kraft_quorum_voters_changed(){
+    read -r -a quorum_voters_conf_ids <<<"$(grep "^controller.quorum.voters=" "$KAFKA_CONF_FILE" | sed "s/^controller.quorum.voters=//" | tr "," " " | sed -E "s/\@\S+//g")"
+    read -r -a quorum_voters_state_ids <<< "$(grep -Eo "\{\"voterId\":[0-9]+\}" "${KAFKA_DATA_DIR}/__cluster_metadata-0/quorum-state" | grep -Eo "[0-9]+" | tr "\n" " ")"
 
-    if [[ -z "$KAFKA_KRAFT_CLUSTER_ID" ]]; then
-        warn "KAFKA_KRAFT_CLUSTER_ID not set - If using multiple nodes then you must use the same Cluster ID for each one"
-        KAFKA_KRAFT_CLUSTER_ID="$("${KAFKA_HOME}/bin/kafka-storage.sh" random-uuid)"
-        info "Generated Kafka cluster ID '${KAFKA_KRAFT_CLUSTER_ID}'"
+    if [[ "${#quorum_voters_conf_ids[@]}" != "${#quorum_voters_state_ids[@]}" ]]; then
+        true
+    else
+        read -r -a sorted_conf_ids <<< "$(echo "${quorum_voters_conf_ids[@]}" | tr ' ' '\n' | sort | tr '\n' ' ')"
+        read -r -a sorted_state_ids <<< "$(echo "${quorum_voters_state_ids[@]}" | tr ' ' '\n' | sort | tr '\n' ' ')"
+        if [[ "${sorted_conf_ids[*]}" = "${sorted_state_ids[*]}" ]]; then
+            false
+        else
+            true
+        fi
     fi
-
-    info "Formatting storage directories to add metadata..."
-    debug_execute "$KAFKA_HOME/bin/kafka-storage.sh" format --config "$(kafka_get_conf_file)" --cluster-id "$KAFKA_KRAFT_CLUSTER_ID" --ignore-formatted
 }
 
 ########################
@@ -814,11 +913,6 @@ kraft_initialize() {
 #########################
 kafka_initialize() {
     info "Initializing Kafka..."
-    # DEPRECATED. Copy files in old conf directory to maintain compatibility with Helm chart.
-    if ! is_dir_empty "$KAFKA_BASE_DIR"/conf; then
-        warn "Detected files mounted to $KAFKA_BASE_DIR/conf. This is deprecated and files should be mounted to $KAFKA_MOUNTED_CONF_DIR."
-        cp -Lr "$KAFKA_BASE_DIR"/conf/* "$KAFKA_CONF_DIR"
-    fi
     # Check for mounted configuration files
     if ! is_dir_empty "$KAFKA_MOUNTED_CONF_DIR"; then
         cp -Lr "$KAFKA_MOUNTED_CONF_DIR"/* "$KAFKA_CONF_DIR"
@@ -832,49 +926,167 @@ kafka_initialize() {
         fi
     done
 
-    # DEPRECATED. Check for server.properties file in old conf directory to maintain compatibility with Helm chart.
-    if [[ ! -f "$KAFKA_BASE_DIR"/conf/server.properties ]] && [[ ! -f "$KAFKA_MOUNTED_CONF_DIR"/server.properties ]]; then
+    if [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" ]]; then
         info "No injected configuration files found, creating default config files"
+        # Restore the original server.properties, but comment out Zookeeper/KRaft-specific settings for compatibility with both architectures
+        cp "${KAFKA_CONF_DIR}/server.properties.original" "$KAFKA_CONF_FILE"
+        kafka_server_unify_conf
+        # Configure Kafka settings
         kafka_server_conf_set log.dirs "$KAFKA_DATA_DIR"
         kafka_configure_from_environment_variables
-        # When setting up a Kafka cluster with N brokers, we have several listeners:
-        # - INTERNAL: used for inter-broker communications
-        # - CLIENT: used for communications with consumers/producers within the same network
-        # - (optional) EXTERNAL: used for communications with consumers/producers on different networks
-        local internal_protocol
-        local client_protocol
-        local external_client_protocol
-        if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ INTERNAL:([a-zA-Z_]*) ]]; then
-            internal_protocol="${BASH_REMATCH[1]}"
-            kafka_configure_internal_communications "$internal_protocol"
         fi
+        # Configure Kafka producer/consumer to set up message sizes
+        ! is_empty_value "${KAFKA_CFG_MAX_REQUEST_SIZE:-}" && kafka_common_conf_set "$KAFKA_CONF_DIR/producer.properties" max.request.size "$KAFKA_CFG_MAX_REQUEST_SIZE"
+        ! is_empty_value "${KAFKA_CFG_MAX_PARTITION_FETCH_BYTES:-}" && kafka_common_conf_set "$KAFKA_CONF_DIR/consumer.properties" max.partition.fetch.bytes "$KAFKA_CFG_MAX_PARTITION_FETCH_BYTES"
+        # Zookeeper mode additional settings
+        if ! is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"; then
is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"; then + if [[ "$KAFKA_ZOOKEEPER_PROTOCOL" =~ SSL ]]; then + kafka_zookeeper_configure_tls + fi + if [[ "$KAFKA_ZOOKEEPER_PROTOCOL" =~ SASL ]]; then + kafka_zookeeper_configure_jaas + fi fi - if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ CLIENT:([a-zA-Z_]*) ]]; then - client_protocol="${BASH_REMATCH[1]}" - kafka_configure_client_communications "$client_protocol" + # If at least one listener uses SSL or SASL_SSL, ensure SSL is configured + if kafka_has_ssl_listener; then + kafka_configure_ssl fi - if [[ ${KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:-} =~ EXTERNAL:([a-zA-Z_]*) ]]; then - external_client_protocol="${BASH_REMATCH[1]}" - kafka_configure_external_client_communications "$external_client_protocol" + # If at least one listener uses SASL_PLAINTEXT or SASL_SSL, ensure SASL is configured + if kafka_has_sasl_listener; then + if [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ SCRAM ]]; then + if ! is_empty_value "${KAFKA_CFG_PROCESS_ROLES:-}"; then + if [[ "$(kafka_get_version)" =~ ^3\.2\.|^3\.3\.|^3\.4\. ]]; then + # NOTE: This will depend on Kafka version when support for SCRAM is added + warn "KRaft mode requires Kafka version 3.5 or higher for SCRAM to be supported. SCRAM SASL mechanisms will now be disabled." + KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN + else + export KAFKA_KRAFT_BOOTSTRAP_SCRAM_USERS="true" + fi + fi + if ! is_empty_value "${KAFKA_CFG_ZOOKEEPER_CONNECT:-}"; then + export KAFKA_KRAFT_BOOTSTRAP_SCRAM_USERS="true" + fi + fi + kafka_server_conf_set sasl.enabled.mechanisms "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" fi + # Settings for each Kafka Listener are configured individually + read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")" + for protocol_map in "${protocol_maps[@]}"; do + read -r -a map <<<"$(tr ':' ' ' <<<"$protocol_map")" + # Obtain the listener and protocol from protocol map string, e.g. 
CONTROLLER:PLAINTEXT + listener="${map[0]}" + protocol="${map[1]}" + listener_lower="$(echo "$listener" | tr '[:upper:]' '[:lower:]')" - if [[ "${internal_protocol:-}" =~ "SASL" || "${client_protocol:-}" =~ "SASL" || "${external_client_protocol:-}" =~ "SASL" ]] || [[ "${KAFKA_ZOOKEEPER_PROTOCOL}" =~ SASL ]]; then - if [[ -n "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" ]]; then - kafka_server_conf_set sasl.enabled.mechanisms "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" - kafka_generate_jaas_authentication_file "${internal_protocol:-}" "${client_protocol:-}" - [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ "SCRAM" ]] && kafka_create_sasl_scram_zookeeper_users - else - print_validation_error "Specified SASL protocol but no SASL mechanisms provided in KAFKA_CFG_SASL_ENABLED_MECHANISMS" + if [[ "$protocol" = "SSL" || "$protocol" = "SASL_SSL" ]]; then + kafka_server_conf_set "listener.name.${listener_lower}.ssl.client.auth" "$KAFKA_TLS_INTER_BROKER_AUTH" fi - fi - # Remove security.inter.broker.protocol if KAFKA_CFG_INTER_BROKER_LISTENER_NAME is configured - if [[ -n "${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:-}" ]]; then - remove_in_file "$(kafka_get_conf_file)" "security.inter.broker.protocol" false - fi - kafka_configure_producer_consumer_message_sizes + if [[ "$protocol" = "SASL_PLAINTEXT" || "$protocol" = "SASL_SSL" ]]; then + local role="" + if [[ "$listener" = "${KAFKA_CFG_INTER_BROKER_LISTENER_NAME:-INTERNAL}" ]]; then + kafka_server_conf_set sasl.mechanism.inter.broker.protocol "$KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL" + role="inter-broker" + elif [[ "${KAFKA_CFG_CONTROLLER_LISTENER_NAMES:-CONTROLLER}" =~ $listener ]]; then + kafka_server_conf_set sasl.mechanism.controller.protocol "$KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL" + kafka_server_conf_set "listener.name.${listener_lower}.sasl.enabled.mechanisms" "$KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL" + role="controller" + fi + # If KAFKA_CLIENT_LISTENER_NAME is found in the listeners list, configure the producer/consumer accordingly + if [[ "$listener" = "${KAFKA_CLIENT_LISTENER_NAME:-CLIENT}" ]]; then + kafka_configure_consumer_producer_jaas + kafka_producer_consumer_conf_set security.protocol "$protocol" + kafka_producer_consumer_conf_set sasl.mechanism "${KAFKA_CLIENT_SASL_MECHANISM:-$(kafka_client_sasl_mechanism)}" + fi + kafka_configure_server_jaas "$listener_lower" "${role:-}" + fi + done + else + info "Detected mounted server.properties file at ${KAFKA_MOUNTED_CONF_DIR}/server.properties. 
Skipping configuration based on env variables" fi true } +######################## +# Returns the most secure SASL mechanism available for Kafka clients +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################## +kafka_client_sasl_mechanism() { + local sasl_mechanism="" + + if [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ SCRAM-SHA-512 ]]; then + sasl_mechanism="SCRAM-SHA-512" + elif [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ SCRAM-SHA-256 ]]; then + sasl_mechanism="SCRAM-SHA-256" + elif [[ "$KAFKA_CFG_SASL_ENABLED_MECHANISMS" =~ PLAIN ]]; then + sasl_mechanism="PLAIN" + fi + echo "$sasl_mechanism" +} + +######################## +# Removes default settings referencing Zookeeper mode or KRaft mode +# Globals: +# KAFKA_* +# Arguments: +# None +# Returns: +# None +######################## +kafka_server_unify_conf() { + local -r remove_regexps=( + #Zookeeper + "s/^zookeeper\./#zookeeper./g" + "s/^group\.initial/#group.initial/g" + "s/^broker\./#broker./g" + "s/^node\./#node./g" + "s/^process\./#process./g" + "s/^listeners=/#listeners=/g" + "s/^listener\./#listener./g" + "s/^controller\./#controller./g" + "s/^inter\.broker/#inter.broker/g" + "s/^advertised\.listeners/#advertised.listeners/g" + ) + + # Map environment variables to config properties + for regex in "${remove_regexps[@]}"; do + sed -i "${regex}" "$KAFKA_CONF_FILE" + done +} + +######################## +# Dinamically set node.id/broker.id/controller.quorum.voters if their alternative environment variable _COMMAND is set +# Globals: +# KAFKA_*_COMMAND +# Arguments: +# None +# Returns: +# None +######################### +kafka_dynamic_environment_variables() { + # KRaft mode + if ! is_empty_value "${KAFKA_NODE_ID_COMMAND:-}"; then + KAFKA_CFG_NODE_ID="$(eval "${KAFKA_NODE_ID_COMMAND}")" + export KAFKA_CFG_NODE_ID + fi + if ! is_empty_value "${KAFKA_CONTROLLER_QUORUM_VOTERS_COMMAND:-}"; then + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS="$(eval "${KAFKA_CONTROLLER_QUORUM_VOTERS_COMMAND}")" + export KAFKA_CFG_CONTROLLER_QUORUM_VOTERS + fi + # Zookeeper mode + # DEPRECATED - BROKER_ID_COMMAND has been deprecated, please use KAFKA_BROKER_ID_COMMAND instead + if ! is_empty_value "${KAFKA_BROKER_ID_COMMAND:-}"; then + KAFKA_CFG_BROKER_ID="$(eval "${KAFKA_BROKER_ID_COMMAND}")" + export KAFKA_CFG_BROKER_ID + elif ! is_empty_value "${BROKER_ID_COMMAND:-}"; then + KAFKA_CFG_BROKER_ID="$(eval "${BROKER_ID_COMMAND}")" + export KAFKA_CFG_BROKER_ID + fi +} + ######################## # Run custom initialization scripts # Globals: @@ -955,20 +1167,3 @@ kafka_stop() { ! is_kafka_running && return stop_service_using_pid "$KAFKA_PID_FILE" TERM } - -######################## -# Get configuration file to use -# Globals: -# KAFKA_ENABLE_KRAFT -# Arguments: -# None -# Returns: -# Path to the conf file to use -######################### -kafka_get_conf_file() { - if is_boolean_yes "$KAFKA_ENABLE_KRAFT"; then - echo "$KAFKA_CONF_FILE" - else - echo "$KAFKA_ZK_CONF_FILE" - fi -}
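
Usage sketch of the new contract: with ALLOW_PLAINTEXT_LISTENER and KAFKA_ENABLE_KRAFT removed and the listener defaults now empty, a KRaft container must spell out its listeners and quorum explicitly. A minimal single-node example (the "kafka" hostname is an illustrative value, not part of this patch):

# Single node acting as both broker and controller (hostname/values illustrative)
docker run -d --name kafka \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:3.2.3-debian-11-r164

This passes the reworked kafka_validate (node.id and controller.quorum.voters are set, and the CONTROLLER listener appears in KAFKA_CFG_LISTENERS); the PLAINTEXT listeners now only produce a warning instead of aborting startup.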
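
Relatedly, the postunpack.sh rename to server.properties.original enables the direct-mount bypass in setup.sh: a file mounted at KAFKA_CONF_DIR skips both kafka_validate and kafka_initialize. A sketch, assuming a complete server.properties prepared on the host (the local file name is hypothetical):

# Mount a finished configuration straight into KAFKA_CONF_DIR
docker run -d --name kafka \
  -v "$PWD/server.properties:/opt/bitnami/kafka/config/server.properties:ro" \
  bitnami/kafka:3.2.3-debian-11-r164

Even on this path, kafka_kraft_storage_initialize still formats storage when the mounted file sets process.roles; set KAFKA_SKIP_KRAFT_STORAGE_INIT=true to skip that step as well.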