Skip to content

Commit

Permalink
Log deprecation warning
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack committed Jul 5, 2022
1 parent e541fae commit 967edbe
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ public Read<T> withCoder(Coder<T> coder) {

@Override
public PCollection<T> expand(PBegin input) {
LoggerFactory.getLogger(DynamoDBIO.class)
.warn(
"You are using a deprecated IO for DynamoDB. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

checkArgument((getScanRequestFn() != null), "withScanRequestFn() is required");
checkArgument((getAwsClientsProvider() != null), "withAwsClientsProvider() is required");
ScanRequest scanRequest = getScanRequestFn().apply(null);
Expand Down Expand Up @@ -459,6 +464,11 @@ public Write<T> withDeduplicateKeys(List<String> deduplicateKeys) {

@Override
public PCollection<Void> expand(PCollection<T> input) {
LoggerFactory.getLogger(DynamoDBIO.class)
.warn(
"You are using a deprecated IO for DynamoDB. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

return input.apply(ParDo.of(new WriteFn<>(this)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ public Write withCoder(Coder<PublishResult> coder) {

@Override
public PCollectionTuple expand(PCollection<PublishRequest> input) {
LoggerFactory.getLogger(SnsIO.class)
.warn(
"You are using a deprecated IO for Sns. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

checkArgument(getTopicName() != null, "withTopicName() is required");
PCollectionTuple result =
input.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.values.PDone;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;

/**
* An unbounded source for Amazon Simple Queue Service (SQS).
Expand Down Expand Up @@ -169,6 +170,10 @@ public Read withQueueUrl(String queueUrl) {

@Override
public PCollection<Message> expand(PBegin input) {
LoggerFactory.getLogger(SqsIO.class)
.warn(
"You are using a deprecated IO for Sqs. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

org.apache.beam.sdk.io.Read.Unbounded<Message> unbounded =
org.apache.beam.sdk.io.Read.from(
Expand Down Expand Up @@ -202,6 +207,11 @@ abstract static class Builder {

@Override
public PDone expand(PCollection<SendMessageRequest> input) {
LoggerFactory.getLogger(SqsIO.class)
.warn(
"You are using a deprecated IO for Sqs. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

input.apply(
ParDo.of(
new SqsWriteFn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,10 @@ public Read<T> withMaxCapacityPerShard(Integer maxCapacity) {

@Override
public PCollection<T> expand(PBegin input) {
LOG.warn(
"You are using a deprecated IO for Kinesis. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

Unbounded<KinesisRecord> unbounded =
org.apache.beam.sdk.io.Read.from(
new KinesisSource(
Expand Down Expand Up @@ -880,6 +884,10 @@ Write withRetries(int retries) {

@Override
public PDone expand(PCollection<byte[]> input) {
LOG.warn(
"You are using a deprecated IO for Kinesis. Please migrate to module "
+ "'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");

checkArgument(getStreamName() != null, "withStreamName() is required");
checkArgument(
(getPartitionKey() != null) || (getPartitioner() != null),
Expand Down

0 comments on commit 967edbe

Please sign in to comment.