Skip to content

Commit

Permalink
Add Rds source config (#4573)
Browse files Browse the repository at this point in the history
* Add rds source config and some skeleton code

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Add unit tests

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Address comments

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Add cluster and aurora options

Signed-off-by: Hai Yan <oeyh@amazon.com>

---------

Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh authored Jun 4, 2024
1 parent d2aa114 commit 609b94c
Show file tree
Hide file tree
Showing 14 changed files with 704 additions and 0 deletions.
26 changes: 26 additions & 0 deletions data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Build script for the rds-source plugin module.
plugins {
id 'java'
}

// NOTE(review): no dependency versions are declared in this script; presumably they are
// managed centrally (platform/BOM or version catalog) elsewhere in the build - confirm.
dependencies {
// Sibling modules of this multi-project build.
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:http-common')
implementation project(path: ':data-prepper-plugins:common')

// Metrics.
implementation 'io.micrometer:micrometer-core'

// AWS SDK modules (group software.amazon.awssdk).
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:rds'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:netty-nio-client'

// Jackson core and data binding.
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'

// Test-only dependencies.
testImplementation testLibs.mockito.inline
testImplementation project(path: ':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.rds.RdsClient;

/**
 * Creates AWS SDK clients for the RDS source.
 *
 * <p>The credentials provider is resolved once, at construction time, from the
 * plugin's AWS authentication settings and is reused for every client built.
 */
public class ClientFactory {
    private final AwsCredentialsProvider awsCredentialsProvider;
    private final AwsAuthenticationConfig awsAuthenticationConfig;

    /**
     * Resolves the credentials provider from the given authentication settings.
     *
     * @param awsCredentialsSupplier  supplier used to obtain the credentials provider
     * @param awsAuthenticationConfig region and STS settings (role ARN, external id,
     *                                header overrides) passed on to the supplier
     */
    public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
                         final AwsAuthenticationConfig awsAuthenticationConfig) {
        final AwsCredentialsOptions credentialsOptions = AwsCredentialsOptions.builder()
                .withRegion(awsAuthenticationConfig.getAwsRegion())
                .withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
                .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
                .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
                .build();

        this.awsCredentialsProvider = awsCredentialsSupplier.getProvider(credentialsOptions);
        this.awsAuthenticationConfig = awsAuthenticationConfig;
    }

    /**
     * Builds a new RDS client with the configured region and the credentials
     * provider resolved in the constructor.
     *
     * @return a newly built {@link RdsClient}
     */
    public RdsClient buildRdsClient() {
        return RdsClient.builder()
                .region(this.awsAuthenticationConfig.getAwsRegion())
                .credentialsProvider(this.awsCredentialsProvider)
                .build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.rds.RdsClient;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Owns the background schedulers of the RDS source and the thread pool they run on.
 */
public class RdsService {
    private static final Logger LOG = LoggerFactory.getLogger(RdsService.class);

    private final RdsClient rdsClient;
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final RdsSourceConfig sourceConfig;
    private ExecutorService executor;

    /**
     * Stores the collaborators and builds the RDS client from the given factory.
     *
     * @param sourceCoordinator coordinator handed to each scheduler
     * @param sourceConfig      plugin configuration handed to the leader scheduler
     * @param clientFactory     factory used to build the {@link RdsClient}
     * @param pluginMetrics     metrics handed to the export scheduler
     */
    public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
                      final RdsSourceConfig sourceConfig,
                      final ClientFactory clientFactory,
                      final PluginMetrics pluginMetrics) {
        this.sourceCoordinator = sourceCoordinator;
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.rdsClient = clientFactory.buildRdsClient();
    }

    /**
     * Starts one thread per scheduler. Two schedulers are currently submitted:
     * a {@link LeaderScheduler} and an {@link ExportScheduler}. The thread pool
     * is sized to the number of schedulers so each gets its own thread.
     *
     * @param buffer Data Prepper Buffer; not referenced by the schedulers started here
     */
    public void start(Buffer<Record<Event>> buffer) {
        LOG.info("Start running RDS service");

        final List<Runnable> schedulers = new ArrayList<>();
        schedulers.add(new LeaderScheduler(sourceCoordinator, sourceConfig));
        schedulers.add(new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics));

        executor = Executors.newFixedThreadPool(schedulers.size());
        for (final Runnable scheduler : schedulers) {
            executor.submit(scheduler);
        }
    }

    /**
     * Interrupts the running schedulers via {@link ExecutorService#shutdownNow()}.
     * Does nothing when {@link #start(Buffer)} has not been called.
     * Each scheduler is expected to handle the interruption and stop gracefully.
     */
    public void shutdown() {
        if (executor == null) {
            return;
        }
        LOG.info("shutdown RDS schedulers");
        executor.shutdownNow();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.Function;

/**
 * Data Prepper source plugin registered under the name {@code rds}.
 *
 * <p>The plugin delegates its work to an {@link RdsService}, which is created
 * when the source is started. The enhanced source coordinator must be supplied
 * through {@link #setEnhancedSourceCoordinator(EnhancedSourceCoordinator)}
 * before {@link #start(Buffer)} is called.
 */
@DataPrepperPlugin(name = "rds", pluginType = Source.class, pluginConfigurationType = RdsSourceConfig.class)
public class RdsSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {

    private static final Logger LOG = LoggerFactory.getLogger(RdsSource.class);

    private final ClientFactory clientFactory;
    private final PluginMetrics pluginMetrics;
    private final RdsSourceConfig sourceConfig;
    private EnhancedSourceCoordinator sourceCoordinator;
    private RdsService rdsService;

    /**
     * Creates the source and its {@link ClientFactory}.
     *
     * @param pluginMetrics          metrics passed on to the service
     * @param sourceConfig           plugin configuration
     * @param awsCredentialsSupplier supplier used by the client factory to resolve credentials
     */
    @DataPrepperPluginConstructor
    public RdsSource(final PluginMetrics pluginMetrics,
                     final RdsSourceConfig sourceConfig,
                     final AwsCredentialsSupplier awsCredentialsSupplier) {
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
    }

    /**
     * Creates the {@link RdsService} and starts it.
     *
     * @param buffer buffer handed to the service
     * @throws NullPointerException if no source coordinator has been set
     */
    @Override
    public void start(Buffer<Record<Event>> buffer) {
        Objects.requireNonNull(sourceCoordinator);

        this.rdsService = new RdsService(sourceCoordinator, sourceConfig, clientFactory, pluginMetrics);

        LOG.info("Start RDS service");
        this.rdsService.start(buffer);
    }

    /**
     * Shuts the service down if it was ever started.
     */
    @Override
    public void stop() {
        LOG.info("Stop RDS service");
        if (rdsService != null) {
            rdsService.shutdown();
        }
    }

    /**
     * Stores the coordinator and initializes it immediately.
     *
     * @param sourceCoordinator coordinator to use for this source
     */
    @Override
    public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceCoordinator.initialize();
    }

    /**
     * NOTE(review): currently returns {@code null}; presumably a placeholder until
     * partition types exist for this source - confirm before relying on it.
     *
     * @return always {@code null}
     */
    @Override
    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return null;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig;

import java.util.List;

/**
 * Settings of the RDS source plugin, bound from the pipeline configuration
 * through the {@link JsonProperty} names declared on each field.
 */
public class RdsSourceConfig {

    /** Identifier of the RDS instance/cluster or Aurora cluster to read from. */
    @JsonProperty("db_identifier")
    private String dbIdentifier;

    /** {@code true} when {@code db_identifier} names a cluster rather than an instance. */
    @JsonProperty("cluster")
    private boolean isCluster = false;

    /** Database engine; MySQL unless configured otherwise. */
    @JsonProperty("engine")
    private EngineType engine = EngineType.MYSQL;

    /** {@code true} when the source is an Aurora cluster. */
    @JsonProperty("aurora")
    private boolean isAurora = false;

    /** Tables to read; for the MySQL engine each name has the form {@code database.table}. */
    @JsonProperty("table_names")
    private List<String> tableNames;

    /** AWS region and STS settings; required and validated. */
    @JsonProperty("aws")
    @NotNull
    @Valid
    private AwsAuthenticationConfig awsAuthenticationConfig;

    /** Acknowledgments flag; off by default. */
    @JsonProperty("acknowledgments")
    private boolean acknowledgments = false;

    /** S3 bucket that holds both export and stream data. */
    @JsonProperty("s3_bucket")
    private String s3Bucket;

    /** Value of the {@code s3_prefix} setting. */
    @JsonProperty("s3_prefix")
    private String s3Prefix;

    /** Value of the {@code s3_region} setting. */
    @JsonProperty("s3_region")
    private String s3Region;

    /** Optional export settings; validated when present. */
    @JsonProperty("export")
    @Valid
    private ExportConfig exportConfig;

    /** @return the configured {@code db_identifier} */
    public String getDbIdentifier() {
        return this.dbIdentifier;
    }

    /** @return whether {@code db_identifier} refers to a cluster */
    public boolean isCluster() {
        return this.isCluster;
    }

    /** @return the configured database engine */
    public EngineType getEngine() {
        return this.engine;
    }

    /** @return whether the source is an Aurora cluster */
    public boolean isAurora() {
        return this.isAurora;
    }

    /** @return the configured table names, or {@code null} when not set */
    public List<String> getTableNames() {
        return this.tableNames;
    }

    /** @return the AWS authentication settings */
    public AwsAuthenticationConfig getAwsAuthenticationConfig() {
        return this.awsAuthenticationConfig;
    }

    /** @return the value of the {@code acknowledgments} setting */
    public boolean isAcknowledgmentsEnabled() {
        return this.acknowledgments;
    }

    /** @return the configured S3 bucket, or {@code null} when not set */
    public String getS3Bucket() {
        return this.s3Bucket;
    }

    /** @return the configured S3 prefix, or {@code null} when not set */
    public String getS3Prefix() {
        return this.s3Prefix;
    }

    /** @return the configured S3 region, or {@code null} when not set */
    public String getS3Region() {
        return this.s3Region;
    }

    /** @return the export settings, or {@code null} when no {@code export} section is configured */
    public ExportConfig getExport() {
        return this.exportConfig;
    }

    /** @return {@code true} when an {@code export} section is present in the configuration */
    public boolean isExportEnabled() {
        return null != this.exportConfig;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
 * AWS authentication settings of the RDS source: the region plus optional STS
 * assume-role parameters. Length and size limits are enforced through Bean
 * Validation annotations on the fields.
 */
public class AwsAuthenticationConfig {

    /** AWS region name; may be omitted, but must not be an empty string. */
    @JsonProperty("region")
    @Size(min = 1, message = "Region cannot be empty string")
    private String awsRegion;

    /** ARN of the role to assume through STS. */
    // The message now states the minimum that the constraint actually enforces (20, not 1).
    @JsonProperty("sts_role_arn")
    @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
    private String awsStsRoleArn;

    /** External id passed along when assuming the role. */
    @JsonProperty("sts_external_id")
    @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
    private String awsStsExternalId;

    /** Header overrides for the STS request; at most five entries. */
    @JsonProperty("sts_header_overrides")
    @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
    private Map<String, String> awsStsHeaderOverrides;

    /** @return the configured STS role ARN, or {@code null} when not set */
    public String getAwsStsRoleArn() {
        return awsStsRoleArn;
    }

    /** @return the configured STS external id, or {@code null} when not set */
    public String getAwsStsExternalId() {
        return awsStsExternalId;
    }

    /**
     * Converts the configured region name into an SDK {@link Region}.
     *
     * @return the region, or {@code null} when no region is configured
     */
    public Region getAwsRegion() {
        return awsRegion != null ? Region.of(awsRegion) : null;
    }

    /** @return the configured STS header overrides, or {@code null} when not set */
    public Map<String, String> getAwsStsHeaderOverrides() {
        return awsStsHeaderOverrides;
    }
}

Loading

0 comments on commit 609b94c

Please sign in to comment.