Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ctlove0523 committed Dec 12, 2022
1 parent 21abd2f commit 26552dc
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -29,7 +31,6 @@
import com.netflix.conductor.sqs.eventqueue.SQSObservableQueue.Builder;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQSClient;
import rx.Scheduler;

@Configuration
Expand All @@ -39,13 +40,13 @@ public class SQSEventQueueConfiguration {

@ConditionalOnMissingBean
@Bean
public AmazonSQSClient getSQSClient(AWSCredentialsProvider credentialsProvider) {
return new AmazonSQSClient(credentialsProvider);
public AmazonSQS getSQSClient(AWSCredentialsProvider credentialsProvider) {
return AmazonSQSClientBuilder.standard().withCredentials(credentialsProvider).build();
}

@Bean
public EventQueueProvider sqsEventQueueProvider(
AmazonSQSClient sqsClient, SQSEventQueueProperties properties, Scheduler scheduler) {
AmazonSQS sqsClient, SQSEventQueueProperties properties, Scheduler scheduler) {
return new SQSEventQueueProvider(sqsClient, properties, scheduler);
}

Expand All @@ -57,7 +58,7 @@ public EventQueueProvider sqsEventQueueProvider(
public Map<Status, ObservableQueue> getQueues(
ConductorProperties conductorProperties,
SQSEventQueueProperties properties,
AmazonSQSClient sqsClient) {
AmazonSQS sqsClient) {
String stack = "";
if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
stack = conductorProperties.getStack() + "_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,26 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sqs.AmazonSQS;
import org.springframework.lang.NonNull;

import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.sqs.eventqueue.SQSObservableQueue;

import com.amazonaws.services.sqs.AmazonSQSClient;
import rx.Scheduler;

public class SQSEventQueueProvider implements EventQueueProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(SQSEventQueueProvider.class);
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
private final AmazonSQSClient client;
private final AmazonSQS client;
private final int batchSize;
private final long pollTimeInMS;
private final int visibilityTimeoutInSeconds;
private final Scheduler scheduler;

public SQSEventQueueProvider(
AmazonSQSClient client, SQSEventQueueProperties properties, Scheduler scheduler) {
AmazonSQS client, SQSEventQueueProperties properties, Scheduler scheduler) {
this.client = client;
this.batchSize = properties.getBatchSize();
this.pollTimeInMS = properties.getPollTimeDuration().toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.amazonaws.services.sqs.AmazonSQS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,7 +36,6 @@
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.Statement.Effect;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
Expand Down Expand Up @@ -64,15 +64,15 @@ public class SQSObservableQueue implements ObservableQueue {
private final String queueName;
private final int visibilityTimeoutInSeconds;
private final int batchSize;
private final AmazonSQSClient client;
private final AmazonSQS client;
private final long pollTimeInMS;
private final String queueURL;
private final Scheduler scheduler;
private volatile boolean running;

private SQSObservableQueue(
String queueName,
AmazonSQSClient client,
AmazonSQS client,
int visibilityTimeoutInSeconds,
int batchSize,
long pollTimeInMS,
Expand Down Expand Up @@ -176,7 +176,7 @@ public static class Builder {
private int visibilityTimeout = 30; // seconds
private int batchSize = 5;
private long pollTimeInMS = 100;
private AmazonSQSClient client;
private AmazonSQS client;
private List<String> accountsToAuthorize = new LinkedList<>();
private Scheduler scheduler;

Expand All @@ -199,7 +199,7 @@ public Builder withBatchSize(int batchSize) {
return this;
}

public Builder withClient(AmazonSQSClient client) {
public Builder withClient(AmazonSQS client) {
this.client = client;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.netflix.conductor.core.events.queue.Message;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
Expand Down Expand Up @@ -74,7 +74,7 @@ public void testException() {
.withReceiptHandle("receiptHandle");
Answer<?> answer = (Answer<ReceiveMessageResult>) invocation -> new ReceiveMessageResult();

AmazonSQSClient client = mock(AmazonSQSClient.class);
AmazonSQS client = mock(AmazonSQS.class);
when(client.listQueues(any(ListQueuesRequest.class)))
.thenReturn(new ListQueuesResult().withQueueUrls("junit_queue_url"));
when(client.receiveMessage(any(ReceiveMessageRequest.class)))
Expand Down

0 comments on commit 26552dc

Please sign in to comment.