Skip to content

Commit

Permalink
Merge pull request #20 from aiven/oauth2-support
Browse files Browse the repository at this point in the history
Oauth2 support
  • Loading branch information
HelenMel authored Apr 1, 2021
2 parents 389ebb5 + bd33604 commit 2693cae
Show file tree
Hide file tree
Showing 16 changed files with 1,217 additions and 158 deletions.
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ dependencies {
compileOnly "org.apache.kafka:connect-json:$kafkaVersion"

implementation "org.slf4j:slf4j-api:1.7.28"
implementation "com.fasterxml.jackson.core:jackson-databind:2.12.2"

testImplementation "org.junit.jupiter:junit-jupiter:5.5.2"
testImplementation "org.hamcrest:hamcrest:2.1"
testImplementation "org.apache.kafka:connect-api:$kafkaVersion"

testImplementation "org.junit.jupiter:junit-jupiter:5.7.1"
testImplementation "org.mockito:mockito-core:3.8.0"
testImplementation "org.mockito:mockito-junit-jupiter:3.8.0"

testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1"
testRuntime "com.fasterxml.jackson.core:jackson-databind:2.9.10"
testRuntime "com.fasterxml.jackson.core:jackson-databind:2.12.2"
testRuntime "org.apache.kafka:connect-json:$kafkaVersion"

integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion") {
Expand Down
3 changes: 3 additions & 0 deletions config/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@
files="(IntegrationTest|ConnectRunner).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(IntegrationTest|ConnectRunner).java"/>
<suppress checks="MethodLength" files="(HttpSinkConfig).java"/>
<suppress checks="CyclomaticComplexity" files="(HttpSinkConfig).java"/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

public enum AuthorizationType {
NONE("none"),
OAUTH2("oauth2"),
STATIC("static");

public final String name;
Expand All @@ -36,14 +37,15 @@ public static AuthorizationType forName(final String name) {

if (NONE.name.equalsIgnoreCase(name)) {
return NONE;
} else if (OAUTH2.name.equalsIgnoreCase(name)) {
return OAUTH2;
} else if (STATIC.name.equalsIgnoreCase(name)) {
return STATIC;
} else {
throw new IllegalArgumentException("Unknown authorization type: " + name);
}
}

public static Collection<String> names() {
return Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList());
}
public static final Collection<String> NAMES =
Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList());
}
200 changes: 167 additions & 33 deletions src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.aiven.kafka.connect.http.config;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import java.util.Map;
Expand All @@ -30,10 +32,18 @@
public class HttpSinkConfig extends AbstractConfig {
private static final String CONNECTION_GROUP = "Connection";
private static final String HTTP_URL_CONFIG = "http.url";

private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type";
private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization";
private static final String HTTP_HEADERS_CONTENT_TYPE_CONFIG = "http.headers.content.type";

private static final String OAUTH2_ACCESS_TOKEN_URL_CONFIG = "oauth2.access.token.url";
private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id";
private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret";
private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode";
private static final String OAUTH2_CLIENT_SCOPE_CONFIG = "oauth2.client.scope";
private static final String OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG = "oauth2.response.token.property";

private static final String BATCHING_GROUP = "Batching";
private static final String BATCHING_ENABLED_CONFIG = "batching.enabled";
private static final String BATCH_MAX_SIZE_CONFIG = "batch.max.size";
Expand All @@ -58,27 +68,7 @@ private static void addConnectionConfigGroup(final ConfigDef configDef) {
HTTP_URL_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(HTTP_URL_CONFIG, value, "can't be null");
}
if (!(value instanceof String)) {
throw new ConfigException(HTTP_URL_CONFIG, value, "must be string");
}
try {
new URL((String) value);
} catch (final MalformedURLException e) {
throw new ConfigException(HTTP_URL_CONFIG, value, "malformed URL");
}
}

@Override
public String toString() {
return "HTTP(S) ULRs";
}
},
new UrlValidator(),
ConfigDef.Importance.HIGH,
"The URL to send data to.",
CONNECTION_GROUP,
Expand All @@ -90,7 +80,7 @@ public String toString() {
configDef.define(
HTTP_AUTHORIZATION_TYPE_CONFIG,
ConfigDef.Type.STRING,
AuthorizationType.NONE.name,
ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
Expand All @@ -99,16 +89,16 @@ public void ensureValid(final String name, final Object value) {
}
assert value instanceof String;
final String valueStr = (String) value;
if (!AuthorizationType.names().contains(valueStr)) {
if (!AuthorizationType.NAMES.contains(valueStr)) {
throw new ConfigException(
HTTP_AUTHORIZATION_TYPE_CONFIG, valueStr,
"supported values are: " + AuthorizationType.names());
"supported values are: " + AuthorizationType.NAMES);
}
}

@Override
public String toString() {
return AuthorizationType.names().toString();
return AuthorizationType.NAMES.toString();
}
},
ConfigDef.Importance.HIGH,
Expand All @@ -118,7 +108,7 @@ public String toString() {
ConfigDef.Width.SHORT,
HTTP_AUTHORIZATION_TYPE_CONFIG,
List.of(HTTP_HEADERS_AUTHORIZATION_CONFIG),
FixedSetRecommender.ofSupportedValues(AuthorizationType.names())
FixedSetRecommender.ofSupportedValues(AuthorizationType.NAMES)
);

