Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AWS Kinesis Data Streams as a Source #4836

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:newline-codecs')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 1.0
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every class needs a license header. Please add to all files.

See: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Hint: If you use IntelliJ you can configure it to automatically add this. And you can have it add the headers to all the files in this Gradle module (since they are all new).


import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

@Getter
public class KinesisLeaseConfig {
@JsonProperty("lease_coordination")
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis", allowInPipelineConfigurations = true)
public class KinesisLeaseConfigExtension implements ExtensionPlugin {

private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
@DataPrepperPluginConstructor
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Override
public void apply(final ExtensionPoints extensionPoints) {
extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.plugin.ExtensionProvider;

import java.util.Optional;

class KinesisLeaseConfigProvider implements ExtensionProvider<KinesisLeaseConfigSupplier> {
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;

public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
}

@Override
public Optional<KinesisLeaseConfigSupplier> provideInstance(Context context) {
return Optional.of(this.kinesisLeaseConfigSupplier);
}

@Override
public Class<KinesisLeaseConfigSupplier> supportedClass() {
return KinesisLeaseConfigSupplier.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import java.util.Optional;

public class KinesisLeaseConfigSupplier {

private KinesisLeaseConfig kinesisLeaseConfig;

public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfig = kinesisLeaseConfig;
}

public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.ofNullable(kinesisLeaseConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.awssdk.regions.Region;

@Getter
public class KinesisLeaseCoordinationTableConfig {

@JsonProperty("table_name")
@NonNull
private String tableName;
dlvenable marked this conversation as resolved.
Show resolved Hide resolved

@JsonProperty("region")
@NonNull
sb2k16 marked this conversation as resolved.
Show resolved Hide resolved
private String region;

public Region getAwsRegion() {
return Region.of(region);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* Generate a unique ID to represent a consumer application instance.
*/
public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator {

private static final String hostName;

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
}


/**
* @return Default to use host name.
*/
@Override
public String generate() {
return hostName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

public class KinesisClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsCredentialsProvider defaultCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;

public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions());
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it recommended to use the Async client? If so why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0 for your review. This is required to construct ConfigsBuilder class in KCL

return DynamoDbAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}

public KinesisAsyncClient buildKinesisAsyncClient(Region region) {
return KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(region)
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) {
return CloudWatchAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


public class KinesisMultiStreamTracker implements MultiStreamTracker {
private static final String COLON = ":";

private final KinesisAsyncClient kinesisClient;
private final KinesisSourceConfig sourceConfig;
private final String applicationName;

public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
this.kinesisClient = kinesisClient;
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
}

@Override
public List<StreamConfig> streamConfigList() {
List<StreamConfig> streamConfigList = new ArrayList<>();
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
StreamConfig streamConfig;
try {
streamConfig = getStreamConfig(kinesisStreamConfig);
} catch (Exception e) {
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of exceptions get thrown here? If it's a config error we should throw InvalidPluginConfigurationException and provide context with the message where we can

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal StreamConfig class which is passed to KCL. It is not data prepper kinesis stream config.

}
streamConfigList.add(streamConfig);
}
return streamConfigList;
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = streamDescription.streamARN().split(COLON)[4];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an actual ARN, or just an identifier? If Arn we should use the Arn class to get the account id

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0. I will fix this.

long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}

/**
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
*/
@Override
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofSeconds(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are trade offs for having this higher or lower?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the strategy to perform cleaning up of the leases for streams that just has been deleted. The wait period is to allow for any pending processing of records before cleaning up the lease from the database.

}
};

}
}
Loading
Loading