From 366ba0df6bc2ae4c2aa0bba48233617075dd8af6 Mon Sep 17 00:00:00 2001 From: Andrey Pleskach Date: Wed, 24 Mar 2021 22:07:45 +0100 Subject: [PATCH] Code refactoring 1. Moved RecordSender class into package io.aiven.kafka.connect.http.recordsender package 2. Moved HttpSender into io.aiven.kafka.connect.http.sender package and split up HttpSender behave for 2 diff auth type with auth and without auth 3. Moved backoff policy into HtpSender --- .../connect/http/AvroIntegrationTest.java | 2 - .../kafka/connect/http/IntegrationTest.java | 2 +- .../io/aiven/kafka/connect/http/Batcher.java | 20 --- .../aiven/kafka/connect/http/HttpSender.java | 78 ---------- .../kafka/connect/http/HttpSinkTask.java | 29 +--- .../kafka/connect/http/RecordSender.java | 74 --------- .../connect/http/config/HttpSinkConfig.java | 2 +- .../{ => recordsender}/BatchRecordSender.java | 17 +-- .../http/recordsender/RecordSender.java | 47 ++++++ .../SingleRecordSender.java | 15 +- .../kafka/connect/http/sender/HttpSender.java | 144 ++++++++++++++++++ .../http/config/HttpSinkConfigTest.java | 13 +- 12 files changed, 219 insertions(+), 224 deletions(-) delete mode 100644 src/main/java/io/aiven/kafka/connect/http/Batcher.java delete mode 100644 src/main/java/io/aiven/kafka/connect/http/HttpSender.java delete mode 100644 src/main/java/io/aiven/kafka/connect/http/RecordSender.java rename src/main/java/io/aiven/kafka/connect/http/{ => recordsender}/BatchRecordSender.java (80%) create mode 100644 src/main/java/io/aiven/kafka/connect/http/recordsender/RecordSender.java rename src/main/java/io/aiven/kafka/connect/http/{ => recordsender}/SingleRecordSender.java (72%) create mode 100644 src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java diff --git a/src/integration-test/java/io/aiven/kafka/connect/http/AvroIntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/http/AvroIntegrationTest.java index 5e1b228..a45f476 100644 --- a/src/integration-test/java/io/aiven/kafka/connect/http/AvroIntegrationTest.java +++ b/src/integration-test/java/io/aiven/kafka/connect/http/AvroIntegrationTest.java @@ -44,7 +44,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -149,7 +148,6 @@ final void tearDown() { @Test @Timeout(30) - @Disabled final void testBasicDelivery() throws ExecutionException, InterruptedException { final BodyRecorderHandler bodyRecorderHandler = new BodyRecorderHandler(); mockServer.addHandler(bodyRecorderHandler); diff --git a/src/integration-test/java/io/aiven/kafka/connect/http/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/http/IntegrationTest.java index 9a97721..3af4f67 100644 --- a/src/integration-test/java/io/aiven/kafka/connect/http/IntegrationTest.java +++ b/src/integration-test/java/io/aiven/kafka/connect/http/IntegrationTest.java @@ -204,7 +204,7 @@ final void testFailingEvery3rdRequest() throws ExecutionException, InterruptedEx TestUtils.waitForCondition( () -> bodyRecorderHandler.recorderBodies().size() >= expectedBodies.size(), - 10000, + 10_000, "All requests received by HTTP server" ); log.info("Recorded request bodies: {}", bodyRecorderHandler.recorderBodies()); diff --git a/src/main/java/io/aiven/kafka/connect/http/Batcher.java b/src/main/java/io/aiven/kafka/connect/http/Batcher.java deleted file mode 100644 index f9f72df..0000000 --- a/src/main/java/io/aiven/kafka/connect/http/Batcher.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2019 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.http; - -final class Batcher { -} diff --git a/src/main/java/io/aiven/kafka/connect/http/HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/HttpSender.java deleted file mode 100644 index 5cdcc00..0000000 --- a/src/main/java/io/aiven/kafka/connect/http/HttpSender.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2019 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.http; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.time.Duration; - -import org.apache.kafka.connect.errors.ConnectException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class HttpSender { - private static final Logger log = LoggerFactory.getLogger(HttpSender.class); - - private static final String HEADER_AUTHORIZATION = "Authorization"; - private static final String HEADER_CONTENT_TYPE = "Content-Type"; - - private static final Duration HTTP_TIMEOUT = Duration.ofSeconds(30); - - private final HttpClient httpClient; - private final HttpRequest.Builder requestTemplate; - - HttpSender(final URL url, - final String headerAuthorization, - final String headerContentType) { - this.httpClient = HttpClient.newHttpClient(); - - final URI uri; - try { - uri = url.toURI(); - } catch (final URISyntaxException e) { - throw new ConnectException(e); - } - requestTemplate = HttpRequest.newBuilder(uri) - .timeout(HTTP_TIMEOUT); - if (headerAuthorization != null) { - requestTemplate.header(HEADER_AUTHORIZATION, headerAuthorization); - } - if (headerContentType != null) { - requestTemplate.header(HEADER_CONTENT_TYPE, headerContentType); - } - } - - void send(final String body) throws IOException, InterruptedException { - final HttpRequest request = requestTemplate.copy() - .POST(HttpRequest.BodyPublishers.ofString(body)) - .build(); - final HttpResponse response; - response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - log.debug("Server replied with status code {} and body {}", response.statusCode(), response.body()); - - if (response.statusCode() >= 400) { - throw new IOException("Server replied with status code " + response.statusCode() - + " and body " + response.body()); - } - } -} diff --git a/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java b/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java index 3878d2d..4e8a9e7 100644 --- a/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java +++ b/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java @@ -20,12 +20,13 @@ import java.util.Map; import java.util.Objects; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import io.aiven.kafka.connect.http.config.HttpSinkConfig; +import io.aiven.kafka.connect.http.recordsender.RecordSender; +import io.aiven.kafka.connect.http.sender.HttpSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,32 +39,21 @@ public final class HttpSinkTask extends SinkTask { // required by Connect public HttpSinkTask() { + this(null); } - // for testing - HttpSinkTask(final HttpSender httpSender) { + protected HttpSinkTask(final HttpSender httpSender) { this.httpSender = httpSender; } @Override public void start(final Map props) { Objects.requireNonNull(props); - final var config = new HttpSinkConfig(props); - if (this.httpSender == null) { - this.httpSender = new HttpSender( - config.httpUrl(), - config.headerAuthorization(), - config.headerContentType()); - } - - if (config.batchingEnabled()) { - recordSender = new BatchRecordSender(httpSender, - config.batchMaxSize(), config.maxRetries(), config.retryBackoffMs()); - } else { - recordSender = new SingleRecordSender(httpSender, config.maxRetries(), config.retryBackoffMs()); + this.httpSender = HttpSender.createHttpSender(config); } + recordSender = RecordSender.createRecordSender(httpSender, config); } @Override @@ -76,11 +66,7 @@ public void put(final Collection records) { throw new DataException("Record value must not be null"); } } - try { - recordSender.send(records); - } catch (final InterruptedException e) { - throw new ConnectException(e); - } + recordSender.send(records); } } @@ -93,4 +79,5 @@ public void stop() { public String version() { return Version.VERSION; } + } diff --git a/src/main/java/io/aiven/kafka/connect/http/RecordSender.java b/src/main/java/io/aiven/kafka/connect/http/RecordSender.java deleted file mode 100644 index ff17c88..0000000 --- a/src/main/java/io/aiven/kafka/connect/http/RecordSender.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2019 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.http; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; - -import io.aiven.kafka.connect.http.converter.RecordValueConverter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class RecordSender { - private static final Logger log = LoggerFactory.getLogger(RecordSender.class); - - private final HttpSender httpSender; - - private final int maxRetries; - private final int retryBackoffMs; - - protected final RecordValueConverter recordValueConverter = new RecordValueConverter(); - - RecordSender(final HttpSender httpSender, - final int maxRetries, final int retryBackoffMs) { - this.httpSender = httpSender; - - this.maxRetries = maxRetries; - this.retryBackoffMs = retryBackoffMs; - } - - abstract void send(final Collection records) throws InterruptedException; - - /** - * Sends a HTTP body using {@code httpSender}, respecting the configured retry policy. - * - * @return whether the sending was successful. - */ - protected void sendWithRetries(final String body) throws InterruptedException { - int remainRetries = this.maxRetries; - while (true) { - try { - httpSender.send(body); - return; - } catch (final IOException e) { - if (remainRetries > 0) { - log.info("Sending failed, will retry in {} ms ({} retries remain)", - this.retryBackoffMs, remainRetries, e); - remainRetries -= 1; - Thread.sleep(this.retryBackoffMs); - } else { - log.error("Sending failed and no retries remain, stopping"); - throw new ConnectException(e); - } - } - } - } -} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 7c134c1..1f4725c 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -90,7 +90,7 @@ public String toString() { configDef.define( HTTP_AUTHORIZATION_TYPE_CONFIG, ConfigDef.Type.STRING, - ConfigDef.NO_DEFAULT_VALUE, + AuthorizationType.NONE.name, new ConfigDef.Validator() { @Override public void ensureValid(final String name, final Object value) { diff --git a/src/main/java/io/aiven/kafka/connect/http/BatchRecordSender.java b/src/main/java/io/aiven/kafka/connect/http/recordsender/BatchRecordSender.java similarity index 80% rename from src/main/java/io/aiven/kafka/connect/http/BatchRecordSender.java rename to src/main/java/io/aiven/kafka/connect/http/recordsender/BatchRecordSender.java index 0a3c0f1..cf4b5d5 100644 --- a/src/main/java/io/aiven/kafka/connect/http/BatchRecordSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/recordsender/BatchRecordSender.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.aiven.kafka.connect.http; +package io.aiven.kafka.connect.http.recordsender; import java.util.ArrayList; import java.util.Collection; @@ -22,34 +22,33 @@ import org.apache.kafka.connect.sink.SinkRecord; +import io.aiven.kafka.connect.http.sender.HttpSender; + final class BatchRecordSender extends RecordSender { private static final String BATCH_RECORD_SEPARATOR = "\n"; private final int batchMaxSize; - BatchRecordSender(final HttpSender httpSender, - final int batchMaxSize, - final int maxRetries, final int retryBackoffMs) { - super(httpSender, maxRetries, retryBackoffMs); + protected BatchRecordSender(final HttpSender httpSender, final int batchMaxSize) { + super(httpSender); this.batchMaxSize = batchMaxSize; } @Override - void send(final Collection records) throws InterruptedException { + public void send(final Collection records) { final List batch = new ArrayList<>(batchMaxSize); for (final var record : records) { batch.add(record); if (batch.size() >= batchMaxSize) { final String body = createRequestBody(batch); batch.clear(); - - sendWithRetries(body); + httpSender.send(body); } } if (!batch.isEmpty()) { final String body = createRequestBody(batch); - sendWithRetries(body); + httpSender.send(body); } } diff --git a/src/main/java/io/aiven/kafka/connect/http/recordsender/RecordSender.java b/src/main/java/io/aiven/kafka/connect/http/recordsender/RecordSender.java new file mode 100644 index 0000000..ee1497a --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/recordsender/RecordSender.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.recordsender; + +import java.util.Collection; + +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; +import io.aiven.kafka.connect.http.converter.RecordValueConverter; +import io.aiven.kafka.connect.http.sender.HttpSender; + +public abstract class RecordSender { + + protected final HttpSender httpSender; + + protected final RecordValueConverter recordValueConverter = new RecordValueConverter(); + + protected RecordSender(final HttpSender httpSender) { + this.httpSender = httpSender; + } + + public abstract void send(final Collection records); + + public static RecordSender createRecordSender(final HttpSender httpSender, final HttpSinkConfig config) { + if (config.batchingEnabled()) { + return new BatchRecordSender(httpSender, config.batchMaxSize()); + } else { + return new SingleRecordSender(httpSender); + } + } + +} diff --git a/src/main/java/io/aiven/kafka/connect/http/SingleRecordSender.java b/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java similarity index 72% rename from src/main/java/io/aiven/kafka/connect/http/SingleRecordSender.java rename to src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java index 9dc3a16..777c0ad 100644 --- a/src/main/java/io/aiven/kafka/connect/http/SingleRecordSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java @@ -14,23 +14,26 @@ * limitations under the License. */ -package io.aiven.kafka.connect.http; +package io.aiven.kafka.connect.http.recordsender; import java.util.Collection; import org.apache.kafka.connect.sink.SinkRecord; +import io.aiven.kafka.connect.http.sender.HttpSender; + final class SingleRecordSender extends RecordSender { - SingleRecordSender(final HttpSender httpSender, - final int maxRetries, final int retryBackoffMs) { - super(httpSender, maxRetries, retryBackoffMs); + + protected SingleRecordSender(final HttpSender httpSender) { + super(httpSender); } @Override - void send(final Collection records) throws InterruptedException { + public void send(final Collection records) { for (final SinkRecord record : records) { final String body = recordValueConverter.convert(record); - sendWithRetries(body); + httpSender.send(body); } } + } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java new file mode 100644 index 0000000..bd9396a --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java @@ -0,0 +1,144 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.sender; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class HttpSender { + + private static final Logger log = LoggerFactory.getLogger(HttpSender.class); + + protected static final String HEADER_AUTHORIZATION = "Authorization"; + + protected static final String HEADER_CONTENT_TYPE = "Content-Type"; + + private static final Duration HTTP_TIMEOUT = Duration.ofSeconds(30); + + private final HttpClient httpClient; + + protected final HttpRequest.Builder requestTemplate; + + protected final HttpSinkConfig config; + + protected interface OnResponseHandler { + + void onResponse(final HttpResponse response) throws IOException; + + } + + protected final OnResponseHandler onHttpErrorResponseHandler = response -> { + if (response.statusCode() >= 400) { + if (response.statusCode() < 200 || response.statusCode() > 299) { + log.warn("Got unexpected HTTP status code: {}", response.statusCode()); + } + throw new IOException("Server replied with status code " + response.statusCode() + + " and body " + response.body()); + } + }; + + protected HttpSender(final HttpSinkConfig config) { + this.config = config; + this.httpClient = HttpClient.newHttpClient(); + try { + requestTemplate = HttpRequest.newBuilder(config.httpUrl().toURI()).timeout(HTTP_TIMEOUT); + } catch (final URISyntaxException e) { + throw new ConnectException(e); + } + } + + public final void send(final String body) { + final HttpRequest request = requestTemplate.copy() + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build(); + sendWithRetries(request, onHttpErrorResponseHandler); + } + + /** + * Sends a HTTP body using {@code httpSender}, respecting the configured retry policy. + * + * @return whether the sending was successful. + */ + protected void sendWithRetries(final HttpRequest request, + final OnResponseHandler onHttpResponseHandler) { + int remainRetries = config.maxRetries(); + while (remainRetries >= 0) { + try { + try { + final var response = + httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + log.debug("Server replied with status code {} and body {}", response.statusCode(), response.body()); + onHttpResponseHandler.onResponse(response); + return; + } catch (final IOException e) { + log.info("Sending failed, will retry in {} ms ({} retries remain)", + config.retryBackoffMs(), remainRetries, e); + remainRetries -= 1; + TimeUnit.MILLISECONDS.sleep(config.retryBackoffMs()); + } + } catch (final InterruptedException e) { + log.error("Sending failed due to InterruptedException, stopping"); + throw new ConnectException(e); + } + } + log.error("Sending failed and no retries remain, stopping"); + throw new ConnectException("Sending failed and no retries remain, stopping"); + } + + public static HttpSender createHttpSender(final HttpSinkConfig config) { + switch (config.authorizationType()) { + case STATIC: + return new StaticAuthHttpSender(config); + case NONE: + return new NoAuthHttpSender(config); + default: + throw new ConnectException("Can't create HTTP sender for auth type: " + config.authorizationType()); + } + } + + private static final class NoAuthHttpSender extends HttpSender { + + private NoAuthHttpSender(final HttpSinkConfig config) { + super(config); + } + + } + + private static final class StaticAuthHttpSender extends HttpSender { + + private StaticAuthHttpSender(final HttpSinkConfig config) { + super(config); + requestTemplate.header(HEADER_AUTHORIZATION, config.headerAuthorization()); + if (config.headerContentType() != null) { + requestTemplate.header(HEADER_CONTENT_TYPE, config.headerContentType()); + } + } + } + +} diff --git a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java index 482bb88..7f44078 100644 --- a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java @@ -61,7 +61,7 @@ void correctMinimalConfig() throws MalformedURLException { } @Test - void invalidUrl() throws MalformedURLException { + void invalidUrl() { final Map properties = Map.of( "http.url", "#http://localhost:8090", "http.authorization.type", "none" @@ -74,17 +74,6 @@ void invalidUrl() throws MalformedURLException { t.getMessage()); } - @Test - void missingAuthorizationType() { - final Map properties = Map.of( - "http.url", "http://localhost:8090" - ); - - final Throwable t = assertThrows(ConfigException.class, () -> new HttpSinkConfig(properties)); - assertEquals("Missing required configuration \"http.authorization.type\" which has no default value.", - t.getMessage()); - } - @ParameterizedTest @ValueSource(strings = {"none", "static"}) void supportedAuthorizationType(final String authorization) {