From ed331dec1c6e92d6762ba2b8c5e175fc03c7ff4e Mon Sep 17 00:00:00 2001 From: Marko Strukelj Date: Tue, 19 Dec 2023 18:29:23 +0100 Subject: [PATCH] Add support for server bearer token + make authentication optional for introspection endpoint Signed-off-by: Marko Strukelj --- .../JaasClientOauthLoginCallbackHandler.java | 7 +- .../oauth/common/FileBasedTokenProvider.java | 5 + .../strimzi/kafka/oauth/common/LogUtil.java | 18 +- .../oauth/common/StaticTokenProvider.java | 7 + .../oauth/services/ConfigurationKey.java | 5 + .../kafka/oauth/services/ValidatorKey.java | 134 +++++++-- .../kafka/oauth/services/Validators.java | 2 +- .../validator/JWTSignatureValidator.java | 159 ++++++---- .../OAuthIntrospectionValidator.java | 92 ++++-- .../kafka/oauth/validator/TokenValidator.java | 5 + .../oauth/common/HttpUtilTimeoutTest.java | 4 +- .../oauth/validator/ConfigIdHashTest.java | 11 +- ...asServerOauthValidatorCallbackHandler.java | 99 +++++-- .../kafka/oauth/server/ServerConfig.java | 10 + .../keycloak-auth-tests/docker-compose.yml | 4 +- .../server/AdminServerRequestHandler.java | 20 +- .../server/AuthServerRequestHandler.java | 105 ++++--- .../strimzi/testsuite/oauth/server/Mode.java | 4 + testsuite/mockoauth-tests/docker-compose.yml | 6 +- .../testsuite/oauth/MockOAuthTests.java | 6 +- .../mockoauth/AuthorizationEndpointsTest.java | 266 +++++++++++++++++ .../oauth/mockoauth/JWKSKeyUseTest.java | 3 + .../oauth/mockoauth/JaasServerConfigTest.java | 278 ++++++++++++++---- 23 files changed, 1015 insertions(+), 235 deletions(-) create mode 100644 testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/AuthorizationEndpointsTest.java diff --git a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java index b6b50627..0467960e 100644 --- a/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java +++ b/oauth-client/src/main/java/io/strimzi/kafka/oauth/client/JaasClientOauthLoginCallbackHandler.java @@ -38,7 +38,6 @@ import java.util.Properties; import java.util.Set; -import static io.strimzi.kafka.oauth.common.Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS; import static io.strimzi.kafka.oauth.common.ConfigUtil.getConnectTimeout; import static io.strimzi.kafka.oauth.common.ConfigUtil.getReadTimeout; import static io.strimzi.kafka.oauth.common.DeprecationUtil.isAccessTokenJwt; @@ -246,14 +245,14 @@ private int getHttpRetries(ClientConfig config) { } private long getHttpRetryPauseMillis(ClientConfig config, int retries) { - long retryPauseMillis = config.getValueAsLong(OAUTH_HTTP_RETRY_PAUSE_MILLIS, 0); + long retryPauseMillis = config.getValueAsLong(Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS, 0); if (retries > 0) { if (retryPauseMillis < 0) { retryPauseMillis = 0; - LOG.warn("The configured value of '{}' is less than zero and will be ignored", OAUTH_HTTP_RETRY_PAUSE_MILLIS); + LOG.warn("The configured value of '{}' is less than zero and will be ignored", Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS); } if (retryPauseMillis <= 0) { - LOG.warn("No pause between http retries configured. Consider setting '{}' to greater than zero to avoid flooding the authorization server with requests.", OAUTH_HTTP_RETRY_PAUSE_MILLIS); + LOG.warn("No pause between http retries configured. Consider setting '{}' to greater than zero to avoid flooding the authorization server with requests.", Config.OAUTH_HTTP_RETRY_PAUSE_MILLIS); } } return retryPauseMillis; diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/FileBasedTokenProvider.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/FileBasedTokenProvider.java index 80c54ece..447b7a4b 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/FileBasedTokenProvider.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/FileBasedTokenProvider.java @@ -52,4 +52,9 @@ public String token() { throw new IllegalStateException(e); } } + + @Override + public String toString() { + return "FileBasedTokenProvider: {path: '" + filePath + "'}"; + } } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/LogUtil.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/LogUtil.java index 8823ae23..d8c2ac53 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/LogUtil.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/LogUtil.java @@ -11,11 +11,11 @@ public class LogUtil { /** * Return masked input text. - * + *

* Masking checks the length of input. If less than 8 it returns '**********'. * If less than 20 it prints out first letter in clear text, and then additional 9x '*' irrespective of actual input size. * If input length is greater than 20 chars, it prints out first 4 in clear text followed by '***..***' followed by last 4. - * + *

* The idea is to give some information for debugging while not leaking too much information about secrets. * * @param input String with sensitive date which should be masked @@ -33,9 +33,21 @@ public static String mask(String input) { } if (len < 20) { - return "" + input.charAt(0) + "*********"; + return input.charAt(0) + "*********"; } return input.substring(0, 4) + "**" + input.substring(len - 4, len); } + + /** + * Wrap the value in single quotes, or return null if value is null + * + * @param value The value to wrap in single quotes + * @return The quoted value + */ + public static String singleQuote(String value) { + if (value == null) + return null; + return "'" + value + "'"; + } } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/StaticTokenProvider.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/StaticTokenProvider.java index 59191ead..e6d1154a 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/StaticTokenProvider.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/common/StaticTokenProvider.java @@ -5,6 +5,8 @@ package io.strimzi.kafka.oauth.common; +import static io.strimzi.kafka.oauth.common.LogUtil.mask; + /** * A TokenProvider that contains an immutable token that is returned every time a {@link io.strimzi.kafka.oauth.common.StaticTokenProvider#token()} method is called. */ @@ -24,4 +26,9 @@ public StaticTokenProvider(final String token) { public String token() { return token; } + + @Override + public String toString() { + return "StaticTokenProvider: {token: '" + mask(token) + "'}"; + } } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ConfigurationKey.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ConfigurationKey.java index 5bd9f41a..10d958e1 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ConfigurationKey.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ConfigurationKey.java @@ -73,4 +73,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(configId); } + + @Override + public String toString() { + return "ConfigurationKey {configId: " + configId + ", validatorKey: " + validatorKey + "}"; + } } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java index a9138f4d..8ce0d89c 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/ValidatorKey.java @@ -8,14 +8,20 @@ import java.util.Objects; +import static io.strimzi.kafka.oauth.common.LogUtil.mask; +import static io.strimzi.kafka.oauth.common.LogUtil.singleQuote; + /** * The class that holds the validator configuration and is used to compare different configurations for equality. * It also calculates a unique identifier based on the configuration that is stable across application restarts. - * + *

* This mechanism allows sharing a single validator across multiple listeners, as long as they are configured with same config parameter values */ public class ValidatorKey { + private final String clientId; + private final String clientSecret; + private final String bearerToken; private final String validIssuerUri; private final String audience; private final String customClaimCheck; @@ -39,7 +45,10 @@ public class ValidatorKey { private final String configIdHash; @SuppressWarnings("checkstyle:ParameterNumber") - ValidatorKey(String validIssuerUri, + ValidatorKey(String clientId, + String clientSecret, + String bearerToken, + String validIssuerUri, String audience, String customClaimCheck, String usernameClaim, @@ -57,6 +66,9 @@ public class ValidatorKey { boolean enableMetrics, boolean includeAcceptHeader) { + this.clientId = clientId; + this.clientSecret = clientSecret; + this.bearerToken = bearerToken; this.validIssuerUri = validIssuerUri; this.audience = audience; this.customClaimCheck = customClaimCheck; @@ -75,7 +87,11 @@ public class ValidatorKey { this.enableMetrics = enableMetrics; this.includeAcceptHeader = includeAcceptHeader; - this.configIdHash = IOUtil.hashForObjects(validIssuerUri, + this.configIdHash = IOUtil.hashForObjects( + clientId, + clientSecret, + bearerToken, + validIssuerUri, audience, customClaimCheck, usernameClaim, @@ -94,12 +110,16 @@ public class ValidatorKey { includeAcceptHeader); } + @SuppressWarnings({"checkstyle:CyclomaticComplexity"}) @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof ValidatorKey)) return false; ValidatorKey that = (ValidatorKey) o; - return hasHostnameVerifier == that.hasHostnameVerifier && + return Objects.equals(clientId, that.clientId) && + Objects.equals(clientSecret, that.clientSecret) && + Objects.equals(bearerToken, that.bearerToken) && + hasHostnameVerifier == that.hasHostnameVerifier && Objects.equals(validIssuerUri, that.validIssuerUri) && Objects.equals(audience, that.audience) && Objects.equals(customClaimCheck, that.customClaimCheck) && @@ -120,7 +140,11 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(validIssuerUri, + return Objects.hash( + clientId, + clientSecret, + bearerToken, + validIssuerUri, audience, customClaimCheck, usernameClaim, @@ -166,6 +190,9 @@ public static class JwtValidatorKey extends ValidatorKey { /** * Create a new instance. Arguments have to include all validator config options. * + * @param clientId clientId + * @param clientSecret clientSecret + * @param bearerToken bearerToken * @param validIssuerUri validIssuerUri * @param audience audience * @param customClaimCheck customClaimCheck @@ -192,7 +219,10 @@ public static class JwtValidatorKey extends ValidatorKey { * @param includeAcceptHeader includeAcceptHeader */ @SuppressWarnings("checkstyle:parameternumber") - public JwtValidatorKey(String validIssuerUri, + public JwtValidatorKey(String clientId, + String clientSecret, + String bearerToken, + String validIssuerUri, String audience, String customClaimCheck, String usernameClaim, @@ -218,7 +248,10 @@ public JwtValidatorKey(String validIssuerUri, boolean failFast, boolean includeAcceptHeader) { - super(validIssuerUri, + super(clientId, + clientSecret, + bearerToken, + validIssuerUri, audience, customClaimCheck, usernameClaim, @@ -284,6 +317,38 @@ public int hashCode() { public String getConfigIdHash() { return configIdHash; } + + @Override + public String toString() { + return "JwtValidatorKey {clientId: " + singleQuote(super.clientId) + + ", clientSecret: " + singleQuote(mask(super.clientSecret)) + + ", bearerToken: " + singleQuote(mask(super.bearerToken)) + + ", validIssuerUri: " + singleQuote(super.validIssuerUri) + + ", audience: " + singleQuote(super.audience) + + ", customClaimCheck: " + singleQuote(super.customClaimCheck) + + ", usernameClaim: " + singleQuote(super.usernameClaim) + + ", fallbackUsernameClaim: " + singleQuote(super.fallbackUsernameClaim) + + ", fallbackUsernamePrefix: " + singleQuote(super.fallbackUsernamePrefix) + + ", groupQuery: " + singleQuote(super.groupQuery) + + ", groupDelimiter: [" + super.groupDelimiter + + "], sslTruststore: " + singleQuote(super.sslTruststore) + + ", sslStorePassword: " + singleQuote(mask(super.sslStorePassword)) + + ", sslStoreType: " + singleQuote(super.sslStoreType) + + ", sslRandom: " + singleQuote(super.sslRandom) + + ", hasHostnameVerifier: " + super.hasHostnameVerifier + + ", connectTimeout: " + super.connectTimeout + + ", readTimeout: " + super.readTimeout + + ", enableMetrics: " + super.enableMetrics + + ", includeAcceptHeader: " + super.includeAcceptHeader + + ", jwksEndpointUri: " + singleQuote(jwksEndpointUri) + + ", jwksRefreshSeconds: " + jwksRefreshSeconds + + ", jwksExpirySeconds: " + jwksExpirySeconds + + ", jwksRefreshMinPauseSeconds: " + jwksRefreshMinPauseSeconds + + ", jwksIgnoreKeyUse: " + jwksIgnoreKeyUse + + ", checkAccessTokenType: " + checkAccessTokenType + + ", failFast: " + failFast + + "}"; + } } /** @@ -294,8 +359,6 @@ public static class IntrospectionValidatorKey extends ValidatorKey { private final String introspectionEndpoint; private final String userInfoEndpoint; private final String validTokenType; - private final String clientId; - private final String clientSecret; private final int retries; private final long retryPauseMillis; private final String configIdHash; @@ -303,6 +366,9 @@ public static class IntrospectionValidatorKey extends ValidatorKey { /** * Create a new instance. Arguments have to include all validator config options. * + * @param clientId clientId + * @param clientSecret clientSecret + * @param bearerToken bearerToken * @param validIssuerUri validIssuerUri * @param audience audience * @param customClaimCheck customClaimCheck @@ -319,8 +385,6 @@ public static class IntrospectionValidatorKey extends ValidatorKey { * @param introspectionEndpoint introspectionEndpoint * @param userInfoEndpoint userInfoEndpoint * @param validTokenType validTokenType - * @param clientId clientId - * @param clientSecret clientSecret * @param connectTimeout connectTimeout * @param readTimeout readTimeout * @param enableMetrics enableMetrics @@ -329,7 +393,10 @@ public static class IntrospectionValidatorKey extends ValidatorKey { * @param includeAcceptHeader includeAcceptHeader */ @SuppressWarnings("checkstyle:parameternumber") - public IntrospectionValidatorKey(String validIssuerUri, + public IntrospectionValidatorKey(String clientId, + String clientSecret, + String bearerToken, + String validIssuerUri, String audience, String customClaimCheck, String usernameClaim, @@ -346,8 +413,6 @@ public IntrospectionValidatorKey(String validIssuerUri, String introspectionEndpoint, String userInfoEndpoint, String validTokenType, - String clientId, - String clientSecret, int connectTimeout, int readTimeout, boolean enableMetrics, @@ -355,7 +420,10 @@ public IntrospectionValidatorKey(String validIssuerUri, long retryPauseMillis, boolean includeAcceptHeader) { - super(validIssuerUri, + super(clientId, + clientSecret, + bearerToken, + validIssuerUri, audience, customClaimCheck, usernameClaim, @@ -375,8 +443,6 @@ public IntrospectionValidatorKey(String validIssuerUri, this.introspectionEndpoint = introspectionEndpoint; this.userInfoEndpoint = userInfoEndpoint; this.validTokenType = validTokenType; - this.clientId = clientId; - this.clientSecret = clientSecret; this.retries = retries; this.retryPauseMillis = retryPauseMillis; @@ -399,8 +465,6 @@ public boolean equals(Object o) { return Objects.equals(introspectionEndpoint, that.introspectionEndpoint) && Objects.equals(userInfoEndpoint, that.userInfoEndpoint) && Objects.equals(validTokenType, that.validTokenType) && - Objects.equals(clientId, that.clientId) && - Objects.equals(clientSecret, that.clientSecret) && Objects.equals(retries, that.retries) && Objects.equals(retryPauseMillis, that.retryPauseMillis); } @@ -411,8 +475,6 @@ public int hashCode() { introspectionEndpoint, userInfoEndpoint, validTokenType, - clientId, - clientSecret, retries, retryPauseMillis); } @@ -421,5 +483,35 @@ public int hashCode() { public String getConfigIdHash() { return configIdHash; } + + @Override + public String toString() { + return "IntrospectionValidatorKey {clientId: " + singleQuote(super.clientId) + + ", clientSecret: " + singleQuote(mask(super.clientSecret)) + + ", bearerToken: " + singleQuote(mask(super.bearerToken)) + + ", validIssuerUri: " + singleQuote(super.validIssuerUri) + + ", audience: " + singleQuote(super.audience) + + ", customClaimCheck: " + singleQuote(super.customClaimCheck) + + ", usernameClaim: " + singleQuote(super.usernameClaim) + + ", fallbackUsernameClaim: " + singleQuote(super.fallbackUsernameClaim) + + ", fallbackUsernamePrefix: " + singleQuote(super.fallbackUsernamePrefix) + + ", groupQuery: " + singleQuote(super.groupQuery) + + ", groupDelimiter: [" + super.groupDelimiter + + "], sslTruststore: " + singleQuote(super.sslTruststore) + + ", sslStorePassword: " + singleQuote(mask(super.sslStorePassword)) + + ", sslStoreType: " + singleQuote(super.sslStoreType) + + ", sslRandom: " + singleQuote(super.sslRandom) + + ", hasHostnameVerifier: " + super.hasHostnameVerifier + + ", connectTimeout: " + super.connectTimeout + + ", readTimeout: " + super.readTimeout + + ", enableMetrics: " + super.enableMetrics + + ", includeAcceptHeader: " + super.includeAcceptHeader + + ", introspectionEndpoint: " + singleQuote(introspectionEndpoint) + + ", userInfoEndpoint: " + singleQuote(userInfoEndpoint) + + ", validTokenType: " + singleQuote(validTokenType) + + ", retries: " + retries + + ", retryPauseMillis: " + retryPauseMillis + + "}"; + } } } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Validators.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Validators.java index cede86df..9dc71598 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Validators.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/services/Validators.java @@ -37,7 +37,7 @@ public TokenValidator get(ConfigurationKey key, Supplier factory // If key with the same configId exists already it has to have an equal validatorKey (the same configuration) // In that case, the existing ValidatorEntry will be reused if (!key.getValidatorKey().equals(previous.key.getValidatorKey())) { - throw new ConfigException("Configuration id " + key.getConfigId() + " with different configuration has already been assigned"); + throw new ConfigException("Configuration id '" + key.getConfigId() + "' with different configuration has already been assigned (" + previous.key + "\n\tversus:\n\t" + key + ")"); } return previous.validator; } diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java index 402fa0ba..df205044 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/JWTSignatureValidator.java @@ -18,6 +18,7 @@ import io.strimzi.kafka.oauth.common.PrincipalExtractor; import io.strimzi.kafka.oauth.common.TimeUtil; import io.strimzi.kafka.oauth.common.TokenInfo; +import io.strimzi.kafka.oauth.common.TokenProvider; import io.strimzi.kafka.oauth.jsonpath.JsonPathFilterQuery; import io.strimzi.kafka.oauth.jsonpath.JsonPathQuery; import io.strimzi.kafka.oauth.metrics.JwksHttpSensorKeyProducer; @@ -43,6 +44,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static io.strimzi.kafka.oauth.common.LogUtil.mask; +import static io.strimzi.kafka.oauth.common.OAuthAuthenticator.base64encode; import static io.strimzi.kafka.oauth.validator.TokenValidationException.Status; /** @@ -66,6 +69,10 @@ public class JWTSignatureValidator implements TokenValidator { private static final DefaultJWSVerifierFactory VERIFIER_FACTORY = new DefaultJWSVerifierFactory(); private final String validatorId; + private final String clientId; + private final String clientSecret; + private final TokenProvider bearerTokenProvider; + private final URI keysUri; private final String issuerUri; private final int maxStaleSeconds; @@ -89,6 +96,8 @@ public class JWTSignatureValidator implements TokenValidator { private BackOffTaskScheduler fastScheduler; + private final ScheduledExecutorService executor; + private final boolean enableMetrics; private final OAuthMetrics metrics; private final SensorKeyProducer jwksHttpSensorKeyProducer; @@ -98,6 +107,9 @@ public class JWTSignatureValidator implements TokenValidator { * Create a new instance. * * @param validatorId A unique id to associate with this validator for the purpose of validator lifecycle and metrics tracking + * @param clientId The clientId of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Basic authentication + * @param clientSecret The secret of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Basic authentication + * @param bearerTokenProvider The provider of the bearer token as an alternative to clientId and secret of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Bearer authentication * @param keysEndpointUri The JWKS endpoint url at the authorization server * @param socketFactory The optional SSL socket factory to use when establishing the connection to authorization server * @param verifier The optional hostname verifier used to validate the TLS certificate by the authorization server @@ -120,6 +132,9 @@ public class JWTSignatureValidator implements TokenValidator { */ @SuppressWarnings("checkstyle:ParameterNumber") public JWTSignatureValidator(String validatorId, + String clientId, + String clientSecret, + TokenProvider bearerTokenProvider, String keysEndpointUri, SSLSocketFactory socketFactory, HostnameVerifier verifier, @@ -145,36 +160,19 @@ public JWTSignatureValidator(String validatorId, } this.validatorId = validatorId; - if (keysEndpointUri == null) { - throw new IllegalArgumentException("keysEndpointUri == null"); - } - try { - this.keysUri = new URI(keysEndpointUri); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid keysEndpointUri: " + keysEndpointUri, e); - } + this.clientId = clientId; + this.clientSecret = clientSecret; + this.bearerTokenProvider = bearerTokenProvider; - if (socketFactory != null && !"https".equals(keysUri.getScheme())) { - throw new IllegalArgumentException("SSL socket factory set but keysEndpointUri not 'https'"); - } - this.socketFactory = socketFactory; + checkAuthorizationOptions(clientId, bearerTokenProvider); - if (verifier != null && !"https".equals(keysUri.getScheme())) { - throw new IllegalArgumentException("Certificate hostname verifier set but keysEndpointUri not 'https'"); - } - this.hostnameVerifier = verifier; + this.issuerUri = checkIssuerUri(validIssuerUri); + this.keysUri = checkKeysEndpointUri(keysEndpointUri); + this.socketFactory = checkSocketFactory(socketFactory); + this.hostnameVerifier = checkHostnameVerifier(verifier); this.principalExtractor = principalExtractor; - if (validIssuerUri != null) { - try { - new URI(validIssuerUri); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Value of validIssuerUri not a valid URI: " + validIssuerUri, e); - } - } - this.issuerUri = validIssuerUri; - validateRefreshConfig(refreshSeconds, expirySeconds); this.maxStaleSeconds = expirySeconds; @@ -197,35 +195,80 @@ public JWTSignatureValidator(String validatorId, metrics = enableMetrics ? Services.getInstance().getMetrics() : null; jwksHttpSensorKeyProducer = new JwksHttpSensorKeyProducer(validatorId, keysUri); - ScheduledExecutorService executor = setupExecutorAndFetchInitialKeys(refreshSeconds, refreshMinPauseSeconds, failFast); + executor = setupExecutorAndFetchInitialKeys(refreshSeconds, refreshMinPauseSeconds, failFast); // set up periodic timer to trigger fastScheduler job every refreshSeconds setupRefreshKeysJob(executor, refreshSeconds); } finally { if (log.isDebugEnabled()) { log.debug("Configured JWTSignatureValidator:" - + "\n validatorId: " + validatorId - + "\n keysEndpointUri: " + keysEndpointUri - + "\n sslSocketFactory: " + socketFactory - + "\n hostnameVerifier: " + hostnameVerifier - + "\n principalExtractor: " + principalExtractor - + "\n groupsClaimQuery: " + groupsClaimQuery - + "\n groupsClaimDelimiter: " + groupsClaimDelimiter - + "\n validIssuerUri: " + validIssuerUri - + "\n certsRefreshSeconds: " + refreshSeconds - + "\n certsRefreshMinPauseSeconds: " + refreshMinPauseSeconds - + "\n certsExpirySeconds: " + expirySeconds - + "\n certsIgnoreKeyUse: " + ignoreKeyUse - + "\n checkAccessTokenType: " + checkAccessTokenType - + "\n audience: " + audience - + "\n customClaimCheck: " + customClaimCheck - + "\n connectTimeoutSeconds: " + connectTimeoutSeconds - + "\n readTimeoutSeconds: " + readTimeoutSeconds - + "\n enableMetrics: " + enableMetrics - + "\n failFast: " + failFast - + "\n includeAcceptHeader: " + includeAcceptHeader); + + "\n\t validatorId: " + validatorId + + "\n\t clientId: " + clientId + + "\n\t clientSecret: " + mask(clientSecret) + + "\n\t bearerTokenProvider: " + bearerTokenProvider + + "\n\t keysEndpointUri: " + keysEndpointUri + + "\n\t sslSocketFactory: " + socketFactory + + "\n\t hostnameVerifier: " + hostnameVerifier + + "\n\t principalExtractor: " + principalExtractor + + "\n\t groupsClaimQuery: " + groupsClaimQuery + + "\n\t groupsClaimDelimiter: " + groupsClaimDelimiter + + "\n\t validIssuerUri: " + validIssuerUri + + "\n\t certsRefreshSeconds: " + refreshSeconds + + "\n\t certsRefreshMinPauseSeconds: " + refreshMinPauseSeconds + + "\n\t certsExpirySeconds: " + expirySeconds + + "\n\t certsIgnoreKeyUse: " + ignoreKeyUse + + "\n\t checkAccessTokenType: " + checkAccessTokenType + + "\n\t audience: " + audience + + "\n\t customClaimCheck: " + customClaimCheck + + "\n\t connectTimeoutSeconds: " + connectTimeoutSeconds + + "\n\t readTimeoutSeconds: " + readTimeoutSeconds + + "\n\t enableMetrics: " + enableMetrics + + "\n\t failFast: " + failFast + + "\n\t includeAcceptHeader: " + includeAcceptHeader); + } + } + } + + private static void checkAuthorizationOptions(String clientId, TokenProvider bearerTokenProvider) { + if (clientId != null && bearerTokenProvider != null) { + throw new IllegalArgumentException("Can't use both clientId and bearerToken"); + } + } + + private URI checkKeysEndpointUri(String keysEndpointUri) { + if (keysEndpointUri == null) { + throw new IllegalArgumentException("keysEndpointUri == null"); + } + try { + return new URI(keysEndpointUri); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid keys endpoint uri: " + keysEndpointUri, e); + } + } + + private HostnameVerifier checkHostnameVerifier(HostnameVerifier verifier) { + if (verifier != null && !"https".equals(keysUri.getScheme())) { + throw new IllegalArgumentException("Certificate hostname verifier set but keysEndpointUri not 'https'"); + } + return verifier; + } + + private static String checkIssuerUri(String validIssuerUri) { + if (validIssuerUri != null) { + try { + new URI(validIssuerUri); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Value of validIssuerUri not a valid URI: " + validIssuerUri, e); } } + return validIssuerUri; + } + + private SSLSocketFactory checkSocketFactory(SSLSocketFactory socketFactory) { + if (socketFactory != null && !"https".equals(keysUri.getScheme())) { + throw new IllegalArgumentException("SSL socket factory set but keysEndpointUri not 'https'"); + } + return socketFactory; } private ScheduledExecutorService setupExecutorAndFetchInitialKeys(int refreshSeconds, int refreshMinPauseSeconds, boolean failFast) { @@ -260,7 +303,7 @@ private ScheduledExecutorService setupExecutorAndFetchInitialKeys(int refreshSec private JsonPathFilterQuery parseCustomClaimCheck(String customClaimCheck) { if (customClaimCheck != null) { String query = customClaimCheck.trim(); - if (query.length() == 0) { + if (query.isEmpty()) { throw new IllegalArgumentException("Value of customClaimCheck is empty"); } return JsonPathFilterQuery.parse(query); @@ -271,7 +314,7 @@ private JsonPathFilterQuery parseCustomClaimCheck(String customClaimCheck) { private JsonPathQuery parseGroupsQuery(String groupsQuery) { if (groupsQuery != null) { String query = groupsQuery.trim(); - if (query.length() == 0) { + if (query.isEmpty()) { throw new IllegalArgumentException("Value of groupsClaimQuery is empty"); } return JsonPathQuery.parse(query); @@ -281,7 +324,7 @@ private JsonPathQuery parseGroupsQuery(String groupsQuery) { private String parseGroupsDelimiter(String groupsDelimiter) { if (groupsDelimiter != null) { - if (groupsDelimiter.length() == 0) { + if (groupsDelimiter.isEmpty()) { throw new IllegalArgumentException("Value of groupsClaimDelimiter is empty"); } } @@ -338,7 +381,8 @@ private PublicKey getKeyUnlessStale(String id) { private void fetchKeys() { long requestStartTime = System.currentTimeMillis(); try { - String response = HttpUtil.get(keysUri, socketFactory, hostnameVerifier, null, String.class, connectTimeout, readTimeout, includeAcceptHeader); + String authorization = generateAuthorizationHeader(); + String response = HttpUtil.get(keysUri, socketFactory, hostnameVerifier, authorization, String.class, connectTimeout, readTimeout, includeAcceptHeader); addJwksHttpMetricSuccessTime(requestStartTime); Map newCache = new HashMap<>(); @@ -375,6 +419,16 @@ private void fetchKeys() { } } + private String generateAuthorizationHeader() { + String authorization = null; + if (bearerTokenProvider != null) { + authorization = "Bearer " + bearerTokenProvider.token(); + } else if (clientId != null) { + authorization = "Basic " + base64encode(clientId + ':' + clientSecret); + } + return authorization; + } + @SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE", justification = "We tell TokenVerifier to parse AccessToken. It will return AccessToken or fail.") public TokenInfo validate(String token) { @@ -510,6 +564,11 @@ public String getValidatorId() { return validatorId; } + @Override + public void close() { + executor.shutdownNow(); + } + private void addJwksHttpMetricSuccessTime(long startTimeMs) { if (enableMetrics) { diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java index f09770bc..491d6c5f 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/OAuthIntrospectionValidator.java @@ -12,6 +12,7 @@ import io.strimzi.kafka.oauth.common.PrincipalExtractor; import io.strimzi.kafka.oauth.common.TimeUtil; import io.strimzi.kafka.oauth.common.TokenInfo; +import io.strimzi.kafka.oauth.common.TokenProvider; import io.strimzi.kafka.oauth.jsonpath.JsonPathFilterQuery; import io.strimzi.kafka.oauth.jsonpath.JsonPathQuery; import io.strimzi.kafka.oauth.metrics.IntrospectHttpSensorKeyProducer; @@ -59,6 +60,7 @@ public class OAuthIntrospectionValidator implements TokenValidator { private final String validTokenType; private final String clientId; private final String clientSecret; + private final TokenProvider bearerTokenProvider; private final String audience; private final JsonPathFilterQuery customClaimMatcher; private final SSLSocketFactory socketFactory; @@ -84,6 +86,9 @@ public class OAuthIntrospectionValidator implements TokenValidator { * Create a new instance. * * @param id A unique id to associate with this validator for the purpose of validator lifecycle and metrics tracking + * @param clientId The clientId of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Basic authentication + * @param clientSecret The secret of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Basic authentication + * @param bearerTokenProvider The provider of the bearer token as an alternative to clientId and secret of the OAuth2 client representing this Kafka broker - used to authenticate to the introspection endpoint using Bearer authentication * @param introspectionEndpointUri The introspection endpoint url at the authorization server * @param socketFactory The optional SSL socket factory to use when establishing the connection to authorization server * @param verifier The optional hostname verifier used to validate the TLS certificate by the authorization server @@ -93,8 +98,6 @@ public class OAuthIntrospectionValidator implements TokenValidator { * @param issuerUri The required value of the 'iss' claim in the introspection endpoint response * @param userInfoUri The optional user info endpoint url at the authorization server, used as a failover when user id can't be extracted from the introspection endpoint response * @param validTokenType The optional token type enforcement - only the specified token type is accepted as valid - * @param clientId The clientId of the OAuth2 client representing this Kafka broker - needed to authenticate to the introspection endpoint - * @param clientSecret The secret of the OAuth2 client representing this Kafka broker - needed to authenticate to the introspection endpoint * @param audience The optional audience check. If specified, the 'aud' attribute of the introspection endpoint response needs to contain the configured clientId * @param customClaimCheck The optional JSONPath filter query for additional custom attribute checking * @param connectTimeoutSeconds The maximum time to wait for connection to authorization server to be established (in seconds) @@ -106,6 +109,9 @@ public class OAuthIntrospectionValidator implements TokenValidator { */ @SuppressWarnings("checkstyle:ParameterNumber") public OAuthIntrospectionValidator(String id, + String clientId, + String clientSecret, + TokenProvider bearerTokenProvider, String introspectionEndpointUri, SSLSocketFactory socketFactory, HostnameVerifier verifier, @@ -115,8 +121,6 @@ public OAuthIntrospectionValidator(String id, String issuerUri, String userInfoUri, String validTokenType, - String clientId, - String clientSecret, String audience, String customClaimCheck, int connectTimeoutSeconds, @@ -146,6 +150,10 @@ public OAuthIntrospectionValidator(String id, this.validTokenType = validTokenType; this.clientId = clientId; this.clientSecret = clientSecret; + this.bearerTokenProvider = bearerTokenProvider; + + checkAuthorizationOptions(clientId, bearerTokenProvider); + this.audience = audience; this.customClaimMatcher = parseCustomClaimCheck(customClaimCheck); @@ -166,33 +174,40 @@ public OAuthIntrospectionValidator(String id, if (log.isDebugEnabled()) { log.debug("Configured OAuthIntrospectionValidator:" - + "\n id: " + id - + "\n introspectionEndpointUri: " + introspectionURI - + "\n sslSocketFactory: " + socketFactory - + "\n hostnameVerifier: " + hostnameVerifier - + "\n principalExtractor: " + principalExtractor - + "\n groupsClaimQuery: " + groupsClaimQuery - + "\n groupsClaimDelimiter: " + groupsClaimDelimiter - + "\n validIssuerUri: " + validIssuerURI - + "\n userInfoUri: " + userInfoURI - + "\n validTokenType: " + validTokenType - + "\n clientId: " + clientId - + "\n clientSecret: " + mask(clientSecret) - + "\n audience: " + audience - + "\n customClaimCheck: " + customClaimCheck - + "\n connectTimeoutSeconds: " + connectTimeoutSeconds - + "\n readTimeoutSeconds: " + readTimeoutSeconds - + "\n enableMetrics: " + enableMetrics - + "\n retries: " + retries - + "\n retryPauseMillis: " + retryPauseMillis - + "\n includeAcceptHeader: " + includeAcceptHeader + + "\n\t id: " + id + + "\n\t introspectionEndpointUri: " + introspectionURI + + "\n\t sslSocketFactory: " + socketFactory + + "\n\t hostnameVerifier: " + hostnameVerifier + + "\n\t principalExtractor: " + principalExtractor + + "\n\t groupsClaimQuery: " + groupsClaimQuery + + "\n\t groupsClaimDelimiter: " + groupsClaimDelimiter + + "\n\t validIssuerUri: " + validIssuerURI + + "\n\t userInfoUri: " + userInfoURI + + "\n\t validTokenType: " + validTokenType + + "\n\t clientId: " + clientId + + "\n\t clientSecret: " + mask(clientSecret) + + "\n\t bearerTokenProvider: " + bearerTokenProvider + + "\n\t audience: " + audience + + "\n\t customClaimCheck: " + customClaimCheck + + "\n\t connectTimeoutSeconds: " + connectTimeoutSeconds + + "\n\t readTimeoutSeconds: " + readTimeoutSeconds + + "\n\t enableMetrics: " + enableMetrics + + "\n\t retries: " + retries + + "\n\t retryPauseMillis: " + retryPauseMillis + + "\n\t includeAcceptHeader: " + includeAcceptHeader ); } } + private static void checkAuthorizationOptions(String clientId, TokenProvider bearerTokenProvider) { + if (clientId != null && bearerTokenProvider != null) { + throw new IllegalArgumentException("Can't use both clientId and bearerToken"); + } + } + private HostnameVerifier checkHostnameVerifier(HostnameVerifier verifier) { if (verifier != null && !"https".equals(introspectionURI.getScheme())) { - throw new IllegalArgumentException("Certificate hostname verifier set but keysEndpointUri not 'https'"); + throw new IllegalArgumentException("Certificate hostname verifier set but introspectionEndpointUri not 'https'"); } return verifier; } @@ -208,7 +223,7 @@ private URI checkUserInfoUri(String userInfoUri) { return null; } - private String checkIssuerUri(String issuerUri) { + private static String checkIssuerUri(String issuerUri) { if (issuerUri != null) { try { new URI(issuerUri); @@ -248,7 +263,7 @@ private String checkValidatorId(String validatorId) { private JsonPathFilterQuery parseCustomClaimCheck(String customClaimCheck) { if (customClaimCheck != null) { String query = customClaimCheck.trim(); - if (query.length() == 0) { + if (query.isEmpty()) { throw new IllegalArgumentException("Value of customClaimCheck is empty"); } return JsonPathFilterQuery.parse(query); @@ -259,7 +274,7 @@ private JsonPathFilterQuery parseCustomClaimCheck(String customClaimCheck) { private JsonPathQuery parseGroupsQuery(String groupsQuery) { if (groupsQuery != null) { String query = groupsQuery.trim(); - if (query.length() == 0) { + if (query.isEmpty()) { throw new IllegalArgumentException("Value of groupsClaimQuery is empty"); } return JsonPathQuery.parse(query); @@ -269,7 +284,7 @@ private JsonPathQuery parseGroupsQuery(String groupsQuery) { private String parseGroupsDelimiter(String groupsDelimiter) { if (groupsDelimiter != null) { - if (groupsDelimiter.length() == 0) { + if (groupsDelimiter.isEmpty()) { throw new IllegalArgumentException("Value of groupsClaimDelimiter is empty"); } } @@ -279,9 +294,7 @@ private String parseGroupsDelimiter(String groupsDelimiter) { @SuppressWarnings({"checkstyle:NPathComplexity", "checkstyle:CyclomaticComplexity"}) public TokenInfo validate(String token) { - String authorization = clientSecret != null ? - "Basic " + base64encode(clientId + ':' + clientSecret) : - null; + String authorization = generateAuthorizationHeader(); StringBuilder body = new StringBuilder("token=").append(token); @@ -364,6 +377,16 @@ public TokenInfo validate(String token) { return new TokenInfo(token, scopes, principal, groups, iat, expiresMillis); } + private String generateAuthorizationHeader() { + String authorization = null; + if (bearerTokenProvider != null) { + authorization = "Bearer " + bearerTokenProvider.token(); + } else if (clientId != null) { + authorization = "Basic " + base64encode(clientId + ':' + clientSecret); + } + return authorization; + } + private Set extractGroupsFromResponse(JsonNode userInfoJson) { JsonNode result = groupsMatcher.apply(userInfoJson); if (result == null) { @@ -456,6 +479,11 @@ public String getValidatorId() { return validatorId; } + @Override + public void close() { + // nothing to do + } + class IntrospectMetricsHandler implements MetricsHandler { @Override diff --git a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/TokenValidator.java b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/TokenValidator.java index 5b6d6290..e5db0bd5 100644 --- a/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/TokenValidator.java +++ b/oauth-common/src/main/java/io/strimzi/kafka/oauth/validator/TokenValidator.java @@ -25,4 +25,9 @@ public interface TokenValidator { * @return A validator id */ String getValidatorId(); + + /** + * Close any allocated resources like background threads + */ + void close(); } diff --git a/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java b/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java index 316c8ef0..c853466e 100644 --- a/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java +++ b/oauth-common/src/test/java/io/strimzi/kafka/oauth/common/HttpUtilTimeoutTest.java @@ -115,9 +115,9 @@ public void doTest() throws Exception { // Test validator try { - OAuthIntrospectionValidator validator = new OAuthIntrospectionValidator("test", "http://192.168.255.255:26309", + OAuthIntrospectionValidator validator = new OAuthIntrospectionValidator("test", "kafka", "kafka-secret", null, "http://192.168.255.255:26309", null, null, new PrincipalExtractor(), null, null, "http://172.0.0.13/", null, "Bearer", - "kafka", "kafka-secret", null, null, timeout, timeout, false, 0, 0, true); + null, null, timeout, timeout, false, 0, 0, true); start = System.currentTimeMillis(); validator.validate("token"); diff --git a/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java b/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java index 5de0880a..e7c5212a 100644 --- a/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java +++ b/oauth-common/src/test/java/io/strimzi/kafka/oauth/validator/ConfigIdHashTest.java @@ -16,18 +16,21 @@ public void testValidatorKey() { ValidatorKey vkey = getKey(null, null); ValidatorKey vkey2 = getKey(null, null); - Assert.assertEquals("Config id hash mismatch", "1ed03b31", vkey.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "f433dbf8", vkey.getConfigIdHash()); Assert.assertEquals("Config id hash should be the same", vkey.getConfigIdHash(), vkey2.getConfigIdHash()); ValidatorKey key3 = getKey("group", null); ValidatorKey key4 = getKey(null, "group"); - Assert.assertEquals("Config id hash mismatch", "1afa0b66", key3.getConfigIdHash()); - Assert.assertEquals("Config id hash mismatch", "0d8122fb", key4.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "024dcc79", key3.getConfigIdHash()); + Assert.assertEquals("Config id hash mismatch", "dfec1585", key4.getConfigIdHash()); } ValidatorKey getKey(String groupQuery, String groupDelimiter) { return new ValidatorKey.IntrospectionValidatorKey( + "example-client", + "example-client-secret", + null, "http://mockoauth:8080", null, "@.aud='http://example.com/'", @@ -44,8 +47,6 @@ ValidatorKey getKey(String groupQuery, String groupDelimiter) { "http://mockoauth:8080/introspect", null, null, - "example-client", - "example-client-secret", 60, 60, true, diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java index 6aabf63d..c7628800 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java @@ -7,8 +7,11 @@ import io.strimzi.kafka.oauth.common.Config; import io.strimzi.kafka.oauth.common.ConfigException; import io.strimzi.kafka.oauth.common.ConfigUtil; +import io.strimzi.kafka.oauth.common.FileBasedTokenProvider; import io.strimzi.kafka.oauth.common.IOUtil; import io.strimzi.kafka.oauth.common.PrincipalExtractor; +import io.strimzi.kafka.oauth.common.StaticTokenProvider; +import io.strimzi.kafka.oauth.common.TokenProvider; import io.strimzi.kafka.oauth.jsonpath.JsonPathFilterQuery; import io.strimzi.kafka.oauth.metrics.IntrospectValidationSensorKeyProducer; import io.strimzi.kafka.oauth.metrics.JwksValidationSensorKeyProducer; @@ -123,13 +126,15 @@ *

*

* Optional sasl.jaas.config configuration: *

* @@ -270,6 +275,9 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis String clientId = config.getValue(Config.OAUTH_CLIENT_ID); String clientSecret = config.getValue(Config.OAUTH_CLIENT_SECRET); + String bearerToken = config.getValue(ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + String bearerTokenLocation = config.getValue(ServerConfig.OAUTH_SERVER_BEARER_TOKEN_LOCATION); + TokenProvider bearerTokenProvider = configureTokenProvider(bearerToken, bearerTokenLocation); if (checkAudience && clientId == null) { throw new ConfigException("OAuth validator configuration error: '" + Config.OAUTH_CLIENT_ID + "' must be set when '" @@ -295,7 +303,7 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis configureMetrics(configs); if (jwksUri != null) { - String effectiveConfigId = setupJWKSValidator(configId, jwksUri, validIssuerUri, checkTokenType, + String effectiveConfigId = setupJWKSValidator(configId, clientId, clientSecret, bearerTokenProvider, jwksUri, validIssuerUri, checkTokenType, usernameClaim, fallbackUsernameClaim, fallbackUsernamePrefix, groupQuery, groupDelimiter, audience, customClaimCheck, sslTruststore, sslPassword, sslType, sslRnd, includeAcceptHeader); @@ -303,8 +311,8 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis URI jwksEndpointUri = config.getValueAsURI(ServerConfig.OAUTH_JWKS_ENDPOINT_URI); validationSensorKeyProducer = new JwksValidationSensorKeyProducer(effectiveConfigId, saslMechanism, jwksEndpointUri); } else { - String effectiveConfigId = setupIntrospectionValidator(configId, validIssuerUri, usernameClaim, fallbackUsernameClaim, fallbackUsernamePrefix, - groupQuery, groupDelimiter, clientId, clientSecret, audience, customClaimCheck, + String effectiveConfigId = setupIntrospectionValidator(configId, clientId, clientSecret, bearerTokenProvider, validIssuerUri, usernameClaim, fallbackUsernameClaim, fallbackUsernamePrefix, + groupQuery, groupDelimiter, audience, customClaimCheck, sslTruststore, sslPassword, sslType, sslRnd, includeAcceptHeader); URI introspectionUri = config.getValueAsURI(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI); @@ -312,6 +320,24 @@ public void delegatedConfigure(Map configs, String saslMechanism, Lis } } + private TokenProvider configureTokenProvider(String token, String tokenLocation) { + if (tokenLocation != null) { + try { + if (token != null) { + log.warn("Server bearer token location is configured ('{}'), server bearer token will be ignored ('{}').", ServerConfig.OAUTH_SERVER_BEARER_TOKEN_LOCATION, ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + } + return new FileBasedTokenProvider(tokenLocation); + + } catch (IllegalArgumentException e) { + throw new ConfigException("Specified server bearer token location is invalid ('" + ServerConfig.OAUTH_SERVER_BEARER_TOKEN_LOCATION + "'): " + e.getMessage()); + } + } + if (token != null) { + return new StaticTokenProvider(token); + } + return null; + } + private void configureMetrics(Map configs) { if (!Services.isAvailable()) { Services.configure(configs); @@ -324,15 +350,33 @@ private void configureMetrics(Map configs) { } @SuppressWarnings("checkstyle:ParameterNumber") - private String setupIntrospectionValidator(String configId, String validIssuerUri, String usernameClaim, String fallbackUsernameClaim, String fallbackUsernamePrefix, - String groupQuery, String groupDelimiter, String clientId, String clientSecret, String audience, String customClaimCheck, - String sslTruststore, String sslPassword, String sslType, String sslRnd, boolean includeAcceptHeader) { + private String setupIntrospectionValidator( + String configId, + String clientId, + String clientSecret, + TokenProvider bearerTokenProvider, + String validIssuerUri, + String usernameClaim, + String fallbackUsernameClaim, + String fallbackUsernamePrefix, + String groupQuery, + String groupDelimiter, + String audience, + String customClaimCheck, + String sslTruststore, + String sslPassword, + String sslType, + String sslRnd, + boolean includeAcceptHeader) { String introspectionEndpoint = config.getValue(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI); String userInfoEndpoint = config.getValue(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI); String validTokenType = config.getValue(ServerConfig.OAUTH_VALID_TOKEN_TYPE); ValidatorKey vkey = new ValidatorKey.IntrospectionValidatorKey( + clientId, + clientSecret, + bearerTokenProvider != null ? bearerTokenProvider.toString() : null, validIssuerUri, audience, customClaimCheck, @@ -349,8 +393,6 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr introspectionEndpoint, userInfoEndpoint, validTokenType, - clientId, - clientSecret, connectTimeout, readTimeout, enableMetrics, @@ -362,6 +404,9 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr Supplier factory = () -> new OAuthIntrospectionValidator( effectiveConfigId, + clientId, + clientSecret, + bearerTokenProvider, introspectionEndpoint, socketFactory, verifier, @@ -371,8 +416,6 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr validIssuerUri, userInfoEndpoint, validTokenType, - clientId, - clientSecret, audience, customClaimCheck, connectTimeout, @@ -389,10 +432,26 @@ private String setupIntrospectionValidator(String configId, String validIssuerUr } @SuppressWarnings("checkstyle:ParameterNumber") - private String setupJWKSValidator(String configId, String jwksUri, String validIssuerUri, boolean checkTokenType, - String usernameClaim, String fallbackUsernameClaim, String fallbackUsernamePrefix, - String groupQuery, String groupDelimiter, String audience, String customClaimCheck, - String sslTruststore, String sslPassword, String sslType, String sslRnd, boolean includeAcceptHeader) { + private String setupJWKSValidator( + String configId, + String clientId, + String clientSecret, + TokenProvider bearerTokenProvider, + String jwksUri, + String validIssuerUri, + boolean checkTokenType, + String usernameClaim, + String fallbackUsernameClaim, + String fallbackUsernamePrefix, + String groupQuery, + String groupDelimiter, + String audience, + String customClaimCheck, + String sslTruststore, + String sslPassword, + String sslType, + String sslRnd, + boolean includeAcceptHeader) { int jwksRefreshSeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_REFRESH_SECONDS, 300); int jwksExpirySeconds = config.getValueAsInt(ServerConfig.OAUTH_JWKS_EXPIRY_SECONDS, 360); @@ -401,6 +460,9 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI boolean jwksIgnoreKeyUse = config.getValueAsBoolean(ServerConfig.OAUTH_JWKS_IGNORE_KEY_USE, false); ValidatorKey vkey = new ValidatorKey.JwtValidatorKey( + clientId, + clientSecret, + bearerTokenProvider != null ? bearerTokenProvider.toString() : null, validIssuerUri, audience, customClaimCheck, @@ -431,6 +493,9 @@ private String setupJWKSValidator(String configId, String jwksUri, String validI Supplier factory = () -> new JWTSignatureValidator( effectiveConfigId, + clientId, + clientSecret, + bearerTokenProvider, jwksUri, socketFactory, verifier, @@ -561,7 +626,7 @@ private void configureHttpRetries(ServerConfig config) { @Override public void close() { - + validator.close(); } @Override diff --git a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java index 7dc9ba1a..8e685126 100644 --- a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java +++ b/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/ServerConfig.java @@ -109,6 +109,16 @@ public class ServerConfig extends Config { @Deprecated public static final String OAUTH_VALIDATION_SKIP_TYPE_CHECK = "oauth.validation.skip.type.check"; + /** + * "oauth.server.bearer.token" + */ + public static final String OAUTH_SERVER_BEARER_TOKEN = "oauth.server.bearer.token"; + + /** + * "oauth.server.bearer.token.location" + */ + public static final String OAUTH_SERVER_BEARER_TOKEN_LOCATION = "oauth.server.bearer.token.location"; + /** * Create a new instance */ diff --git a/testsuite/keycloak-auth-tests/docker-compose.yml b/testsuite/keycloak-auth-tests/docker-compose.yml index 6694e43e..ee6c1687 100644 --- a/testsuite/keycloak-auth-tests/docker-compose.yml +++ b/testsuite/keycloak-auth-tests/docker-compose.yml @@ -89,7 +89,7 @@ services: - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/kafka-authz\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler - - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/kafka-authz\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/kafka-authz\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler - KAFKA_LISTENER_NAME_INTROSPECTPLAIN_SASL_ENABLED_MECHANISMS=OAUTHBEARER,PLAIN @@ -115,7 +115,7 @@ services: - KAFKA_LISTENER_NAME_FLOOD_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"FLOOD\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/flood/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/flood\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_FLOOD_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler - - KAFKA_LISTENER_NAME_FLOOD_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FLOOD\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/flood/protocol/openid-connect/token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/flood/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/flood\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FLOOD_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FLOOD\" oauth.token.endpoint.uri=\"http://keycloak:8080/auth/realms/flood/protocol/openid-connect/token\" oauth.jwks.endpoint.uri=\"http://keycloak:8080/auth/realms/flood/protocol/openid-connect/certs\" oauth.valid.issuer.uri=\"http://keycloak:8080/auth/realms/flood\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_FLOOD_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler - KAFKA_LISTENER_NAME_JWTPLAINWITHOUTCC_SASL_ENABLED_MECHANISMS=PLAIN diff --git a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AdminServerRequestHandler.java b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AdminServerRequestHandler.java index 59549e4c..c5b58ef2 100644 --- a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AdminServerRequestHandler.java +++ b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AdminServerRequestHandler.java @@ -269,16 +269,22 @@ private void processClientsRequest(HttpServerRequest req, String[] path) { String secret = json.getString("secret"); String clientAssertion = json.getString("clientAssertion"); - if (secret == null) { - if (clientAssertion == null) { - sendResponse(req, BAD_REQUEST, "Required attribute 'secret' is null or missing."); - return; - } else { - secret = clientAssertion; + String bearerToken = json.getString("bearerToken"); + + if (bearerToken != null) { + verticle.createOrUpdateClient(clientId, bearerToken); + } else { + if (secret == null) { + if (clientAssertion == null) { + sendResponse(req, BAD_REQUEST, "One of ['bearerToken', 'secret'] attributes has to be set."); + return; + } else { + secret = clientAssertion; + } } + verticle.createOrUpdateClient(clientId, secret); } - verticle.createOrUpdateClient(clientId, secret); sendResponse(req, OK); } catch (Exception e) { diff --git a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AuthServerRequestHandler.java b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AuthServerRequestHandler.java index ff4c9398..abd56fe8 100644 --- a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AuthServerRequestHandler.java +++ b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/AuthServerRequestHandler.java @@ -28,7 +28,10 @@ import java.security.NoSuchAlgorithmException; import java.util.Date; -import java.util.function.Consumer; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; @@ -56,6 +59,8 @@ import static io.strimzi.testsuite.oauth.server.Endpoint.USERINFO; import static io.strimzi.testsuite.oauth.server.Mode.MODE_200; import static io.strimzi.testsuite.oauth.server.Mode.MODE_200_DELAYED; +import static io.strimzi.testsuite.oauth.server.Mode.MODE_200_PROTECTED; +import static io.strimzi.testsuite.oauth.server.Mode.MODE_200_UNPROTECTED; import static io.strimzi.testsuite.oauth.server.Mode.MODE_FAILING_500; import static io.strimzi.testsuite.oauth.server.Mode.MODE_JWKS_RSA_WITHOUT_SIG_USE; import static io.strimzi.testsuite.oauth.server.Mode.MODE_JWKS_RSA_WITH_SIG_USE; @@ -109,18 +114,18 @@ public void handle(HttpServerRequest req) { private boolean processRequest(Endpoint endpoint, Mode mode, HttpServerRequest req) throws NoSuchAlgorithmException, JOSEException, InterruptedException { if (endpoint == Endpoint.JWKS && - isOneOf(mode, MODE_200, MODE_JWKS_RSA_WITH_SIG_USE, MODE_JWKS_RSA_WITHOUT_SIG_USE)) { + isOneOf(mode, MODE_200, MODE_200_PROTECTED, MODE_JWKS_RSA_WITH_SIG_USE, MODE_JWKS_RSA_WITHOUT_SIG_USE)) { processJwksRequest(req, mode); } else if (endpoint == TOKEN && mode == MODE_200) { - processTokenRequest(req); + processTokenRequest(req, mode); } else if (endpoint == FAILING_TOKEN) { processFailingRequest(req, endpoint, mode, this::processTokenRequest); - } else if (endpoint == INTROSPECT && mode == MODE_200) { - processIntrospectRequest(req); + } else if (endpoint == INTROSPECT && (mode == MODE_200 || mode == MODE_200_UNPROTECTED)) { + processIntrospectRequest(req, mode); } else if (endpoint == FAILING_INTROSPECT) { processFailingRequest(req, endpoint, mode, this::processIntrospectRequest); } else if (endpoint == USERINFO && mode == MODE_200) { - processUserInfoRequest(req); + processUserInfoRequest(req, mode); } else if (endpoint == FAILING_USERINFO) { processFailingRequest(req, endpoint, mode, this::processUserInfoRequest); } else if (endpoint == GRANTS && (mode == MODE_200 || mode == MODE_200_DELAYED)) { @@ -128,9 +133,9 @@ private boolean processRequest(Endpoint endpoint, Mode mode, HttpServerRequest r //verticle.getVertx().setTimer(1000, v -> processGrantsRequest(req)); Thread.sleep(2000); - processGrantsRequest(req); + processGrantsRequest(req, mode); } else { - processGrantsRequest(req); + processGrantsRequest(req, mode); } } else if (endpoint == FAILING_GRANTS) { processFailingRequest(req, endpoint, mode, this::processGrantsRequest); @@ -176,7 +181,7 @@ private static boolean generateResponse(HttpServerRequest req, Mode mode) { return result; } - private void processGrantsRequest(HttpServerRequest req) { + private void processGrantsRequest(HttpServerRequest req, Mode mode) { if (req.method() != POST) { sendResponse(req, METHOD_NOT_ALLOWED); return; @@ -243,7 +248,7 @@ private void processGrantsRequest(HttpServerRequest req) { }); } - private void processTokenRequest(HttpServerRequest req) { + private void processTokenRequest(HttpServerRequest req, Mode mode) { if (req.method() != POST) { sendResponse(req, METHOD_NOT_ALLOWED); return; @@ -356,7 +361,7 @@ private boolean supportedGrantType(String grantType, String clientId) { ("client_credentials".equals(grantType) || "password".equals(grantType) || "refresh_token".equals(grantType)); } - private void processIntrospectRequest(HttpServerRequest req) { + private void processIntrospectRequest(HttpServerRequest req, Mode mode) { if (req.method() != POST) { sendResponse(req, METHOD_NOT_ALLOWED); return; @@ -375,12 +380,14 @@ private void processIntrospectRequest(HttpServerRequest req) { return; } - String authorization = req.headers().get("Authorization"); - String clientId = authorizeClient(authorization); + if (mode != MODE_200_UNPROTECTED) { + String authorization = req.headers().get("Authorization"); + String clientId = authorizeClient(authorization); - if (clientId == null) { - sendResponse(req, UNAUTHORIZED); - return; + if (clientId == null) { + sendResponse(req, UNAUTHORIZED); + return; + } } // Let's take the info from the token itself @@ -432,7 +439,7 @@ private void processIntrospectRequest(HttpServerRequest req) { }); } - private void processFailingRequest(HttpServerRequest req, Endpoint endpoint, Mode mode, Consumer requestCompletingFunction) { + private void processFailingRequest(HttpServerRequest req, Endpoint endpoint, Mode mode, BiConsumer requestCompletingFunction) { // Always fail without a coinflip if this mode is used if (mode == MODE_FAILING_500) { @@ -445,10 +452,10 @@ private void processFailingRequest(HttpServerRequest req, Endpoint endpoint, Mod // fail if (generateResponse(req, mode)) return; } - requestCompletingFunction.accept(req); + requestCompletingFunction.accept(req, mode); } - private void processUserInfoRequest(HttpServerRequest req) { + private void processUserInfoRequest(HttpServerRequest req, Mode mode) { if (req.method() != GET) { sendResponse(req, METHOD_NOT_ALLOWED); return; @@ -461,7 +468,7 @@ private void processUserInfoRequest(HttpServerRequest req) { } String token = authorization.substring(7); - if (token.length() == 0) { + if (token.isEmpty()) { sendResponse(req, BAD_REQUEST); return; } @@ -557,26 +564,46 @@ private String createRefreshToken(String clientId, String username) { } private String authorizeClient(String authorization) { - if (authorization == null || !authorization.startsWith("Basic ")) { - return null; - } - String decoded = base64decode(authorization.substring(6)); - int pos = decoded.indexOf(":"); - if (pos == -1) { + if (authorization == null) { return null; } - String clientId = decoded.substring(0, pos); - String secret = decoded.substring(pos + 1); + if (authorization.startsWith("Basic ")) { + String decoded = base64decode(authorization.substring(6)); + int pos = decoded.indexOf(":"); + if (pos == -1) { + return null; + } - String existingClientSecret = verticle.getClients().get(clientId); - if (existingClientSecret == null) { - log.info("Unknown clientId: " + clientId); + String clientId = decoded.substring(0, pos); + String secret = decoded.substring(pos + 1); + + String existingClientSecret = verticle.getClients().get(clientId); + if (existingClientSecret == null) { + log.info("Unknown clientId: " + clientId); + } + if (!secret.equals(existingClientSecret)) { + return null; + } + return clientId; } - if (!secret.equals(existingClientSecret)) { - return null; + + if (authorization.startsWith("Bearer ")) { + String token = authorization.substring(7); + + List matchingClients = verticle.getClients().entrySet().stream() + .filter(e -> token.equals(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + if (matchingClients.isEmpty()) { + log.info("Unknown service account token: " + token); + } else { + return matchingClients.get(0); + } } - return clientId; + + return null; } private String authorizeClientUsingAssertion(final String clientId, final String clientAssertion, final String clientAssertionType) { @@ -606,6 +633,16 @@ private void processJwksRequest(HttpServerRequest req, Mode mode) throws NoSuchA return; } switch (mode) { + case MODE_200_PROTECTED: + String authorization = req.headers().get("Authorization"); + String clientId = authorizeClient(authorization); + + if (clientId == null) { + sendResponse(req, UNAUTHORIZED); + return; + } + // no break; on purpose + // pass through to next case block case MODE_200: case MODE_JWKS_RSA_WITH_SIG_USE: sendResponse(req, OK, jwksWithSig()); diff --git a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/Mode.java b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/Mode.java index 0fcedc1f..539d69f4 100644 --- a/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/Mode.java +++ b/testsuite/mock-oauth-server/src/main/java/io/strimzi/testsuite/oauth/server/Mode.java @@ -8,6 +8,10 @@ enum Mode { MODE_200, + /** Used by jwks endpoint, which by default does not require 'Authorization' header, but requires it if this mode is enabled*/ + MODE_200_PROTECTED, + /** Used by introspection endpoint, which by default requires 'Authorization' header, but not if this mode is enabled */ + MODE_200_UNPROTECTED, MODE_400, MODE_401, MODE_403, diff --git a/testsuite/mockoauth-tests/docker-compose.yml b/testsuite/mockoauth-tests/docker-compose.yml index 1d01b785..9af6e973 100644 --- a/testsuite/mockoauth-tests/docker-compose.yml +++ b/testsuite/mockoauth-tests/docker-compose.yml @@ -75,7 +75,7 @@ services: - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_JWTPLAIN_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler - - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.token.endpoint.uri=\"https://mockoauth:8090/token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"JWTPLAIN\" oauth.token.endpoint.uri=\"https://mockoauth:8090/token\" oauth.fail.fast=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler - KAFKA_LISTENER_NAME_PLAIN_SASL_ENABLED_MECHANISMS=PLAIN @@ -96,7 +96,7 @@ services: - KAFKA_LISTENER_NAME_FAILINGJWT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.config.id=\"FAILINGJWT\" oauth.fail.fast=\"false\" oauth.check.access.token.type=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_FAILINGJWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler - - KAFKA_LISTENER_NAME_FAILINGJWT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FAILINGJWT\" oauth.fail.fast=\"false\" oauth.check.access.token.type=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.token.endpoint.uri=\"https://mockoauth:8090/failing_token\" oauth.client.id=\"kafka\" oauth.client.secret=\"kafka-secret\" oauth.http.retries=\"1\" oauth.http.retry.pause.millis=\"3000\" unsecuredLoginStringClaim_sub=\"admin\" ; + - KAFKA_LISTENER_NAME_FAILINGJWT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.config.id=\"FAILINGJWT\" oauth.fail.fast=\"false\" oauth.check.access.token.type=\"false\" oauth.jwks.endpoint.uri=\"https://mockoauth:8090/jwks\" oauth.jwks.refresh.seconds=\"10\" oauth.valid.issuer.uri=\"https://mockoauth:8090\" oauth.token.endpoint.uri=\"https://mockoauth:8090/failing_token\" oauth.http.retries=\"1\" oauth.http.retry.pause.millis=\"3000\" unsecuredLoginStringClaim_sub=\"admin\" ; - KAFKA_LISTENER_NAME_FAILINGJWT_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler @@ -115,7 +115,7 @@ services: # The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS - STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter - # The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file + # The following value will turn into 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file # However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java index 2108e8e6..5d4791f5 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/MockOAuthTests.java @@ -6,6 +6,7 @@ import io.strimzi.testsuite.oauth.common.TestContainersLogCollector; import io.strimzi.testsuite.oauth.common.TestContainersWatcher; +import io.strimzi.testsuite.oauth.mockoauth.AuthorizationEndpointsTest; import io.strimzi.testsuite.oauth.mockoauth.JaasServerConfigTest; import io.strimzi.testsuite.oauth.mockoauth.metrics.MetricsTest; import io.strimzi.testsuite.oauth.mockoauth.ClientAssertionAuthTest; @@ -61,7 +62,7 @@ public void runTests() throws Exception { String kafkaContainer = environment.getContainerByServiceName("kafka_1").get().getContainerInfo().getName().substring(1); System.out.println("See log at: " + new File("target/test.log").getAbsolutePath()); - // MetricsTest has to be the first as it relies on initial configuration and behaviour of mockoauth + // MetricsTest has to be the first as it relies on initial configuration and behaviour of mockoauth server // JWKS endpoint is expected to return 404 // Subsequent tests can change that, but it takes some seconds for Kafka to retry fetching JWKS keys logStart("MetricsTest :: Basic Metrics Tests"); @@ -73,6 +74,9 @@ public void runTests() throws Exception { logStart("JaasClientConfigTest :: Client Configuration Tests"); new JaasClientConfigTest().doTest(); + logStart("AuthorizationEndpointTest :: Server Configuration Tests"); + new AuthorizationEndpointsTest().doTest(); + logStart("JaasServerConfigTest :: Server Configuration Tests"); new JaasServerConfigTest().doTest(); diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/AuthorizationEndpointsTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/AuthorizationEndpointsTest.java new file mode 100644 index 00000000..8a17979b --- /dev/null +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/AuthorizationEndpointsTest.java @@ -0,0 +1,266 @@ +/* + * Copyright 2017-2024, Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.testsuite.oauth.mockoauth; + +import io.strimzi.kafka.oauth.common.HttpException; +import io.strimzi.kafka.oauth.common.OAuthAuthenticator; +import io.strimzi.kafka.oauth.common.SSLUtil; +import io.strimzi.kafka.oauth.common.TokenInfo; +import io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler; +import io.strimzi.kafka.oauth.server.OAuthSaslAuthenticationException; +import io.strimzi.kafka.oauth.server.ServerConfig; +import io.strimzi.kafka.oauth.services.ServiceException; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.jetbrains.annotations.NotNull; +import org.junit.Assert; + +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static io.strimzi.kafka.oauth.common.IOUtil.randomHexString; +import static io.strimzi.testsuite.oauth.common.TestUtil.getRootCause; +import static io.strimzi.testsuite.oauth.mockoauth.Common.changeAuthServerMode; +import static io.strimzi.testsuite.oauth.mockoauth.Common.createOAuthClient; + +public class AuthorizationEndpointsTest { + + public void doTest() throws Exception { + + changeAuthServerMode("token", "MODE_200"); + + // create a client for resource server + String clientSrv = "appserver"; + String clientSrvSecret = "appserver-secret"; + createOAuthClient(clientSrv, clientSrvSecret); + + // create a bearer client + String clientSrvBearer = "appserver-bearer"; + String clientSrvBearerToken = randomHexString(32); + createOAuthClient(clientSrvBearer, clientSrvBearerToken); + + // create a client for client app + String clientApp = "client"; + String clientAppSecret = "client-secret"; + createOAuthClient(clientApp, clientAppSecret); + + // prepare TLS support + String projectRoot = Common.getProjectRoot(); + SSLSocketFactory sslFactory = SSLUtil.createSSLFactory( + projectRoot + "/../docker/certificates/ca-truststore.p12", null, "changeit", null, null); + + // Login with client app's client_id + secret to obtain an access token + TokenInfo tokenInfo = OAuthAuthenticator.loginWithClientSecret( + URI.create("https://mockoauth:8090/token"), + sslFactory, + null, + clientApp, + clientAppSecret, + true, + null, + null, + true); + + OAuthBearerValidatorCallback[] oauthCallbacks = {new OAuthBearerValidatorCallback(tokenInfo.token())}; + + + introspectEndpointTests(oauthCallbacks, clientSrv, clientSrvSecret, clientSrvBearerToken); + + + jwksEndpointTests(oauthCallbacks, clientSrv, clientSrvSecret, clientSrvBearerToken); + } + + private static void introspectEndpointTests(OAuthBearerValidatorCallback[] oauthCallbacks, String clientSrv, String clientSrvSecret, String clientSrvBearerToken) throws IOException, UnsupportedCallbackException { + + // introspect with clientid + secret + + // set mock auth server introspection endpoint mode - by default authentication is required by appserver + changeAuthServerMode("introspect", "MODE_200"); + changeAuthServerMode("userinfo", "MODE_200"); + + // configure validator with wrong client_id + secret + Map attrs = new HashMap<>(); + + //attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-introspect"); + attrs.put(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI, "https://mockoauth:8090/introspect"); + attrs.put(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI, "https://mockoauth:8090/userinfo"); + attrs.put(ServerConfig.OAUTH_USERNAME_CLAIM, "uid"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "bad-client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "bad-client-secret"); + attrs.put(ServerConfig.OAUTH_VALID_ISSUER_URI, "https://mockoauth:8090"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_LOCATION, "../docker/target/kafka/certs/ca-truststore.p12"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_PASSWORD, "changeit"); + attrs.put(ServerConfig.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ""); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_TYPE, "pkcs12"); + + JaasServerOauthValidatorCallbackHandler handler = reconfigureHandler(attrs); + + // validate token + try { + handler.handle(oauthCallbacks); + Assert.fail("Should have failed"); + } catch (Exception exception) { + Assert.assertEquals("Exception is OAuthSaslAuthenticationException", OAuthSaslAuthenticationException.class, exception.getClass()); + Throwable cause = getRootCause(exception); + Assert.assertNotNull("Exception has a cause", cause); + Assert.assertEquals("Cause is HttpException", HttpException.class, cause.getClass()); + Assert.assertTrue("Error message check", cause.getMessage().contains("introspect failed with status 401")); + } + handler.close(); + + // configure validator with correct client_id + secret + attrs.put(ServerConfig.OAUTH_CLIENT_ID, clientSrv); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, clientSrvSecret); + + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + + // introspect with bearer token + + // configure validator with wrong bearer token + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, "bad-token"); + handler = reconfigureHandler(attrs); + + // validate token + try { + handler.handle(oauthCallbacks); + Assert.fail("Should have failed"); + } catch (Exception exception) { + Assert.assertEquals("Exception is OAuthSaslAuthenticationException", OAuthSaslAuthenticationException.class, exception.getClass()); + Throwable cause = getRootCause(exception); + Assert.assertNotNull("Exception has a cause", cause); + Assert.assertEquals("Cause is HttpException", HttpException.class, cause.getClass()); + Assert.assertTrue("Error message check", cause.getMessage().contains("introspect failed with status 401")); + } + handler.close(); + + + // configure validator with correct bearer token + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, clientSrvBearerToken); + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + + + // unprotected introspect + changeAuthServerMode("introspect", "MODE_200_UNPROTECTED"); + + // configure validator without client_id or bearer token + attrs.remove(ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + } + + + private static void jwksEndpointTests(OAuthBearerValidatorCallback[] oauthCallbacks, String clientSrv, String clientSrvSecret, String clientSrvBearerToken) throws IOException, UnsupportedCallbackException { + + // jwks with clientid + secret + + // set mock auth server jwks endpoint mode - by default NO authentication is required by appserver + changeAuthServerMode("jwks", "MODE_200_PROTECTED"); + + // configure validator with wrong client_id + secret + Map attrs = new HashMap<>(); + + //attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-jwks"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "bad-client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "bad-client-secret"); + attrs.put(ServerConfig.OAUTH_JWKS_ENDPOINT_URI, "https://mockoauth:8090/jwks"); + attrs.put(ServerConfig.OAUTH_VALID_ISSUER_URI, "https://mockoauth:8090"); + attrs.put(ServerConfig.OAUTH_CHECK_ACCESS_TOKEN_TYPE, "false"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_LOCATION, "../docker/target/kafka/certs/ca-truststore.p12"); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_PASSWORD, "changeit"); + attrs.put(ServerConfig.OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ""); + attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_TYPE, "pkcs12"); + + JaasServerOauthValidatorCallbackHandler handler; + try { + reconfigureHandler(attrs); + } catch (Exception exception) { + Assert.assertEquals("Exception is ServiceException", ServiceException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("Failed to fetch public keys")); + Throwable cause = exception.getCause(); + Assert.assertEquals("Cause is HttpException", HttpException.class, cause.getClass()); + Assert.assertTrue("Cause message check", cause.getMessage().contains("jwks failed with status 401")); + + } + + // configure validator with correct client_id + secret + attrs.put(ServerConfig.OAUTH_CLIENT_ID, clientSrv); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, clientSrvSecret); + + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + + // jwks with bearer token + + // configure validator with wrong bearer token + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, "bad-token"); + + try { + reconfigureHandler(attrs); + } catch (Exception exception) { + Assert.assertEquals("Exception is ServiceException", ServiceException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("Failed to fetch public keys")); + Throwable cause = exception.getCause(); + Assert.assertEquals("Cause is HttpException", HttpException.class, cause.getClass()); + Assert.assertTrue("Cause message check", cause.getMessage().contains("jwks failed with status 401")); + } + + // configure validator with correct bearer token + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, clientSrvBearerToken); + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + + + // unprotected jwks + changeAuthServerMode("jwks", "MODE_200"); + + // configure validator without client_id or bearer token + attrs.remove(ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + handler = reconfigureHandler(attrs); + + // validate token + handler.handle(oauthCallbacks); + handler.close(); + } + + + @NotNull + private static JaasServerOauthValidatorCallbackHandler reconfigureHandler(Map attrs) { + Map serverProps = new HashMap<>(); + serverProps.put("security.protocol", "SASL_PLAINTEXT"); + serverProps.put("sasl.mechanism", "OAUTHBEARER"); + + AppConfigurationEntry jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + JaasServerOauthValidatorCallbackHandler handler = new JaasServerOauthValidatorCallbackHandler(); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + return handler; + } +} diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java index 06bb8edd..1e34b8f5 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JWKSKeyUseTest.java @@ -77,6 +77,9 @@ public void doTest() throws Exception { private static JWTSignatureValidator createTokenValidator(String validatorId, SSLSocketFactory sslFactory, boolean ignoreKeyUse) { return new JWTSignatureValidator(validatorId, + null, + null, + null, "https://mockoauth:8090/jwks", sslFactory, null, diff --git a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java index 36de7eba..7e77c6c8 100644 --- a/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java +++ b/testsuite/mockoauth-tests/src/test/java/io/strimzi/testsuite/oauth/mockoauth/JaasServerConfigTest.java @@ -4,13 +4,15 @@ */ package io.strimzi.testsuite.oauth.mockoauth; +import io.strimzi.kafka.oauth.common.ConfigException; import io.strimzi.kafka.oauth.metrics.GlobalConfig; import io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler; import io.strimzi.kafka.oauth.server.ServerConfig; +import org.junit.Assert; import javax.security.auth.login.AppConfigurationEntry; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -22,13 +24,23 @@ public void doTest() throws Exception { private void testAllConfigOptions() throws IOException { + testJwksValidatorOptions(); + + testIntrospectValidatorOptions(); + } + + private void testJwksValidatorOptions() throws IOException { + // Fast local JWT check - JaasServerOauthValidatorCallbackHandler handler = new JaasServerOauthValidatorCallbackHandler(); + JaasServerOauthValidatorCallbackHandler handler = new JaasServerOauthValidatorCallbackHandler(); Map attrs = new HashMap<>(); - // Fast local JWT check + // Fast local JWT checks + // Check #1 attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "client-secret"); attrs.put(ServerConfig.OAUTH_JWKS_ENDPOINT_URI, "https://sso/jwks"); attrs.put(ServerConfig.OAUTH_FAIL_FAST, "false"); attrs.put(ServerConfig.OAUTH_USERNAME_CLAIM, "username-claim"); @@ -36,7 +48,6 @@ private void testAllConfigOptions() throws IOException { attrs.put(ServerConfig.OAUTH_FALLBACK_USERNAME_PREFIX, "fallback-username-prefix"); attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM, "$.groups"); attrs.put(ServerConfig.OAUTH_GROUPS_CLAIM_DELIMITER, ","); - attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); attrs.put(ServerConfig.OAUTH_CHECK_AUDIENCE, "true"); attrs.put(ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, "@.aud anyof ['kafka', 'something']"); attrs.put(ServerConfig.OAUTH_JWKS_REFRESH_SECONDS, "10"); @@ -64,39 +75,119 @@ private void testAllConfigOptions() throws IOException { LogLineReader logReader = new LogLineReader(Common.LOG_PATH); logReader.readNext(); - handler.configure(serverProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); Common.checkLog(logReader, "JWTSignatureValidator", "", - "validatorId", "config-id", - "keysEndpointUri", "https://sso/jwks", - "usernameClaim", "username-claim", - "fallbackUsernameClaim", "fallback-username-claim", - "fallbackUsernamePrefix", "username-prefix", - "groupsClaimQuery", "\\$\\.groups", - "groupsClaimDelimiter", ",", - "validIssuerUri", "https://sso", - "hostnameVerifier", "SSLUtil", - "sslSocketFactory", "SSLSocketFactoryImpl", - "certsRefreshSeconds", "10", - "certsRefreshMinPauseSeconds", "2", - "certsExpirySeconds", "900", - "certsIgnoreKeyUse", "true", - "checkAccessTokenType", "false", - "audience", "client-id", - "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", - "connectTimeoutSeconds", "10", - "readTimeoutSeconds", "10", - "enableMetrics", "true", - "failFast", "false", - "includeAcceptHeader", "false" + "validatorId", "config-id", + "clientId", "client-id", + "clientSecret", "c\\*\\*", + "bearerTokenProvider", "null", + "keysEndpointUri", "https://sso/jwks", + "usernameClaim", "username-claim", + "fallbackUsernameClaim", "fallback-username-claim", + "fallbackUsernamePrefix", "username-prefix", + "groupsClaimQuery", "\\$\\.groups", + "groupsClaimDelimiter", ",", + "validIssuerUri", "https://sso", + "hostnameVerifier", "SSLUtil", + "sslSocketFactory", "SSLSocketFactoryImpl", + "certsRefreshSeconds", "10", + "certsRefreshMinPauseSeconds", "2", + "certsExpirySeconds", "900", + "certsIgnoreKeyUse", "true", + "checkAccessTokenType", "false", + "audience", "client-id", + "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", + "connectTimeoutSeconds", "10", + "readTimeoutSeconds", "10", + "enableMetrics", "true", + "failFast", "false", + "includeAcceptHeader", "false" ); // principalExtractor: PrincipalExtractor {usernameClaim: io.strimzi.kafka.oauth.common.PrincipalExtractor$Extractor@1e5f4170, fallbackUsernameClaim: null, fallbackUsernamePrefix: null} + // Check #2 + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-2"); + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, "server-bearer-token"); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + try { + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + } catch (Exception exception) { + Assert.assertEquals("Exception is ConfigException", ConfigException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("'oauth.client.id' must be set when 'oauth.check.audience' is 'true'")); + } + + // Check #3, relies on check #2 + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-3"); + attrs.remove(ServerConfig.OAUTH_CHECK_AUDIENCE); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + + Common.checkLog(logReader, "JWTSignatureValidator", "", + "clientId", "null", + "clientSecret", "null", + "bearerTokenProvider", "token: 's\\*\\*", + "audience", "null" + ); + + // Check #4 + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-4"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "client-secret"); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + try { + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + Assert.fail("Should have failed"); + + } catch (Exception exception) { + Assert.assertEquals("Exception is IllegalArgumentException", IllegalArgumentException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("Can't use both clientId and bearerToken")); + } + + // Check #5 + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-5"); + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.remove(ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + + Common.checkLog(logReader, "JWTSignatureValidator", "", + "clientId", "null", + "clientSecret", "null", + "bearerTokenProvider", "null" + ); + } + + private void testIntrospectValidatorOptions() throws IOException { - // Introspect endpoint - attrs = new HashMap<>(); - attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id2"); + LogLineReader logReader = new LogLineReader(Common.LOG_PATH); + logReader.readNext(); + + // Introspect endpoint checks + // Check #1 + // Configure clientId + secret authentication when talking to the introspection / userinfo endpoint + Map attrs = new HashMap<>(); + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-1_1"); attrs.put(ServerConfig.OAUTH_INTROSPECTION_ENDPOINT_URI, "https://sso/introspect"); attrs.put(ServerConfig.OAUTH_USERINFO_ENDPOINT_URI, "https://sso/userinfo"); attrs.put(ServerConfig.OAUTH_USERNAME_CLAIM, "username-claim"); @@ -122,33 +213,114 @@ private void testAllConfigOptions() throws IOException { attrs.put(ServerConfig.OAUTH_SSL_TRUSTSTORE_TYPE, "pkcs12"); attrs.put(ServerConfig.OAUTH_INCLUDE_ACCEPT_HEADER, "false"); + + AppConfigurationEntry jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + JaasServerOauthValidatorCallbackHandler handler = new JaasServerOauthValidatorCallbackHandler(); + + Map serverProps = new HashMap<>(); + serverProps.put("security.protocol", "SASL_PLAINTEXT"); + serverProps.put("sasl.mechanism", "OAUTHBEARER"); + + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + + Common.checkLog(logReader, "OAuthIntrospectionValidator", "", + "id", "config-id-1_1", + "introspectionEndpointUri", "https://sso/introspect", + "groupsClaimQuery", "\\$\\.groups", + "groupsClaimDelimiter", ",", + "validIssuerUri", "https://sso", + "userInfoUri", "https://sso/userinfo", + "hostnameVerifier", "SSLUtil", + "sslSocketFactory", "SSLSocketFactoryImpl", + "validTokenType", "jwt", + "clientId", "client-id", + "clientSecret", "c\\*\\*", + "bearerTokenProvider", "null", + "audience", "client-id", + "usernameClaim", "username-claim", + "fallbackUsernameClaim", "fallback-username-claim", + "fallbackUsernamePrefix", "username-prefix", + "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", + "connectTimeoutSeconds", "10", + "readTimeoutSeconds", "10", + "enableMetrics", "true", + "retries", "3", + "retryPauseMillis", "500", + "includeAcceptHeader", "false" + ); + + // Check #2 + // Configure bearer token authentication when talking to the introspection / userinfo endpoint + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-1_2"); + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.put(ServerConfig.OAUTH_SERVER_BEARER_TOKEN, "server-bearer-token"); + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); handler = new JaasServerOauthValidatorCallbackHandler(); - handler.configure(serverProps, "OAUTHBEARER", Arrays.asList(jaasConfig)); + + logReader.readNext(); + try { + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + } catch (Exception exception) { + Assert.assertEquals("Exception is ConfigException", ConfigException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("'oauth.client.id' must be set when 'oauth.check.audience' is 'true'")); + } + + // Check #3, relies on check #2 + // Disable audience so that the missing client_id is not a problem + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-1_3"); + attrs.remove(ServerConfig.OAUTH_CHECK_AUDIENCE); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + + Common.checkLog(logReader, "OAuthIntrospectionValidator", "", + "clientId", "null", + "clientSecret", "null", + "bearerTokenProvider", "token: 's\\*\\*", + "audience", "null" + ); + + // Check #4 + // Enable both client_id + secret and bearer token + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-1_4"); + attrs.put(ServerConfig.OAUTH_CLIENT_ID, "client-id"); + attrs.put(ServerConfig.OAUTH_CLIENT_SECRET, "client-secret"); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + try { + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); + Assert.fail("Should have failed"); + + } catch (Exception exception) { + Assert.assertEquals("Exception is IllegalArgumentException", IllegalArgumentException.class, exception.getClass()); + Assert.assertTrue("Error message check", exception.getMessage().contains("Can't use both clientId and bearerToken")); + } + + // Check #5 + // Disable authentication when talking to the introspection / userinfo endpoint + attrs.put(ServerConfig.OAUTH_CONFIG_ID, "config-id-1_5"); + attrs.remove(ServerConfig.OAUTH_CLIENT_ID); + attrs.remove(ServerConfig.OAUTH_CLIENT_SECRET); + attrs.remove(ServerConfig.OAUTH_SERVER_BEARER_TOKEN); + + jaasConfig = new AppConfigurationEntry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, attrs); + handler = new JaasServerOauthValidatorCallbackHandler(); + + logReader.readNext(); + handler.configure(serverProps, "OAUTHBEARER", Collections.singletonList(jaasConfig)); Common.checkLog(logReader, "OAuthIntrospectionValidator", "", - "id", "config-id2", - "introspectionEndpointUri", "https://sso/introspect", - "groupsClaimQuery", "\\$\\.groups", - "groupsClaimDelimiter", ",", - "validIssuerUri", "https://sso", - "userInfoUri", "https://sso/userinfo", - "hostnameVerifier", "SSLUtil", - "sslSocketFactory", "SSLSocketFactoryImpl", - "validTokenType", "jwt", - "clientId", "client-id", - "clientSecret", "c\\*\\*", - "audience", "client-id", - "usernameClaim", "username-claim", - "fallbackUsernameClaim", "fallback-username-claim", - "fallbackUsernamePrefix", "username-prefix", - "customClaimCheck", "@\\.aud anyof \\['kafka', 'something'\\]", - "connectTimeoutSeconds", "10", - "readTimeoutSeconds", "10", - "enableMetrics", "true", - "retries", "3", - "retryPauseMillis", "500", - "includeAcceptHeader", "false" + "clientId", "null", + "clientSecret", "null", + "bearerTokenProvider", "null" ); } }