From bd33604afc397409798d2a5246f4506114533169 Mon Sep 17 00:00:00 2001 From: Andrey Pleskach Date: Thu, 25 Mar 2021 14:05:27 +0100 Subject: [PATCH] Add support of OAuth2 authorization --- build.gradle | 9 +- config/checkstyle/suppressions.xml | 3 + .../http/config/AuthorizationType.java | 8 +- .../connect/http/config/HttpSinkConfig.java | 200 +++++++++-- .../http/config/OAuth2AuthorizationMode.java | 33 ++ .../connect/http/config/UrlValidator.java | 55 +++ .../sender/AccessTokenHttpRequestBuilder.java | 95 ++++++ .../http/sender/HttpRequestBuilder.java | 47 +++ .../http/sender/HttpResponseHandler.java | 46 +++ .../kafka/connect/http/sender/HttpSender.java | 96 ++---- .../connect/http/sender/OAuth2HttpSender.java | 100 ++++++ src/main/java/module-info.java | 1 + .../http/config/HttpSinkConfigTest.java | 275 +++++++++++---- .../config/HttpSinkConfigValidationTest.java | 2 +- .../AccessTokenHttpRequestBuilderTest.java | 89 +++++ .../http/sender/OAuth2HttpSenderTest.java | 316 ++++++++++++++++++ 16 files changed, 1217 insertions(+), 158 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/connect/http/config/OAuth2AuthorizationMode.java create mode 100644 src/main/java/io/aiven/kafka/connect/http/config/UrlValidator.java create mode 100644 src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java create mode 100644 src/main/java/io/aiven/kafka/connect/http/sender/HttpRequestBuilder.java create mode 100644 src/main/java/io/aiven/kafka/connect/http/sender/HttpResponseHandler.java create mode 100644 src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java create mode 100644 src/test/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilderTest.java create mode 100644 src/test/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSenderTest.java diff --git a/build.gradle b/build.gradle index 4f279cf..72130ce 100644 --- a/build.gradle +++ b/build.gradle @@ -81,12 +81,17 @@ dependencies { compileOnly "org.apache.kafka:connect-json:$kafkaVersion" implementation "org.slf4j:slf4j-api:1.7.28" + implementation "com.fasterxml.jackson.core:jackson-databind:2.12.2" - testImplementation "org.junit.jupiter:junit-jupiter:5.5.2" testImplementation "org.hamcrest:hamcrest:2.1" testImplementation "org.apache.kafka:connect-api:$kafkaVersion" + + testImplementation "org.junit.jupiter:junit-jupiter:5.7.1" + testImplementation "org.mockito:mockito-core:3.8.0" + testImplementation "org.mockito:mockito-junit-jupiter:3.8.0" + testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1" - testRuntime "com.fasterxml.jackson.core:jackson-databind:2.9.10" + testRuntime "com.fasterxml.jackson.core:jackson-databind:2.12.2" testRuntime "org.apache.kafka:connect-json:$kafkaVersion" integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion") { diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml index 9e34f8d..24f85b3 100644 --- a/config/checkstyle/suppressions.xml +++ b/config/checkstyle/suppressions.xml @@ -24,4 +24,7 @@ files="(IntegrationTest|ConnectRunner).java"/> + + + diff --git a/src/main/java/io/aiven/kafka/connect/http/config/AuthorizationType.java b/src/main/java/io/aiven/kafka/connect/http/config/AuthorizationType.java index fb1005c..889b296 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/AuthorizationType.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/AuthorizationType.java @@ -23,6 +23,7 @@ public enum AuthorizationType { NONE("none"), + OAUTH2("oauth2"), STATIC("static"); public final String name; @@ -36,6 +37,8 @@ public static AuthorizationType forName(final String name) { if (NONE.name.equalsIgnoreCase(name)) { return NONE; + } else if (OAUTH2.name.equalsIgnoreCase(name)) { + return OAUTH2; } else if (STATIC.name.equalsIgnoreCase(name)) { return STATIC; } else { @@ -43,7 +46,6 @@ public static AuthorizationType forName(final String name) { } } - public static Collection names() { - return Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); - } + public static final Collection NAMES = + Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); } diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 1f4725c..96b1707 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -17,6 +17,8 @@ package io.aiven.kafka.connect.http.config; import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.List; import java.util.Map; @@ -30,10 +32,18 @@ public class HttpSinkConfig extends AbstractConfig { private static final String CONNECTION_GROUP = "Connection"; private static final String HTTP_URL_CONFIG = "http.url"; + private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type"; private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization"; private static final String HTTP_HEADERS_CONTENT_TYPE_CONFIG = "http.headers.content.type"; + private static final String OAUTH2_ACCESS_TOKEN_URL_CONFIG = "oauth2.access.token.url"; + private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id"; + private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret"; + private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode"; + private static final String OAUTH2_CLIENT_SCOPE_CONFIG = "oauth2.client.scope"; + private static final String OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG = "oauth2.response.token.property"; + private static final String BATCHING_GROUP = "Batching"; private static final String BATCHING_ENABLED_CONFIG = "batching.enabled"; private static final String BATCH_MAX_SIZE_CONFIG = "batch.max.size"; @@ -58,27 +68,7 @@ private static void addConnectionConfigGroup(final ConfigDef configDef) { HTTP_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - if (value == null) { - throw new ConfigException(HTTP_URL_CONFIG, value, "can't be null"); - } - if (!(value instanceof String)) { - throw new ConfigException(HTTP_URL_CONFIG, value, "must be string"); - } - try { - new URL((String) value); - } catch (final MalformedURLException e) { - throw new ConfigException(HTTP_URL_CONFIG, value, "malformed URL"); - } - } - - @Override - public String toString() { - return "HTTP(S) ULRs"; - } - }, + new UrlValidator(), ConfigDef.Importance.HIGH, "The URL to send data to.", CONNECTION_GROUP, @@ -90,7 +80,7 @@ public String toString() { configDef.define( HTTP_AUTHORIZATION_TYPE_CONFIG, ConfigDef.Type.STRING, - AuthorizationType.NONE.name, + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.Validator() { @Override public void ensureValid(final String name, final Object value) { @@ -99,16 +89,16 @@ public void ensureValid(final String name, final Object value) { } assert value instanceof String; final String valueStr = (String) value; - if (!AuthorizationType.names().contains(valueStr)) { + if (!AuthorizationType.NAMES.contains(valueStr)) { throw new ConfigException( HTTP_AUTHORIZATION_TYPE_CONFIG, valueStr, - "supported values are: " + AuthorizationType.names()); + "supported values are: " + AuthorizationType.NAMES); } } @Override public String toString() { - return AuthorizationType.names().toString(); + return AuthorizationType.NAMES.toString(); } }, ConfigDef.Importance.HIGH, @@ -118,7 +108,7 @@ public String toString() { ConfigDef.Width.SHORT, HTTP_AUTHORIZATION_TYPE_CONFIG, List.of(HTTP_HEADERS_AUTHORIZATION_CONFIG), - FixedSetRecommender.ofSupportedValues(AuthorizationType.names()) + FixedSetRecommender.ofSupportedValues(AuthorizationType.NAMES) ); configDef.define( @@ -156,6 +146,100 @@ public boolean visible(final String name, final Map parsedConfig ConfigDef.Width.MEDIUM, HTTP_HEADERS_CONTENT_TYPE_CONFIG ); + + configDef.define( + OAUTH2_ACCESS_TOKEN_URL_CONFIG, + ConfigDef.Type.STRING, + null, + new UrlValidator(true), + ConfigDef.Importance.HIGH, + "The URL to be used for fetching access token. " + + "Client Credentials is only supported grand type.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_ACCESS_TOKEN_URL_CONFIG + ); + configDef.define( + OAUTH2_CLIENT_ID_CONFIG, + ConfigDef.Type.STRING, + null, + new ConfigDef.NonEmptyStringWithoutControlChars(), + ConfigDef.Importance.HIGH, + "The client id used for fetching access token.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_CLIENT_SECRET_CONFIG + ); + configDef.define( + OAUTH2_CLIENT_SECRET_CONFIG, + ConfigDef.Type.PASSWORD, + null, + ConfigDef.Importance.HIGH, + "The secret used for fetching access token.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_CLIENT_SECRET_CONFIG + ); + configDef.define( + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, + ConfigDef.Type.STRING, + OAuth2AuthorizationMode.HEADER.name(), + new ConfigDef.Validator() { + @Override + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(name, null, "can't be null"); + } + if (!(value instanceof String)) { + throw new ConfigException(name, value, "must be string"); + } + if (!OAuth2AuthorizationMode.OAUTH2_AUTHORIZATION_MODES + .contains(value.toString().toUpperCase())) { + throw new ConfigException( + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, value, + "supported values are: " + OAuth2AuthorizationMode.OAUTH2_AUTHORIZATION_MODES); + } + } + }, + ConfigDef.Importance.MEDIUM, + "Specifies how to encode client_id and client_secret in the OAuth2 authorization request. " + + "If set to 'header', the credentials are encoded as an " + + "'Authorization: Basic ' HTTP header. " + + "If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters. " + + "Default is 'header'", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG + ); + configDef.define( + OAUTH2_CLIENT_SCOPE_CONFIG, + ConfigDef.Type.STRING, + null, + new ConfigDef.NonEmptyStringWithoutControlChars(), + ConfigDef.Importance.LOW, + "The scope used for fetching access token.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_CLIENT_SCOPE_CONFIG + ); + configDef.define( + OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG, + ConfigDef.Type.STRING, + "access_token", + new ConfigDef.NonEmptyStringWithoutControlChars(), + ConfigDef.Importance.LOW, + "The name of the JSON property containing the access token returned by the OAuth2 provider. " + + "Default value is 'access_token'.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG + ); } private static void addBatchingConfigGroup(final ConfigDef configDef) { @@ -230,7 +314,28 @@ private void validate() { + " = " + AuthorizationType.STATIC); } break; - + case OAUTH2: + if (oauth2AccessTokenUri() == null) { + throw new ConfigException( + OAUTH2_ACCESS_TOKEN_URL_CONFIG, getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG), + "Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG + + " = " + AuthorizationType.OAUTH2); + } + if (oauth2ClientId() == null || oauth2ClientId().isEmpty()) { + throw new ConfigException( + OAUTH2_CLIENT_ID_CONFIG, + getString(OAUTH2_CLIENT_ID_CONFIG), + "Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG + + " = " + AuthorizationType.OAUTH2); + } + if (oauth2ClientSecret() == null || oauth2ClientSecret().value().isEmpty()) { + throw new ConfigException( + OAUTH2_CLIENT_SECRET_CONFIG, + getPassword(OAUTH2_CLIENT_SECRET_CONFIG), + "Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG + + " = " + AuthorizationType.OAUTH2); + } + break; case NONE: if (headerAuthorization() != null && !headerAuthorization().isBlank()) { throw new ConfigException( @@ -246,12 +351,8 @@ private void validate() { } } - public final URL httpUrl() { - try { - return new URL(getString(HTTP_URL_CONFIG)); - } catch (final MalformedURLException e) { - throw new ConnectException(e); - } + public final URI httpUri() { + return toURI(HTTP_URL_CONFIG); } public AuthorizationType authorizationType() { @@ -287,6 +388,38 @@ public final String connectorName() { return originalsStrings().get(NAME_CONFIG); } + public final URI oauth2AccessTokenUri() { + return getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG) != null ? toURI(OAUTH2_ACCESS_TOKEN_URL_CONFIG) : null; + } + + private URI toURI(final String propertyName) { + try { + return new URL(getString(propertyName)).toURI(); + } catch (final MalformedURLException | URISyntaxException e) { + throw new ConnectException(e); + } + } + + public final String oauth2ClientId() { + return getString(OAUTH2_CLIENT_ID_CONFIG); + } + + public final Password oauth2ClientSecret() { + return getPassword(OAUTH2_CLIENT_SECRET_CONFIG); + } + + public final OAuth2AuthorizationMode oauth2AuthorizationMode() { + return OAuth2AuthorizationMode.valueOf(getString(OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG).toUpperCase()); + } + + public final String oauth2ClientScope() { + return getString(OAUTH2_CLIENT_SCOPE_CONFIG); + } + + public final String oauth2ResponseTokenProperty() { + return getString(OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG); + } + public static void main(final String... args) { System.out.println("========================================="); System.out.println("HTTP Sink connector Configuration Options"); @@ -294,4 +427,5 @@ public static void main(final String... args) { System.out.println(); System.out.println(configDef().toEnrichedRst()); } + } diff --git a/src/main/java/io/aiven/kafka/connect/http/config/OAuth2AuthorizationMode.java b/src/main/java/io/aiven/kafka/connect/http/config/OAuth2AuthorizationMode.java new file mode 100644 index 0000000..d5f0210 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/OAuth2AuthorizationMode.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public enum OAuth2AuthorizationMode { + + HEADER, + URL; + + static final List OAUTH2_AUTHORIZATION_MODES = + Arrays.stream(OAuth2AuthorizationMode.values()) + .map(OAuth2AuthorizationMode::name) + .collect(Collectors.toUnmodifiableList()); + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/UrlValidator.java b/src/main/java/io/aiven/kafka/connect/http/config/UrlValidator.java new file mode 100644 index 0000000..63f1127 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/UrlValidator.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +class UrlValidator implements ConfigDef.Validator { + + private final boolean skipNullString; + + UrlValidator() { + this(false); + } + + UrlValidator(final boolean skipNullString) { + this.skipNullString = skipNullString; + } + + @Override + public void ensureValid(final String name, final Object value) { + if (skipNullString && value == null) { + return; + } + if (value == null) { + throw new ConfigException(name, null, "can't be null"); + } + if (!(value instanceof String)) { + throw new ConfigException(name, value, "must be string"); + } + try { + new URL((String) value); + } catch (final MalformedURLException e) { + throw new ConfigException(name, value, "malformed URL"); + } + } + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java b/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java new file mode 100644 index 0000000..14ade2a --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilder.java @@ -0,0 +1,95 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.net.URLEncoder; +import java.net.http.HttpRequest; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Objects; +import java.util.StringJoiner; + +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +class AccessTokenHttpRequestBuilder implements HttpRequestBuilder { + + AccessTokenHttpRequestBuilder() { + } + + @Override + public HttpRequest.Builder build(final HttpSinkConfig config) { + Objects.requireNonNull(config, "config"); + Objects.requireNonNull(config.oauth2AccessTokenUri(), "oauth2AccessTokenUri"); + final var accessTokenRequestBuilder = HttpRequest + .newBuilder(config.oauth2AccessTokenUri()) + .timeout(REQUEST_HTTP_TIMEOUT) + .header(HEADER_CONTENT_TYPE, "application/x-www-form-urlencoded"); + + final var accessTokenRequestBodyBuilder = new StringJoiner("&"); + accessTokenRequestBodyBuilder.add(encodeNameAndValue("grant_type", "client_credentials")); + if (config.oauth2ClientScope() != null) { + accessTokenRequestBodyBuilder.add(encodeNameAndValue("scope", config.oauth2ClientScope())); + } + + setClientIdAndSecret(accessTokenRequestBuilder, accessTokenRequestBodyBuilder, config); + return accessTokenRequestBuilder + .POST(HttpRequest.BodyPublishers.ofString(accessTokenRequestBodyBuilder.toString())); + } + + private void setClientIdAndSecret(final HttpRequest.Builder httpRequestBuilder, + final StringJoiner requestBodyBuilder, + final HttpSinkConfig config) { + switch (config.oauth2AuthorizationMode()) { + case HEADER: + addClientIdAndSecretInRequestHeader(httpRequestBuilder, config); + break; + case URL: + addClientIdAndSecretInRequestBody(requestBodyBuilder, config); + break; + default: + throw new ConnectException("Unknown OAuth2 authorization mode: " + config.oauth2AuthorizationMode()); + } + } + + private void addClientIdAndSecretInRequestHeader(final HttpRequest.Builder httpRequestBuilder, + final HttpSinkConfig config) { + final var clientAndSecretBytes = + (config.oauth2ClientId() + ":" + config.oauth2ClientSecret().value()) + .getBytes(StandardCharsets.UTF_8); + final var clientAndSecretAuthHeader = + "Basic " + Base64.getEncoder().encodeToString(clientAndSecretBytes); + httpRequestBuilder.header(HEADER_AUTHORIZATION, clientAndSecretAuthHeader); + } + + private void addClientIdAndSecretInRequestBody(final StringJoiner requestBodyBuilder, + final HttpSinkConfig config) { + requestBodyBuilder + .add(encodeNameAndValue("client_id", config.oauth2ClientId())) + .add(encodeNameAndValue("client_secret", config.oauth2ClientSecret().value())); + } + + private String encodeNameAndValue(final String name, final String value) { + return String.format("%s=%s", encode(name), encode(value)); + } + + private String encode(final String value) { + return URLEncoder.encode(value, StandardCharsets.UTF_8); + } + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/HttpRequestBuilder.java b/src/main/java/io/aiven/kafka/connect/http/sender/HttpRequestBuilder.java new file mode 100644 index 0000000..74de84b --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/sender/HttpRequestBuilder.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.net.http.HttpRequest; +import java.time.Duration; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +interface HttpRequestBuilder { + + String HEADER_AUTHORIZATION = "Authorization"; + + String HEADER_CONTENT_TYPE = "Content-Type"; + + Duration REQUEST_HTTP_TIMEOUT = Duration.ofSeconds(30); + + HttpRequest.Builder build(final HttpSinkConfig config); + + HttpRequestBuilder DEFAULT_HTTP_REQUEST_BUILDER = config -> + HttpRequest.newBuilder(config.httpUri()).timeout(REQUEST_HTTP_TIMEOUT); + + HttpRequestBuilder AUTH_HTTP_REQUEST_BUILDER = config -> { + final var requestBuilder = + DEFAULT_HTTP_REQUEST_BUILDER.build(config) + .header(HEADER_AUTHORIZATION, config.headerAuthorization()); + if (config.headerContentType() != null) { + requestBuilder.header(HEADER_CONTENT_TYPE, config.headerContentType()); + } + return requestBuilder; + }; + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/HttpResponseHandler.java b/src/main/java/io/aiven/kafka/connect/http/sender/HttpResponseHandler.java new file mode 100644 index 0000000..90d5579 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/sender/HttpResponseHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.io.IOException; +import java.net.http.HttpResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +interface HttpResponseHandler { + + Logger LOGGER = LoggerFactory.getLogger(HttpResponseHandler.class); + + void onResponse(final HttpResponse response) throws IOException; + + HttpResponseHandler ON_HTTP_ERROR_RESPONSE_HANDLER = response -> { + if (response.statusCode() >= 400) { + if (response.statusCode() < 200 || response.statusCode() > 299) { + final var request = response.request(); + LOGGER.warn( + "Got unexpected HTTP status code: {} and body: {}. Requested URI: {}", + response.statusCode(), + response.body(), + request != null ? request.uri() : "UNKNOWN"); + } + throw new IOException("Server replied with status code " + response.statusCode() + + " and body " + response.body()); + } + }; + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java index bd9396a..529678e 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java @@ -17,11 +17,9 @@ package io.aiven.kafka.connect.http.sender; import java.io.IOException; -import java.net.URISyntaxException; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.time.Duration; import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.errors.ConnectException; @@ -31,53 +29,44 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class HttpSender { +public class HttpSender { private static final Logger log = LoggerFactory.getLogger(HttpSender.class); - protected static final String HEADER_AUTHORIZATION = "Authorization"; - - protected static final String HEADER_CONTENT_TYPE = "Content-Type"; - - private static final Duration HTTP_TIMEOUT = Duration.ofSeconds(30); - - private final HttpClient httpClient; - - protected final HttpRequest.Builder requestTemplate; + protected final HttpClient httpClient; protected final HttpSinkConfig config; - protected interface OnResponseHandler { + private final HttpRequestBuilder httpRequestBuilder; - void onResponse(final HttpResponse response) throws IOException; + protected HttpSender(final HttpSinkConfig config) { + this(config, HttpRequestBuilder.DEFAULT_HTTP_REQUEST_BUILDER, HttpClient.newHttpClient()); + } + protected HttpSender(final HttpSinkConfig config, + final HttpRequestBuilder httpRequestBuilder) { + this(config, httpRequestBuilder, HttpClient.newHttpClient()); } - protected final OnResponseHandler onHttpErrorResponseHandler = response -> { - if (response.statusCode() >= 400) { - if (response.statusCode() < 200 || response.statusCode() > 299) { - log.warn("Got unexpected HTTP status code: {}", response.statusCode()); - } - throw new IOException("Server replied with status code " + response.statusCode() - + " and body " + response.body()); - } - }; + protected HttpSender(final HttpSinkConfig config, + final HttpClient httpClient) { + this(config, HttpRequestBuilder.DEFAULT_HTTP_REQUEST_BUILDER, httpClient); + } - protected HttpSender(final HttpSinkConfig config) { + protected HttpSender(final HttpSinkConfig config, + final HttpRequestBuilder httpRequestBuilder, + final HttpClient httpClient) { this.config = config; - this.httpClient = HttpClient.newHttpClient(); - try { - requestTemplate = HttpRequest.newBuilder(config.httpUrl().toURI()).timeout(HTTP_TIMEOUT); - } catch (final URISyntaxException e) { - throw new ConnectException(e); - } + this.httpRequestBuilder = httpRequestBuilder; + this.httpClient = httpClient; } public final void send(final String body) { - final HttpRequest request = requestTemplate.copy() - .POST(HttpRequest.BodyPublishers.ofString(body)) - .build(); - sendWithRetries(request, onHttpErrorResponseHandler); + final var requestBuilder = + httpRequestBuilder + .build(config) + .POST(HttpRequest.BodyPublishers.ofString(body)); + sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER); } /** @@ -85,17 +74,17 @@ public final void send(final String body) { * * @return whether the sending was successful. */ - protected void sendWithRetries(final HttpRequest request, - final OnResponseHandler onHttpResponseHandler) { + protected HttpResponse sendWithRetries(final HttpRequest.Builder requestBuilder, + final HttpResponseHandler httpResponseHandler) { int remainRetries = config.maxRetries(); while (remainRetries >= 0) { try { try { final var response = - httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); log.debug("Server replied with status code {} and body {}", response.statusCode(), response.body()); - onHttpResponseHandler.onResponse(response); - return; + httpResponseHandler.onResponse(response); + return response; } catch (final IOException e) { log.info("Sending failed, will retry in {} ms ({} retries remain)", config.retryBackoffMs(), remainRetries, e); @@ -103,7 +92,7 @@ protected void sendWithRetries(final HttpRequest request, TimeUnit.MILLISECONDS.sleep(config.retryBackoffMs()); } } catch (final InterruptedException e) { - log.error("Sending failed due to InterruptedException, stopping"); + log.error("Sending failed due to InterruptedException, stopping", e); throw new ConnectException(e); } } @@ -113,32 +102,15 @@ protected void sendWithRetries(final HttpRequest request, public static HttpSender createHttpSender(final HttpSinkConfig config) { switch (config.authorizationType()) { - case STATIC: - return new StaticAuthHttpSender(config); case NONE: - return new NoAuthHttpSender(config); + return new HttpSender(config); + case STATIC: + return new HttpSender(config, HttpRequestBuilder.AUTH_HTTP_REQUEST_BUILDER); + case OAUTH2: + return new OAuth2HttpSender(config); default: throw new ConnectException("Can't create HTTP sender for auth type: " + config.authorizationType()); } } - private static final class NoAuthHttpSender extends HttpSender { - - private NoAuthHttpSender(final HttpSinkConfig config) { - super(config); - } - - } - - private static final class StaticAuthHttpSender extends HttpSender { - - private StaticAuthHttpSender(final HttpSinkConfig config) { - super(config); - requestTemplate.header(HEADER_AUTHORIZATION, config.headerAuthorization()); - if (config.headerContentType() != null) { - requestTemplate.header(HEADER_CONTENT_TYPE, config.headerContentType()); - } - } - } - } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java new file mode 100644 index 0000000..3572699 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java @@ -0,0 +1,100 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.io.IOException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OAuth2HttpSender extends HttpSender { + + private final Logger logger = LoggerFactory.getLogger(OAuth2HttpSender.class); + + private String accessTokenAuthHeader; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + OAuth2HttpSender(final HttpSinkConfig config) { + super(config); + } + + //for testing + OAuth2HttpSender(final HttpSinkConfig config, final HttpClient httpClient) { + super(config, httpClient); + } + + @Override + protected HttpResponse sendWithRetries(final HttpRequest.Builder requestBuilder, + final HttpResponseHandler originHttpResponseHandler) { + final var accessTokenAwareRequestBuilder = loadOrRefreshAccessToken(requestBuilder); + final HttpResponseHandler handler = response -> { + if (response.statusCode() == 401) { // access denied or refresh of a token is needed + this.accessTokenAuthHeader = null; + this.sendWithRetries(requestBuilder, originHttpResponseHandler); + } else { + originHttpResponseHandler.onResponse(response); + } + }; + return super.sendWithRetries(accessTokenAwareRequestBuilder, handler); + } + + private HttpRequest.Builder loadOrRefreshAccessToken(final HttpRequest.Builder requestBuilder) { + if (accessTokenAuthHeader == null) { + logger.info("Configure OAuth2 for URI: {} and Client ID: {}", + config.oauth2AccessTokenUri(), config.oauth2ClientId()); + try { + final var response = + super.sendWithRetries( + new AccessTokenHttpRequestBuilder().build(config), + HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER + ); + accessTokenAuthHeader = buildAccessTokenAuthHeader(response.body()); + } catch (final IOException e) { + throw new ConnectException("Couldn't get OAuth2 access token", e); + } + } + return requestBuilder.header(HttpRequestBuilder.HEADER_AUTHORIZATION, accessTokenAuthHeader); + } + + private String buildAccessTokenAuthHeader(final String responseBody) throws JsonProcessingException { + final var accessTokenResponse = + objectMapper.readValue(responseBody, new TypeReference>() {}); + if (!accessTokenResponse.containsKey(config.oauth2ResponseTokenProperty())) { + throw new ConnectException( + "Couldn't find access token property " + + config.oauth2ResponseTokenProperty() + + " in response properties: " + + accessTokenResponse.keySet()); + } + final var tokenType = accessTokenResponse.getOrDefault("token_type", "Bearer"); + final var accessToken = accessTokenResponse.get(config.oauth2ResponseTokenProperty()); + return String.format("%s %s", tokenType, accessToken); + } + +} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index b48359a..7cbba59 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -3,4 +3,5 @@ requires java.net.http; requires static kafka.clients; requires static connect.api; + requires com.fasterxml.jackson.databind; } diff --git a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java index 7f44078..511f1f8 100644 --- a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java @@ -17,6 +17,7 @@ package io.aiven.kafka.connect.http.config; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.URL; import java.util.HashMap; import java.util.Map; @@ -34,48 +35,202 @@ import static org.junit.jupiter.api.Assertions.assertTrue; final class HttpSinkConfigTest { + @Test void requiredConfigurations() { final Map properties = Map.of(); final Throwable t = assertThrows( - ConfigException.class, - () -> new HttpSinkConfig(properties)); + ConfigException.class, () -> new HttpSinkConfig(properties)); assertEquals("Missing required configuration \"http.url\" which has no default value.", t.getMessage()); } @Test - void correctMinimalConfig() throws MalformedURLException { + void correctMinimalConfig() throws MalformedURLException, URISyntaxException { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none" + "http.url", "http://localhost:8090", + "http.authorization.type", "none" ); final HttpSinkConfig config = new HttpSinkConfig(properties); - assertEquals(new URL("http://localhost:8090"), config.httpUrl()); + assertEquals(new URL("http://localhost:8090").toURI(), config.httpUri()); assertEquals(AuthorizationType.NONE, config.authorizationType()); assertNull(config.headerContentType()); assertFalse(config.batchingEnabled()); assertEquals(500, config.batchMaxSize()); assertEquals(1, config.maxRetries()); assertEquals(3000, config.retryBackoffMs()); + assertNull(config.oauth2AccessTokenUri()); + assertNull(config.oauth2ClientId()); + assertNull(config.oauth2ClientSecret()); + assertNull(config.oauth2ClientScope()); + assertEquals(OAuth2AuthorizationMode.HEADER, config.oauth2AuthorizationMode()); + assertEquals("access_token", config.oauth2ResponseTokenProperty()); + } + + @Test + void invalidOAuth2AccessTokenUrl() { + final var emptyAccessTokenUrlConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "oauth2.access.token.url", "" + ); + + final var emptyStringT = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(emptyAccessTokenUrlConfig)); + assertEquals( + "Invalid value for configuration oauth2.access.token.url: malformed URL", + emptyStringT.getMessage() + ); + + final var wrongAccessTokenUrlConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "oauth2.access.token.url", ";http://localhost:8090" + ); + + final var wrongUrlT = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(wrongAccessTokenUrlConfig)); + assertEquals( + "Invalid value ;http://localhost:8090 for configuration " + + "oauth2.access.token.url: malformed URL", + wrongUrlT.getMessage() + ); + } + + @Test + void invalidOAuth2ClientAuthorizationMode() { + final var invalidClientAuthorizationModeConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:8090/token", + "oauth2.client.id", "client_id", + "oauth2.client.secret", "client_secret", + "oauth2.client.authorization.mode", "AAAABBBCCCC" + ); + + final var t = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(invalidClientAuthorizationModeConfig)); + assertEquals( + "Invalid value AAAABBBCCCC for configuration oauth2.client.authorization.mode: " + + "supported values are: [HEADER, URL]", + t.getMessage() + ); + } + + @Test + void invalidOAuth2Configuration() { + final var noAccessTokenUrlConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2" + ); + + final var noAccessTokenUrlE = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(noAccessTokenUrlConfig)); + assertEquals( + "Invalid value null for configuration oauth2.access.token.url: " + + "Must be present when http.headers.content.type = OAUTH2", + noAccessTokenUrlE.getMessage()); + + final var noSecretIdConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:8090/token" + ); + + final var noSecretIdConfigE = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(noSecretIdConfig)); + assertEquals( + "Invalid value null for configuration oauth2.client.id: " + + "Must be present when http.headers.content.type = OAUTH2", + noSecretIdConfigE.getMessage()); + + final var noSecretConfig = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:8090/token", + "oauth2.client.id", "client_id" + ); + + final var noSecretConfigE = + assertThrows(ConfigException.class, () -> new HttpSinkConfig(noSecretConfig)); + assertEquals( + "Invalid value null for configuration oauth2.client.secret: " + + "Must be present when http.headers.content.type = OAUTH2", + noSecretConfigE.getMessage()); + + } + + @Test + void validOAuth2MinimalConfiguration() throws MalformedURLException, URISyntaxException { + + final var oauth2Config = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:8090/token", + "oauth2.client.id", "client_id", + "oauth2.client.secret", "client_secret" + ); + + final var config = new HttpSinkConfig(oauth2Config); + + assertEquals(new URL("http://localhost:8090/token").toURI(), config.oauth2AccessTokenUri()); + assertEquals("client_id", config.oauth2ClientId()); + assertEquals("client_secret", config.oauth2ClientSecret().value()); + assertEquals(OAuth2AuthorizationMode.HEADER, config.oauth2AuthorizationMode()); + assertEquals("access_token", config.oauth2ResponseTokenProperty()); + assertNull(config.oauth2ClientScope()); + } + + @Test + void validOAuth2FullConfiguration() throws MalformedURLException, URISyntaxException { + + final var oauth2Config = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:8090/token", + "oauth2.client.id", "client_id", + "oauth2.client.secret", "client_secret", + "oauth2.client.authorization.mode", "url", + "oauth2.client.scope", "scope1,scope2", + "oauth2.response.token.property", "moooooo" + ); + + final var config = new HttpSinkConfig(oauth2Config); + + assertEquals(new URL("http://localhost:8090/token").toURI(), config.oauth2AccessTokenUri()); + assertEquals("client_id", config.oauth2ClientId()); + assertEquals("client_secret", config.oauth2ClientSecret().value()); + assertEquals(OAuth2AuthorizationMode.URL, config.oauth2AuthorizationMode()); + assertEquals("moooooo", config.oauth2ResponseTokenProperty()); + assertEquals("scope1,scope2", config.oauth2ClientScope()); } @Test void invalidUrl() { final Map properties = Map.of( - "http.url", "#http://localhost:8090", - "http.authorization.type", "none" + "http.url", "#http://localhost:8090", + "http.authorization.type", "none" ); final Throwable t = assertThrows( - ConfigException.class, - () -> new HttpSinkConfig(properties)); + ConfigException.class, () -> new HttpSinkConfig(properties)); assertEquals("Invalid value #http://localhost:8090 for configuration http.url: malformed URL", - t.getMessage()); + t.getMessage()); + } + + @Test + void missingAuthorizationType() { + final Map properties = Map.of( + "http.url", "http://localhost:8090" + ); + + final Throwable t = assertThrows(ConfigException.class, () -> new HttpSinkConfig(properties)); + assertEquals("Missing required configuration \"http.authorization.type\" which has no default value.", + t.getMessage()); } @ParameterizedTest - @ValueSource(strings = {"none", "static"}) + @ValueSource(strings = {"none", "static", "oauth2"}) void supportedAuthorizationType(final String authorization) { Map properties = Map.of( "http.url", "http://localhost:8090", @@ -89,6 +244,12 @@ void supportedAuthorizationType(final String authorization) { expectedAuthorizationType = AuthorizationType.STATIC; properties = new HashMap<>(properties); properties.put("http.headers.authorization", "some"); + } else if ("oauth2".equals(authorization)) { + expectedAuthorizationType = AuthorizationType.OAUTH2; + properties = new HashMap<>(properties); + properties.put("oauth2.access.token.url", "http://localhost:42"); + properties.put("oauth2.client.id", "client_id"); + properties.put("oauth2.client.secret", "client_secret"); } else { throw new RuntimeException("Shouldn't be here"); } @@ -101,55 +262,55 @@ void supportedAuthorizationType(final String authorization) { @Test void unsupportedAuthorizationType() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "unsupported" + "http.url", "http://localhost:8090", + "http.authorization.type", "unsupported" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value unsupported for configuration http.authorization.type: " - + "supported values are: [none, static]", - t.getMessage()); + + "supported values are: [none, oauth2, static]", + t.getMessage()); } @Test void missingAuthorizationHeaderWhenAuthorizationTypeStatic() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "static" + "http.url", "http://localhost:8090", + "http.authorization.type", "static" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value null for configuration http.headers.authorization: " - + "Must be present when http.headers.content.type = STATIC", - t.getMessage()); + + "Must be present when http.headers.content.type = STATIC", + t.getMessage()); } @Test void presentAuthorizationHeaderWhenAuthorizationTypeNotStatic() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "http.headers.authorization", "some" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "http.headers.authorization", "some" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value [hidden] for configuration http.headers.authorization: " - + "Must not be present when http.headers.content.type != STATIC", - t.getMessage()); + + "Must not be present when http.headers.content.type != STATIC", + t.getMessage()); } @Test void headerContentType() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "http.headers.content.type", "application/json" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "http.headers.content.type", "application/json" ); final HttpSinkConfig config = new HttpSinkConfig(properties); @@ -159,10 +320,10 @@ void headerContentType() { @Test void correctBatching() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "batching.enabled", "true", - "batch.max.size", "123456" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "batching.enabled", "true", + "batch.max.size", "123456" ); final HttpSinkConfig config = new HttpSinkConfig(properties); @@ -173,40 +334,40 @@ void correctBatching() { @Test void tooBigBatchSize() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "batching.enabled", "true", - "batch.max.size", "1000001" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "batching.enabled", "true", + "batch.max.size", "1000001" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value 1000001 for configuration batch.max.size: Value must be no more than 1000000", - t.getMessage()); + t.getMessage()); } @Test void negativeMaxRetries() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "max.retries", "-1" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "max.retries", "-1" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value -1 for configuration max.retries: Value must be at least 0", - t.getMessage()); + t.getMessage()); } @Test void correctMaxRetries() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "max.retries", "123" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "max.retries", "123" ); final HttpSinkConfig config = new HttpSinkConfig(properties); @@ -216,24 +377,24 @@ void correctMaxRetries() { @Test void negativeRetryBackoffMs() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "retry.backoff.ms", "-1" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "retry.backoff.ms", "-1" ); final Throwable t = assertThrows( - ConfigException.class, () -> new HttpSinkConfig(properties) + ConfigException.class, () -> new HttpSinkConfig(properties) ); assertEquals("Invalid value -1 for configuration retry.backoff.ms: Value must be at least 0", - t.getMessage()); + t.getMessage()); } @Test void correctRetryBackoffMs() { final Map properties = Map.of( - "http.url", "http://localhost:8090", - "http.authorization.type", "none", - "retry.backoff.ms", "12345" + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "retry.backoff.ms", "12345" ); final HttpSinkConfig config = new HttpSinkConfig(properties); diff --git a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigValidationTest.java b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigValidationTest.java index b7f4c2b..c42175b 100644 --- a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigValidationTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigValidationTest.java @@ -33,7 +33,7 @@ void recommendedValuesForAuthorization() { .filter(x -> x.name().equals("http.authorization.type")) .findFirst().get(); assertIterableEquals( - AuthorizationType.names(), + AuthorizationType.NAMES, v.recommendedValues() ); } diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilderTest.java new file mode 100644 index 0000000..cf98e04 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/http/sender/AccessTokenHttpRequestBuilderTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.Optional; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AccessTokenHttpRequestBuilderTest { + + @Test + void shouldBuildDefaultAccessTokenRequest() throws Exception { + final var config = Map.of( + "http.url", "http://localhost:42", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:42/token", + "oauth2.client.id", "some_client_id", + "oauth2.client.secret", "some_client_secret" + ); + final var accessTokenRequest = + new AccessTokenHttpRequestBuilder().build(new HttpSinkConfig(config)).build(); + + assertEquals(new URL("http://localhost:42/token").toURI(), accessTokenRequest.uri()); + + final var expectedAuthHeader = "Basic " + + Base64.getEncoder() + .encodeToString("some_client_id:some_client_secret".getBytes(StandardCharsets.UTF_8)); + assertEquals( + Optional.of(AccessTokenHttpRequestBuilder.REQUEST_HTTP_TIMEOUT), + accessTokenRequest.timeout()); + assertEquals("POST", + accessTokenRequest.method()); + assertEquals(Optional.of("application/x-www-form-urlencoded"), + accessTokenRequest.headers().firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)); + assertEquals(Optional.of(expectedAuthHeader), + accessTokenRequest.headers().firstValue(HttpRequestBuilder.HEADER_AUTHORIZATION)); + + } + + @Test + void shouldBuildCustomisedAccessTokenRequest() throws Exception { + final var config = Map.of( + "http.url", "http://localhost:42", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:42/token", + "oauth2.client.id", "some_client_id", + "oauth2.client.secret", "some_client_secret", + "oauth2.client.authorization.mode", "url", + "oauth2.client.scope", "scope1,scope2" + ); + final var accessTokenRequest = + new AccessTokenHttpRequestBuilder().build(new HttpSinkConfig(config)).build(); + + assertEquals(new URL("http://localhost:42/token").toURI(), accessTokenRequest.uri()); + + assertEquals( + Optional.of(AccessTokenHttpRequestBuilder.REQUEST_HTTP_TIMEOUT), + accessTokenRequest.timeout()); + assertEquals("POST", + accessTokenRequest.method()); + assertEquals(Optional.of("application/x-www-form-urlencoded"), + accessTokenRequest.headers().firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)); + assertTrue(accessTokenRequest.headers().firstValue(HttpRequestBuilder.HEADER_AUTHORIZATION).isEmpty()); + } + +} diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSenderTest.java new file mode 100644 index 0000000..4461207 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSenderTest.java @@ -0,0 +1,316 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.mockito.stubbing.Answer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class OAuth2HttpSenderTest { + + @Mock + HttpClient mockedHttpClient; + + final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void buildAccessTokenAuthHeaderForDefaultSettings(@Mock final HttpResponse accessTokenResponse) + throws IOException, InterruptedException { + final var config = defaultConfig(); + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(config), + mockedHttpClient + ); + + final var requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + final var accessTokenJson = Map.of( + "access_token", "bla-bla-bla" + ); + + when(accessTokenResponse.statusCode()).thenReturn(200); + when(accessTokenResponse.body()).thenReturn(objectMapper.writeValueAsString(accessTokenJson)); + when(mockedHttpClient.send(requestCaptor.capture(), any())).thenReturn(accessTokenResponse); + + httpSend.send("SOME_BODY"); + + final var r = requestCaptor.getAllValues().get(1); + assertEquals( + Optional.of("Bearer bla-bla-bla"), + r.headers().firstValue(HttpRequestBuilder.HEADER_AUTHORIZATION) + ); + } + + @Test + void buildAccessTokenAuthHeaderFromCustomSettings(@Mock final HttpResponse accessTokenResponse) + throws IOException, InterruptedException { + final var config = new HashMap<>(defaultConfig()); + config.put("oauth2.client.authorization.mode", "url"); + config.put("oauth2.client.scope", "a,b,c"); + config.put("oauth2.response.token.property", "some_token"); + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(config), + mockedHttpClient + ); + + final var requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + final var accessTokenJson = Map.of( + "some_token", "bla-bla-bla-bla", + "token_type", "Basic" + ); + + when(accessTokenResponse.statusCode()).thenReturn(200); + when(accessTokenResponse.body()).thenReturn(objectMapper.writeValueAsString(accessTokenJson)); + when(mockedHttpClient.send(requestCaptor.capture(), any())).thenReturn(accessTokenResponse); + + httpSend.send("SOME_BODY"); + + final var r = requestCaptor.getAllValues().get(1); + assertEquals( + Optional.of("Basic bla-bla-bla-bla"), + r.headers().firstValue(HttpRequestBuilder.HEADER_AUTHORIZATION) + ); + } + + @Test + void reuseAccessToken(@Mock final HttpResponse response) throws Exception { + final var config = defaultConfig(); + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(config), + mockedHttpClient + ); + + final var requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + final var accessTokenJson = Map.of( + "access_token", "bla-bla-bla-bla" + ); + + when(response.statusCode()).thenReturn(200); + when(response.body()).thenReturn(objectMapper.writeValueAsString(accessTokenJson)); + when(mockedHttpClient.send(requestCaptor.capture(), any())).thenReturn(response); + + httpSend.send("SOME_BODY"); + verify(mockedHttpClient, times(2)).send(any(HttpRequest.class), any()); + httpSend.send("SOME_BODY"); + verify(mockedHttpClient, times(3)).send(any(HttpRequest.class), any()); + + } + + @Test + void refreshAccessToken(@Mock final HttpResponse response) throws Exception { + final var config = defaultConfig(); + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(config), + mockedHttpClient + ); + + final var requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(mockedHttpClient.send(requestCaptor.capture(), any())) + .thenAnswer(new Answer>() { + + final Map accessTokenJson = Map.of( + "access_token", "bla-bla-bla-bla" + ); + final Map newAccessTokenJson = Map.of( + "access_token", "bla-bla-bla-bla-bla" + ); + + int accessTokenRequestCounter = 0; + + int messageRequestCounter = 0; + + @Override + public HttpResponse answer(final InvocationOnMock invocation) throws Throwable { + final var request = invocation.getArgument(0); + if (request.uri().equals(new URI("http://localhost:42/token"))) { + if (accessTokenRequestCounter == 1) { + when(response.statusCode()).thenReturn(200); + when(response.body()) + .thenReturn(objectMapper.writeValueAsString(newAccessTokenJson)); + } else { + when(response.statusCode()).thenReturn(200); + when(response.body()) + .thenReturn(objectMapper.writeValueAsString(accessTokenJson)); + } + accessTokenRequestCounter++; + } else { + if (messageRequestCounter == 1) { + when(response.statusCode()).thenReturn(401); + when(response.body()).thenReturn("NOK"); + } else { + when(response.statusCode()).thenReturn(200); + when(response.body()).thenReturn("OK"); + } + messageRequestCounter++; + } + return response; + } + }); + + httpSend.send("SOME_BODY_1"); + httpSend.send("SOME_BODY_2"); + httpSend.send("SOME_BODY_3"); + + assertEquals(2, + requestCaptor.getAllValues() + .stream() + .filter(r -> { + try { + return r.uri().equals(new URI("http://localhost:42/token")); + } catch (final URISyntaxException e) { + throw new RuntimeException(e); + } + }) + .count()); + } + + @Test + void throwsConnectExceptionForNokToken(@Mock final HttpResponse response) + throws IOException, InterruptedException { + final var config = defaultConfig(); + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(config), + mockedHttpClient + ); + when(response.statusCode()).thenReturn(400); + when(response.body()).thenReturn("NOK"); + when(mockedHttpClient.send(any(HttpRequest.class), any())) + .thenReturn(response); + + final var e = assertThrows(ConnectException.class, () -> httpSend.send("SOME_BODY")); + + assertEquals( + "Sending failed and no retries remain, stopping", + e.getMessage()); + + } + + @Test + void throwsConnectExceptionOnRefreshToken(@Mock final HttpResponse response) + throws IOException, InterruptedException { + + final var httpSend = + new OAuth2HttpSender( + new HttpSinkConfig(defaultConfig()), + mockedHttpClient + ); + + + when(mockedHttpClient.send(any(HttpRequest.class), any())) + .thenAnswer(new Answer>() { + + final Map accessTokenJson = Map.of( + "access_token", "bla-bla-bla-bla" + ); + + int accessTokenRequestCounter = 0; + + int messageRequestCounter = 0; + + @Override + public HttpResponse answer(final InvocationOnMock invocation) throws Throwable { + final var request = invocation.getArgument(0); + if (request.uri().equals(new URI("http://localhost:42/token"))) { + if (accessTokenRequestCounter >= 1) { + when(response.statusCode()).thenReturn(400); + when(response.body()).thenReturn("NOK"); + } else { + when(response.statusCode()).thenReturn(200); + when(response.body()) + .thenReturn(objectMapper.writeValueAsString(accessTokenJson)); + } + accessTokenRequestCounter++; + } else { + if (messageRequestCounter == 1) { + when(response.statusCode()).thenReturn(401); + when(response.body()).thenReturn("NOK"); + } else { + when(response.statusCode()).thenReturn(200); + when(response.body()).thenReturn("OK"); + } + messageRequestCounter++; + } + return response; + } + }); + + final var e = assertThrows(ConnectException.class, () -> { + httpSend.send("SOME_BODY_1"); + httpSend.send("SOME_BODY_2"); + }); + + assertEquals( + "Sending failed and no retries remain, stopping", + e.getMessage()); + + } + + + private Map defaultConfig() { + return Map.of( + "http.url", "http://localhost:42", + "http.authorization.type", "oauth2", + "oauth2.access.token.url", "http://localhost:42/token", + "oauth2.client.id", "some_client_id", + "oauth2.client.secret", "some_client_secret" + ); + } + +}