diff --git a/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java b/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java index 4e8a9e7..942103d 100644 --- a/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java +++ b/src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java @@ -54,6 +54,9 @@ public void start(final Map props) { this.httpSender = HttpSender.createHttpSender(config); } recordSender = RecordSender.createRecordSender(httpSender, config); + if (Objects.nonNull(config.kafkaRetryBackoffMs())) { + context.timeout(config.kafkaRetryBackoffMs()); + } } @Override diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index c12cc24..fe3c8b8 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -22,6 +22,8 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -37,6 +39,8 @@ public class HttpSinkConfig extends AbstractConfig { private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization"; private static final String HTTP_HEADERS_CONTENT_TYPE_CONFIG = "http.headers.content.type"; + public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms"; + private static final String OAUTH2_ACCESS_TOKEN_URL_CONFIG = "oauth2.access.token.url"; private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id"; private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret"; @@ -272,6 +276,39 @@ private static void addBatchingConfigGroup(final ConfigDef configDef) { private static void addRetriesConfigGroup(final ConfigDef configDef) { int groupCounter = 0; + configDef.define( + KAFKA_RETRY_BACKOFF_MS_CONFIG, + ConfigDef.Type.LONG, + null, + new ConfigDef.Validator() { + + static final long MAXIMUM_BACKOFF_POLICY = 86400000; // 24 hours + + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.isNull(value)) { + return; + } + assert value instanceof Long; + final var longValue = (Long) value; + if (longValue < 0) { + throw new ConfigException(name, value, "Value must be at least 0"); + } else if (longValue > MAXIMUM_BACKOFF_POLICY) { + throw new ConfigException(name, value, + "Value must be no more than " + MAXIMUM_BACKOFF_POLICY + " (24 hours)"); + } + } + }, + ConfigDef.Importance.MEDIUM, + "The retry backoff in milliseconds. " + + "This config is used to notify Kafka Connect to retry delivering a message batch or " + + "performing recovery in case of transient exceptions. Maximum value is " + + TimeUnit.HOURS.toMillis(24) + " (24 hours).", + DELIVERY_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + KAFKA_RETRY_BACKOFF_MS_CONFIG + ); configDef.define( MAX_RETRIES_CONFIG, ConfigDef.Type.INT, @@ -355,6 +392,10 @@ public final URI httpUri() { return toURI(HTTP_URL_CONFIG); } + public final Long kafkaRetryBackoffMs() { + return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); + } + public AuthorizationType authorizationType() { return AuthorizationType.forName(getString(HTTP_AUTHORIZATION_TYPE_CONFIG)); } diff --git a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java index 511f1f8..92580f2 100644 --- a/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java @@ -21,6 +21,7 @@ import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.config.ConfigException; @@ -65,6 +66,7 @@ void correctMinimalConfig() throws MalformedURLException, URISyntaxException { assertNull(config.oauth2ClientScope()); assertEquals(OAuth2AuthorizationMode.HEADER, config.oauth2AuthorizationMode()); assertEquals("access_token", config.oauth2ResponseTokenProperty()); + assertNull(config.kafkaRetryBackoffMs()); } @Test @@ -389,6 +391,38 @@ void negativeRetryBackoffMs() { t.getMessage()); } + @Test + void tooBigKafkaRetryBackoffMs() { + final Map properties = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "kafka.retry.backoff.ms", String.valueOf(TimeUnit.HOURS.toMillis(25)) + ); + + final Throwable t = assertThrows( + ConfigException.class, () -> new HttpSinkConfig(properties) + ); + assertEquals("Invalid value 90000000 for configuration kafka.retry.backoff.ms: " + + "Value must be no more than 86400000 (24 hours)", + t.getMessage()); + } + + + @Test + void negativeKafkaRetryBackoffMs() { + final Map properties = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "kafka.retry.backoff.ms", "-1" + ); + + final Throwable t = assertThrows( + ConfigException.class, () -> new HttpSinkConfig(properties) + ); + assertEquals("Invalid value -1 for configuration kafka.retry.backoff.ms: Value must be at least 0", + t.getMessage()); + } + @Test void correctRetryBackoffMs() { final Map properties = Map.of( @@ -400,4 +434,17 @@ void correctRetryBackoffMs() { final HttpSinkConfig config = new HttpSinkConfig(properties); assertEquals(12345, config.retryBackoffMs()); } + + @Test + void customKafkaRetryBackoffMs() { + final Map properties = Map.of( + "http.url", "http://localhost:8090", + "http.authorization.type", "none", + "kafka.retry.backoff.ms", "6000" + ); + + final var config = new HttpSinkConfig(properties); + assertEquals(6000, config.kafkaRetryBackoffMs()); + } + }