Skip to content

Commit

Permalink
Merge pull request #22 from aiven/backoff-policy
Browse files Browse the repository at this point in the history
Add kafka backoff timeout in ms
  • Loading branch information
ivanyu authored Apr 14, 2021
2 parents 0a12c06 + fb4eb66 commit 3874cb6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/aiven/kafka/connect/http/HttpSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public void start(final Map<String, String> props) {
this.httpSender = HttpSender.createHttpSender(config);
}
recordSender = RecordSender.createRecordSender(httpSender, config);
if (Objects.nonNull(config.kafkaRetryBackoffMs())) {
context.timeout(config.kafkaRetryBackoffMs());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -37,6 +39,8 @@ public class HttpSinkConfig extends AbstractConfig {
private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization";
private static final String HTTP_HEADERS_CONTENT_TYPE_CONFIG = "http.headers.content.type";

public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";

private static final String OAUTH2_ACCESS_TOKEN_URL_CONFIG = "oauth2.access.token.url";
private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id";
private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret";
Expand Down Expand Up @@ -272,6 +276,39 @@ private static void addBatchingConfigGroup(final ConfigDef configDef) {

private static void addRetriesConfigGroup(final ConfigDef configDef) {
int groupCounter = 0;
configDef.define(
KAFKA_RETRY_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
null,
new ConfigDef.Validator() {

static final long MAXIMUM_BACKOFF_POLICY = 86400000; // 24 hours

@Override
public void ensureValid(final String name, final Object value) {
if (Objects.isNull(value)) {
return;
}
assert value instanceof Long;
final var longValue = (Long) value;
if (longValue < 0) {
throw new ConfigException(name, value, "Value must be at least 0");
} else if (longValue > MAXIMUM_BACKOFF_POLICY) {
throw new ConfigException(name, value,
"Value must be no more than " + MAXIMUM_BACKOFF_POLICY + " (24 hours)");
}
}
},
ConfigDef.Importance.MEDIUM,
"The retry backoff in milliseconds. "
+ "This config is used to notify Kafka Connect to retry delivering a message batch or "
+ "performing recovery in case of transient exceptions. Maximum value is "
+ TimeUnit.HOURS.toMillis(24) + " (24 hours).",
DELIVERY_GROUP,
groupCounter++,
ConfigDef.Width.NONE,
KAFKA_RETRY_BACKOFF_MS_CONFIG
);
configDef.define(
MAX_RETRIES_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -355,6 +392,10 @@ public final URI httpUri() {
return toURI(HTTP_URL_CONFIG);
}

public final Long kafkaRetryBackoffMs() {
return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG);
}

public AuthorizationType authorizationType() {
return AuthorizationType.forName(getString(HTTP_AUTHORIZATION_TYPE_CONFIG));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.config.ConfigException;

Expand Down Expand Up @@ -65,6 +66,7 @@ void correctMinimalConfig() throws MalformedURLException, URISyntaxException {
assertNull(config.oauth2ClientScope());
assertEquals(OAuth2AuthorizationMode.HEADER, config.oauth2AuthorizationMode());
assertEquals("access_token", config.oauth2ResponseTokenProperty());
assertNull(config.kafkaRetryBackoffMs());
}

@Test
Expand Down Expand Up @@ -389,6 +391,38 @@ void negativeRetryBackoffMs() {
t.getMessage());
}

@Test
void tooBigKafkaRetryBackoffMs() {
final Map<String, String> properties = Map.of(
"http.url", "http://localhost:8090",
"http.authorization.type", "none",
"kafka.retry.backoff.ms", String.valueOf(TimeUnit.HOURS.toMillis(25))
);

final Throwable t = assertThrows(
ConfigException.class, () -> new HttpSinkConfig(properties)
);
assertEquals("Invalid value 90000000 for configuration kafka.retry.backoff.ms: "
+ "Value must be no more than 86400000 (24 hours)",
t.getMessage());
}


@Test
void negativeKafkaRetryBackoffMs() {
final Map<String, String> properties = Map.of(
"http.url", "http://localhost:8090",
"http.authorization.type", "none",
"kafka.retry.backoff.ms", "-1"
);

final Throwable t = assertThrows(
ConfigException.class, () -> new HttpSinkConfig(properties)
);
assertEquals("Invalid value -1 for configuration kafka.retry.backoff.ms: Value must be at least 0",
t.getMessage());
}

@Test
void correctRetryBackoffMs() {
final Map<String, String> properties = Map.of(
Expand All @@ -400,4 +434,17 @@ void correctRetryBackoffMs() {
final HttpSinkConfig config = new HttpSinkConfig(properties);
assertEquals(12345, config.retryBackoffMs());
}

@Test
void customKafkaRetryBackoffMs() {
final Map<String, String> properties = Map.of(
"http.url", "http://localhost:8090",
"http.authorization.type", "none",
"kafka.retry.backoff.ms", "6000"
);

final var config = new HttpSinkConfig(properties);
assertEquals(6000, config.kafkaRetryBackoffMs());
}

}

0 comments on commit 3874cb6

Please sign in to comment.