Skip to content

Commit

Permalink
Catch KCL known exceptions to reduce noisy log statements
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Oct 24, 2024
1 parent 96f119a commit 26b67d7
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.SdkClientException;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand All @@ -35,6 +36,8 @@
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

Expand Down Expand Up @@ -134,20 +137,21 @@ public void shutDown() {
}

public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {

int numRetries = 0;
while (scheduler == null && numRetries++ < kinesisSourceConfig.getMaxInitializationAttempts()) {
int maxAttempts = kinesisSourceConfig.getMaxInitializationAttempts();
while (scheduler == null && maxAttempts-- > 0 ) {
try {
scheduler = createScheduler(buffer);
} catch (SdkClientException | KinesisClientLibDependencyException | ThrottlingException ex) {
LOG.error(NOISY, "Caught exception when initializing KCL Scheduler due to {}. Number of remaining retries: {}", ex.getMessage(), maxAttempts);
} catch (Exception ex) {
LOG.error(NOISY, "Caught exception when initializing KCL Scheduler. Will retry", ex);
LOG.error(NOISY, "Caught exception when initializing KCL Scheduler. Number of remaining retries: {}", maxAttempts, ex);
}

if (scheduler == null) {
try {
Thread.sleep(kinesisSourceConfig.getInitializationBackoffTime().toMillis());
} catch (InterruptedException e){
LOG.debug("Interrupted exception!");
LOG.debug("Interrupted exception.");
}
}
}
Expand Down

0 comments on commit 26b67d7

Please sign in to comment.