configDef.define(
Expand Down Expand Up @@ -156,6 +146,100 @@ public boolean visible(final String name, final Map<String, Object> parsedConfig
ConfigDef.Width.MEDIUM,
HTTP_HEADERS_CONTENT_TYPE_CONFIG
);

configDef.define(
OAUTH2_ACCESS_TOKEN_URL_CONFIG,
ConfigDef.Type.STRING,
null,
new UrlValidator(true),
ConfigDef.Importance.HIGH,
"The URL to be used for fetching access token. "
+ "Client Credentials is only supported grand type.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_ACCESS_TOKEN_URL_CONFIG
);
configDef.define(
OAUTH2_CLIENT_ID_CONFIG,
ConfigDef.Type.STRING,
null,
new ConfigDef.NonEmptyStringWithoutControlChars(),
ConfigDef.Importance.HIGH,
"The client id used for fetching access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_CLIENT_SECRET_CONFIG
);
configDef.define(
OAUTH2_CLIENT_SECRET_CONFIG,
ConfigDef.Type.PASSWORD,
null,
ConfigDef.Importance.HIGH,
"The secret used for fetching access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_CLIENT_SECRET_CONFIG
);
configDef.define(
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG,
ConfigDef.Type.STRING,
OAuth2AuthorizationMode.HEADER.name(),
new ConfigDef.Validator() {
@Override
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(name, null, "can't be null");
}
if (!(value instanceof String)) {
throw new ConfigException(name, value, "must be string");
}
if (!OAuth2AuthorizationMode.OAUTH2_AUTHORIZATION_MODES
.contains(value.toString().toUpperCase())) {
throw new ConfigException(
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, value,
"supported values are: " + OAuth2AuthorizationMode.OAUTH2_AUTHORIZATION_MODES);
}
}
},
ConfigDef.Importance.MEDIUM,
"Specifies how to encode client_id and client_secret in the OAuth2 authorization request. "
+ "If set to 'header', the credentials are encoded as an "
+ "'Authorization: Basic <base-64 encoded client_id:client_secret>' HTTP header. "
+ "If set to ‘url’, then client_id and client_secret are sent as URL encoded parameters. "
+ "Default is 'header'",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG
);
configDef.define(
OAUTH2_CLIENT_SCOPE_CONFIG,
ConfigDef.Type.STRING,
null,
new ConfigDef.NonEmptyStringWithoutControlChars(),
ConfigDef.Importance.LOW,
"The scope used for fetching access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_CLIENT_SCOPE_CONFIG
);
configDef.define(
OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG,
ConfigDef.Type.STRING,
"access_token",
new ConfigDef.NonEmptyStringWithoutControlChars(),
ConfigDef.Importance.LOW,
"The name of the JSON property containing the access token returned by the OAuth2 provider. "
+ "Default value is 'access_token'.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG
);
}

private static void addBatchingConfigGroup(final ConfigDef configDef) {
Expand Down Expand Up @@ -230,7 +314,28 @@ private void validate() {
+ " = " + AuthorizationType.STATIC);
}
break;

case OAUTH2:
if (oauth2AccessTokenUri() == null) {
throw new ConfigException(
OAUTH2_ACCESS_TOKEN_URL_CONFIG, getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
if (oauth2ClientId() == null || oauth2ClientId().isEmpty()) {
throw new ConfigException(
OAUTH2_CLIENT_ID_CONFIG,
getString(OAUTH2_CLIENT_ID_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
if (oauth2ClientSecret() == null || oauth2ClientSecret().value().isEmpty()) {
throw new ConfigException(
OAUTH2_CLIENT_SECRET_CONFIG,
getPassword(OAUTH2_CLIENT_SECRET_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
break;
case NONE:
if (headerAuthorization() != null && !headerAuthorization().isBlank()) {
throw new ConfigException(
Expand All @@ -246,12 +351,8 @@ private void validate() {
}
}

public final URL httpUrl() {
try {
return new URL(getString(HTTP_URL_CONFIG));
} catch (final MalformedURLException e) {
throw new ConnectException(e);
}
public final URI httpUri() {
return toURI(HTTP_URL_CONFIG);
}

public AuthorizationType authorizationType() {
Expand Down Expand Up @@ -287,11 +388,44 @@ public final String connectorName() {
return originalsStrings().get(NAME_CONFIG);
}

public final URI oauth2AccessTokenUri() {
return getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG) != null ? toURI(OAUTH2_ACCESS_TOKEN_URL_CONFIG) : null;
}

private URI toURI(final String propertyName) {
try {
return new URL(getString(propertyName)).toURI();
} catch (final MalformedURLException | URISyntaxException e) {
throw new ConnectException(e);
}
}

public final String oauth2ClientId() {
return getString(OAUTH2_CLIENT_ID_CONFIG);
}

public final Password oauth2ClientSecret() {
return getPassword(OAUTH2_CLIENT_SECRET_CONFIG);
}

public final OAuth2AuthorizationMode oauth2AuthorizationMode() {
return OAuth2AuthorizationMode.valueOf(getString(OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG).toUpperCase());
}

public final String oauth2ClientScope() {
return getString(OAUTH2_CLIENT_SCOPE_CONFIG);
}

public final String oauth2ResponseTokenProperty() {
return getString(OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG);
}

public static void main(final String... args) {
System.out.println("=========================================");
System.out.println("HTTP Sink connector Configuration Options");
System.out.println("=========================================");
System.out.println();
System.out.println(configDef().toEnrichedRst());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.config;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public enum OAuth2AuthorizationMode {

HEADER,
URL;

static final List<String> OAUTH2_AUTHORIZATION_MODES =
Arrays.stream(OAuth2AuthorizationMode.values())
.map(OAuth2AuthorizationMode::name)
.collect(Collectors.toUnmodifiableList());

}
Loading

0 comments on commit 2693cae

Please sign in to comment.