Skip to content

Commit

Permalink
Merge branch '3.3.x'
Browse files Browse the repository at this point in the history
Closes gh-43563
  • Loading branch information
philwebb committed Dec 18, 2024
2 parents 809d6f3 + ba916cb commit ef43160
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;

/**
Expand Down Expand Up @@ -1399,60 +1400,67 @@ public Map<String, Object> buildProperties() {

public Map<String, Object> buildProperties(SslBundles sslBundles) {
validate();
Properties properties = new Properties();
if (getBundle() != null) {
properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG)
.accept(SslBundleSslEngineFactory.class.getName());
properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle()));
}
else {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
map.from(this::getKeyStoreCertificateChain)
.to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG));
map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG));
map.from(this::getKeyStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
map.from(this::getTrustStoreCertificates)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG));
map.from(this::getTrustStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
String bundleName = getBundle();
if (StringUtils.hasText(bundleName)) {
return buildPropertiesForSslBundle(sslBundles, bundleName);
}
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
map.from(this::getKeyStoreCertificateChain)
.to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG));
map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG));
map.from(this::getKeyStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
map.from(this::getTrustStoreCertificates).to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG));
map.from(this::getTrustStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
return properties;
}

private Map<String, Object> buildPropertiesForSslBundle(SslBundles sslBundles, String name) {
Properties properties = new Properties();
properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG).accept(SslBundleSslEngineFactory.class.getName());
properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(name));
return properties;
}

private void validate() {
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey());
entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
}, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
}, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
}, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
}, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
}, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
});
}, this::hasValue);
}

private boolean hasValue(Object value) {
return (value instanceof String string) ? StringUtils.hasText(string) : value != null;
}

private String resourceToPath(Resource resource) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ void sslPemConfiguration() {
"-----BEGINchain");
}

@Test
void sslPemConfigurationWithEmptyBundle() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setKeyStoreKey("-----BEGINkey");
properties.getSsl().setTrustStoreCertificates("-----BEGINtrust");
properties.getSsl().setKeyStoreCertificateChain("-----BEGINchain");
properties.getSsl().setBundle("");
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "-----BEGINkey");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "-----BEGINtrust");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
"-----BEGINchain");
}

@Test
void sslBundleConfiguration() {
KafkaProperties properties = new KafkaProperties();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,10 @@
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.springframework.util.Assert;
Expand Down Expand Up @@ -96,11 +98,23 @@ private static String buildMessage(Set<String> mutuallyExclusiveNames, Set<Strin
* @param entries a consumer used to populate the entries to check
*/
public static void throwIfMultipleNonNullValuesIn(Consumer<Map<String, Object>> entries) {
Map<String, Object> map = new LinkedHashMap<>();
throwIfMultipleMatchingValuesIn(entries, Objects::nonNull);
}

/**
* Throw a new {@link MutuallyExclusiveConfigurationPropertiesException} if multiple
* values are defined in a set of entries that match the given predicate.
* @param <V> the value type
* @param entries a consumer used to populate the entries to check
* @param predicate the predicate used to check for matching values
* @since 3.3.7
*/
public static <V> void throwIfMultipleMatchingValuesIn(Consumer<Map<String, V>> entries, Predicate<V> predicate) {
Map<String, V> map = new LinkedHashMap<>();
entries.accept(map);
Set<String> configuredNames = map.entrySet()
.stream()
.filter((entry) -> entry.getValue() != null)
.filter((entry) -> predicate.test(entry.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));
if (configuredNames.size() > 1) {
Expand Down

0 comments on commit ef43160

Please sign in to comment.