diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java index 3b566a9a7e..ab97c58e29 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPConnection.java @@ -14,7 +14,10 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -22,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; import com.rabbitmq.client.Address; @@ -42,8 +47,11 @@ public class AMQPConnection { private static AMQPConnection amqpConnection = null; private static final String PUBLISHER = "Publisher"; private static final String SUBSCRIBER = "Subscriber"; - private static final String SEPARATOR = ":"; - Map queueNameToChannel = new ConcurrentHashMap(); + private static final Map> availableChannelPool = + new ConcurrentHashMap>(); + private static final Map subscriberReservedChannelPool = + new ConcurrentHashMap(); + private static AMQPRetryPattern retrySettings = null; private AMQPConnection() {} @@ -53,11 +61,13 @@ private AMQPConnection(final ConnectionFactory factory, final Address[] address) } public static synchronized AMQPConnection getInstance( - final ConnectionFactory factory, final Address[] address) { + final ConnectionFactory factory, + final Address[] address, + final AMQPRetryPattern retrySettings) { if (AMQPConnection.amqpConnection == null) { AMQPConnection.amqpConnection = new AMQPConnection(factory, address); } - + AMQPConnection.retrySettings = retrySettings; return AMQPConnection.amqpConnection; } @@ -71,181 +81,233 @@ public Address[] getAddresses() { } private Connection createConnection(String connectionPrefix) { - - try { - Connection connection = - factory.newConnection( - addresses, System.getenv("HOSTNAME") + "-" + connectionPrefix); - if (connection == null || !connection.isOpen()) { - throw new RuntimeException("Failed to open connection"); + int retryIndex = 1; + while (true) { + try { + Connection connection = + factory.newConnection( + addresses, System.getenv("HOSTNAME") + "-" + connectionPrefix); + if (connection == null || !connection.isOpen()) { + throw new RuntimeException("Failed to open connection"); + } + connection.addShutdownListener( + new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + LOGGER.error( + "Received a shutdown exception for the connection {}. reason {} cause{}", + connection.getClientProvidedName(), + cause.getMessage(), + cause); + } + }); + connection.addBlockedListener( + new BlockedListener() { + @Override + public void handleUnblocked() throws IOException { + LOGGER.info( + "Connection {} is unblocked", + connection.getClientProvidedName()); + } + + @Override + public void handleBlocked(String reason) throws IOException { + LOGGER.error( + "Connection {} is blocked. reason: {}", + connection.getClientProvidedName(), + reason); + } + }); + return connection; + } catch (final IOException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + final String error = + "IO error while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + final String error = + "Retries completed. IO error while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + retryIndex++; + } catch (final TimeoutException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + final String error = + "Timeout while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + final String error = + "Retries completed. Timeout while connecting to " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")); + LOGGER.error(error, e); + throw new RuntimeException(error, e); + } + retryIndex++; } - - connection.addShutdownListener( - new ShutdownListener() { - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - LOGGER.error( - "Received a shutdown exception for the connection {}. reason {} cause{}", - connection.getClientProvidedName(), - cause.getMessage(), - cause); - } - }); - - connection.addBlockedListener( - new BlockedListener() { - @Override - public void handleUnblocked() throws IOException { - LOGGER.info( - "Connection {} is unblocked", - connection.getClientProvidedName()); - } - - @Override - public void handleBlocked(String reason) throws IOException { - LOGGER.error( - "Connection {} is blocked. reason: {}", - connection.getClientProvidedName(), - reason); - } - }); - - return connection; - } catch (final IOException e) { - final String error = - "IO error while connecting to " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")); - LOGGER.error(error, e); - throw new RuntimeException(error, e); - } catch (final TimeoutException e) { - final String error = - "Timeout while connecting to " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")); - LOGGER.error(error, e); - throw new RuntimeException(error, e); } } - public Channel getOrCreateChannel(ConnectionType connectionType, String queueOrExchangeName) { + public Channel getOrCreateChannel(ConnectionType connectionType, String queueOrExchangeName) + throws Exception { LOGGER.debug( "Accessing the channel for queueOrExchange {} with type {} ", queueOrExchangeName, connectionType); switch (connectionType) { case SUBSCRIBER: - return getOrCreateSubscriberChannel(queueOrExchangeName); - + String subChnName = connectionType + ";" + queueOrExchangeName; + if (subscriberReservedChannelPool.containsKey(subChnName)) { + Channel locChn = subscriberReservedChannelPool.get(subChnName); + if (locChn != null && locChn.isOpen()) { + return locChn; + } + } + synchronized (this) { + if (subscriberConnection == null || !subscriberConnection.isOpen()) { + subscriberConnection = createConnection(SUBSCRIBER); + } + } + Channel subChn = borrowChannel(connectionType, subscriberConnection); + // Add the subscribed channels to Map to avoid messages being acknowledged on + // different from the subscribed one + subscriberReservedChannelPool.put(subChnName, subChn); + return subChn; case PUBLISHER: - return getOrCreatePublisherChannel(queueOrExchangeName); + synchronized (this) { + if (publisherConnection == null || !publisherConnection.isOpen()) { + publisherConnection = createConnection(PUBLISHER); + } + } + return borrowChannel(connectionType, publisherConnection); default: return null; } } - private Channel getOrCreateSubscriberChannel(String queueOrExchangeName) { - - String prefix = SUBSCRIBER + SEPARATOR; - // Return the existing channel if it's still opened - Channel subscriberChannel = queueNameToChannel.get(prefix + queueOrExchangeName); - if (subscriberChannel != null) { - return subscriberChannel; - } + private Channel getOrCreateChannel(ConnectionType connType, Connection rmqConnection) { // Channel creation is required - try { - synchronized (this) { - if (subscriberConnection == null) { - subscriberConnection = createConnection(SUBSCRIBER); + Channel locChn = null; + int retryIndex = 1; + while (true) { + try { + LOGGER.debug("Creating a channel for " + connType); + locChn = rmqConnection.createChannel(); + if (locChn == null || !locChn.isOpen()) { + throw new RuntimeException("Fail to open " + connType + " channel"); } - LOGGER.debug("Creating a channel for subscriber"); - subscriberChannel = subscriberConnection.createChannel(); - subscriberChannel.addShutdownListener( + locChn.addShutdownListener( cause -> { LOGGER.error( - "subscription Channel has been shutdown: {}", + connType + " Channel has been shutdown: {}", cause.getMessage(), cause); }); - if (subscriberChannel == null || !subscriberChannel.isOpen()) { - throw new RuntimeException("Fail to open subscription channel"); + return locChn; + } catch (final IOException e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + throw new RuntimeException( + "Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - queueNameToChannel.putIfAbsent(prefix + queueOrExchangeName, subscriberChannel); - } - } catch (final IOException e) { - throw new RuntimeException( - "Cannot open subscription channel on " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")), - e); - } - - return subscriberChannel; - } - - private Channel getOrCreatePublisherChannel(String queueOrExchangeName) { - - String prefix = PUBLISHER + SEPARATOR; - Channel publisherChannel = queueNameToChannel.get(prefix + queueOrExchangeName); - if (publisherChannel != null) { - return publisherChannel; - } - // Channel creation is required - try { - - synchronized (this) { - if (publisherConnection == null) { - publisherConnection = createConnection(PUBLISHER); + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + throw new RuntimeException( + "Retries completed. Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - - LOGGER.debug("Creating a channel for publisher"); - publisherChannel = publisherConnection.createChannel(); - publisherChannel.addShutdownListener( - cause -> { - LOGGER.error( - "Publish Channel has been shutdown: {}", - cause.getMessage(), - cause); - }); - - if (publisherChannel == null || !publisherChannel.isOpen()) { - throw new RuntimeException("Fail to open publish channel"); + retryIndex++; + } catch (final Exception e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + throw new RuntimeException( + "Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + throw new RuntimeException( + "Retries completed. Cannot open " + + connType + + " channel on " + + Arrays.stream(addresses) + .map(address -> address.toString()) + .collect(Collectors.joining(",")), + e); } - queueNameToChannel.putIfAbsent(prefix + queueOrExchangeName, publisherChannel); + retryIndex++; } - - } catch (final IOException e) { - throw new RuntimeException( - "Cannot open channel on " - + Arrays.stream(addresses) - .map(address -> address.toString()) - .collect(Collectors.joining(",")), - e); } - return publisherChannel; } public void close() { LOGGER.info("Closing all connections and channels"); try { - for (Map.Entry entry : queueNameToChannel.entrySet()) { - closeChannel(entry.getValue()); - } + closeChannelsInMap(ConnectionType.PUBLISHER); + closeChannelsInMap(ConnectionType.SUBSCRIBER); closeConnection(publisherConnection); closeConnection(subscriberConnection); } finally { - queueNameToChannel.clear(); + availableChannelPool.clear(); publisherConnection = null; subscriberConnection = null; } } + private void closeChannelsInMap(ConnectionType conType) { + Set channels = availableChannelPool.get(conType); + if (channels != null && !channels.isEmpty()) { + Iterator itr = channels.iterator(); + while (itr.hasNext()) { + Channel channel = itr.next(); + closeChannel(channel); + } + channels.clear(); + } + } + private void closeConnection(Connection connection) { - if (connection == null) { - LOGGER.warn("Connection is null. Do not close it"); + if (connection == null || !connection.isOpen()) { + LOGGER.warn("Connection is null or closed already. Not closing it again"); } else { try { connection.close(); @@ -256,8 +318,8 @@ private void closeConnection(Connection connection) { } private void closeChannel(Channel channel) { - if (channel == null) { - LOGGER.warn("Channel is null. Do not close it"); + if (channel == null || !channel.isOpen()) { + LOGGER.warn("Channel is null or closed already. Not closing it again"); } else { try { channel.close(); @@ -266,4 +328,64 @@ private void closeChannel(Channel channel) { } } } + + /** + * Gets the channel for specified connectionType. + * + * @param connectionType holds the multiple channels for different connection types for thread + * safe operation. + * @param rmqConnection publisher or subscriber connection instance + * @return channel instance + * @throws Exception + */ + private synchronized Channel borrowChannel( + ConnectionType connectionType, Connection rmqConnection) throws Exception { + if (!availableChannelPool.containsKey(connectionType)) { + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_CREATION_SUCCESS, connectionType)); + return channel; + } + Set channels = availableChannelPool.get(connectionType); + if (channels != null && channels.isEmpty()) { + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_CREATION_SUCCESS, connectionType)); + return channel; + } + Iterator itr = channels.iterator(); + while (itr.hasNext()) { + Channel channel = itr.next(); + if (channel != null && channel.isOpen()) { + itr.remove(); + LOGGER.info( + String.format(AMQPConstants.INFO_CHANNEL_BORROW_SUCCESS, connectionType)); + return channel; + } else { + itr.remove(); + } + } + Channel channel = getOrCreateChannel(connectionType, rmqConnection); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RESET_SUCCESS, connectionType)); + return channel; + } + + /** + * Returns the channel to connection pool for specified connectionType. + * + * @param connectionType + * @param channel + * @throws Exception + */ + public synchronized void returnChannel(ConnectionType connectionType, Channel channel) + throws Exception { + if (channel == null || !channel.isOpen()) { + channel = null; // channel is reset. + } + Set channels = availableChannelPool.get(connectionType); + if (channels == null) { + channels = new HashSet(); + availableChannelPool.put(connectionType, channels); + } + channels.add(channel); + LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RETURN_SUCCESS, connectionType)); + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 6cad63cb1e..203c4dfbe6 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings; import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; @@ -38,12 +39,14 @@ import com.google.common.collect.Maps; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; import rx.Observable; +import rx.Subscriber; /** @author Ritu Parathody */ public class AMQPObservableQueue implements ObservableQueue { @@ -51,6 +54,8 @@ public class AMQPObservableQueue implements ObservableQueue { private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class); private final AMQPSettings settings; + private final AMQPRetryPattern retrySettings; + private final String QUEUE_TYPE = "x-queue-type"; private final int batchSize; private final boolean useExchange; private int pollTimeInMS; @@ -64,6 +69,7 @@ public AMQPObservableQueue( Address[] addresses, boolean useExchange, AMQPSettings settings, + AMQPRetryPattern retrySettings, int batchSize, int pollTimeInMS) { if (factory == null) { @@ -84,52 +90,69 @@ public AMQPObservableQueue( this.useExchange = useExchange; this.settings = settings; this.batchSize = batchSize; - this.amqpConnection = AMQPConnection.getInstance(factory, addresses); + this.amqpConnection = AMQPConnection.getInstance(factory, addresses, retrySettings); + this.retrySettings = retrySettings; this.setPollTimeInMS(pollTimeInMS); } @Override public Observable observe() { - receiveMessages(); - Observable.OnSubscribe onSubscribe = - subscriber -> { - Observable interval = - Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); - interval.flatMap( - (Long x) -> { - if (!isRunning()) { - LOGGER.debug( - "Component stopped, skip listening for messages from RabbitMQ"); - return Observable.from(Collections.emptyList()); - } else { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach( - msg -> { - buffer.append(msg.getId()) - .append("=") - .append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - LOGGER.info( - String.format( - "Batch from %s to conductor is %s", - settings.getQueueOrExchangeName(), - buffer.toString())); + Observable.OnSubscribe onSubscribe = null; + // This will enabled the messages to be processed one after the other as per the + // observable next behavior. + if (settings.isSequentialProcessing()) { + LOGGER.info("Subscribing for the message processing on schedule basis"); + receiveMessages(); + onSubscribe = + subscriber -> { + Observable interval = + Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); + interval.flatMap( + (Long x) -> { + if (!isRunning()) { + LOGGER.debug( + "Component stopped, skip listening for messages from RabbitMQ"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach( + msg -> { + buffer.append(msg.getId()) + .append("=") + .append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() + < available.size()) { + buffer.append(","); + } + }); + LOGGER.info( + String.format( + "Batch from %s to conductor is %s", + settings + .getQueueOrExchangeName(), + buffer.toString())); + } + return Observable.from(available); } - return Observable.from(available); - } - }) - .subscribe(subscriber::onNext, subscriber::onError); - }; + }) + .subscribe(subscriber::onNext, subscriber::onError); + }; + LOGGER.info("Subscribed for the message processing on schedule basis"); + } else { + onSubscribe = + subscriber -> { + LOGGER.info("Subscribing for the event based AMQP message processing"); + receiveMessages(subscriber); + LOGGER.info("Subscribed for the event based AMQP message processing"); + }; + } return Observable.create(onSubscribe); } @@ -164,16 +187,34 @@ public Address[] getAddresses() { public List ack(List messages) { final List processedDeliveryTags = new ArrayList<>(); for (final Message message : messages) { - try { - LOGGER.info("ACK message with delivery tag {}", message.getReceipt()); - amqpConnection - .getOrCreateChannel( - ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) - .basicAck(Long.parseLong(message.getReceipt()), false); - // Message ACKed - processedDeliveryTags.add(message.getReceipt()); - } catch (final IOException e) { - LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e); + int retryIndex = 1; + while (true) { + try { + LOGGER.info("ACK message with delivery tag {}", message.getReceipt()); + Channel chn = + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, + getSettings().getQueueOrExchangeName()); + chn.basicAck(Long.parseLong(message.getReceipt()), false); + processedDeliveryTags.add(message.getReceipt()); + LOGGER.info("Ack'ed the message with delivery tag {}", message.getReceipt()); + break; + } catch (final Exception e) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + LOGGER.error( + "Cannot ACK message with delivery tag {}", message.getReceipt(), e); + } + try { + retry.continueOrPropogate(e, retryIndex); + } catch (Exception ex) { + LOGGER.error( + "Retries completed. Cannot ACK message with delivery tag {}", + message.getReceipt(), + e); + } + retryIndex++; + } } } return processedDeliveryTags; @@ -197,20 +238,54 @@ private static AMQP.BasicProperties buildBasicProperties( } private void publishMessage(Message message, String exchange, String routingKey) { - try { - final String payload = message.getPayload(); - amqpConnection - .getOrCreateChannel( - ConnectionType.PUBLISHER, getSettings().getQueueOrExchangeName()) - .basicPublish( + Channel chn = null; + int retryIndex = 1; + while (true) { + try { + final String payload = message.getPayload(); + chn = + amqpConnection.getOrCreateChannel( + ConnectionType.PUBLISHER, getSettings().getQueueOrExchangeName()); + chn.basicPublish( + exchange, + routingKey, + buildBasicProperties(message, settings), + payload.getBytes(settings.getContentEncoding())); + LOGGER.info(String.format("Published message to %s: %s", exchange, payload)); + break; + } catch (Exception ex) { + AMQPRetryPattern retry = retrySettings; + if (retry == null) { + LOGGER.error( + "Failed to publish message {} to {}", + message.getPayload(), exchange, - routingKey, - buildBasicProperties(message, settings), - payload.getBytes(settings.getContentEncoding())); - LOGGER.info(String.format("Published message to %s: %s", exchange, payload)); - } catch (Exception ex) { - LOGGER.error("Failed to publish message {} to {}", message.getPayload(), exchange, ex); - throw new RuntimeException(ex); + ex); + throw new RuntimeException(ex); + } + try { + retry.continueOrPropogate(ex, retryIndex); + } catch (Exception e) { + LOGGER.error( + "Retries completed. Failed to publish message {} to {}", + message.getPayload(), + exchange, + ex); + throw new RuntimeException(ex); + } + retryIndex++; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(ConnectionType.PUBLISHER, chn); + } catch (Exception e) { + LOGGER.error( + "Failed to return the channel of {}. {}", + ConnectionType.PUBLISHER, + e); + } + } + } } } @@ -258,13 +333,23 @@ public void setUnackTimeout(Message message, long unackTimeout) { @Override public long size() { + Channel chn = null; try { - return amqpConnection - .getOrCreateChannel( - ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) - .messageCount(settings.getQueueOrExchangeName()); + chn = + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()); + return chn.messageCount(settings.getQueueOrExchangeName()); } catch (final Exception e) { throw new RuntimeException(e); + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(ConnectionType.SUBSCRIBER, chn); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } } } @@ -353,31 +438,38 @@ private ConnectionFactory buildConnectionFactory() { } else { factory.setPort(port); } - // Get connection timeout from config - final int connectionTimeout = (int) properties.getConnectionTimeout().toMillis(); - if (connectionTimeout <= 0) { - throw new IllegalArgumentException("Connection timeout must be greater than 0"); - } else { - factory.setConnectionTimeout(connectionTimeout); - } final boolean useNio = properties.isUseNio(); if (useNio) { factory.useNio(); } + factory.setConnectionTimeout(properties.getConnectionTimeoutInMilliSecs()); + factory.setRequestedHeartbeat(properties.getRequestHeartbeatTimeoutInSecs()); + factory.setNetworkRecoveryInterval(properties.getNetworkRecoveryIntervalInMilliSecs()); + factory.setHandshakeTimeout(properties.getHandshakeTimeoutInMilliSecs()); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(true); + factory.setRequestedChannelMax(properties.getMaxChannelCount()); return factory; } public AMQPObservableQueue build(final boolean useExchange, final String queueURI) { final AMQPSettings settings = new AMQPSettings(properties).fromURI(queueURI); + final AMQPRetryPattern retrySettings = + new AMQPRetryPattern( + properties.getLimit(), properties.getDuration(), properties.getType()); return new AMQPObservableQueue( - factory, addresses, useExchange, settings, batchSize, pollTimeInMS); + factory, + addresses, + useExchange, + settings, + retrySettings, + batchSize, + pollTimeInMS); } } private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType) - throws IOException { + throws Exception { return getOrCreateExchange( connectionType, settings.getQueueOrExchangeName(), @@ -394,27 +486,35 @@ private AMQP.Exchange.DeclareOk getOrCreateExchange( final boolean isDurable, final boolean autoDelete, final Map arguments) - throws IOException { + throws Exception { if (StringUtils.isEmpty(name)) { throw new RuntimeException("Exchange name is undefined"); } if (StringUtils.isEmpty(type)) { throw new RuntimeException("Exchange type is undefined"); } - + Channel chn = null; try { LOGGER.debug("Creating exchange {} of type {}", name, type); - return amqpConnection - .getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName()) - .exchangeDeclare(name, type, isDurable, autoDelete, arguments); - } catch (final IOException e) { + chn = + amqpConnection.getOrCreateChannel( + connectionType, getSettings().getQueueOrExchangeName()); + return chn.exchangeDeclare(name, type, isDurable, autoDelete, arguments); + } catch (final Exception e) { LOGGER.warn("Failed to create exchange {} of type {}", name, type, e); throw e; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(connectionType, chn); + } catch (Exception e) { + LOGGER.error("Failed to return the channel of {}. {}", connectionType, e); + } + } } } - private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) - throws IOException { + private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) throws Exception { return getOrCreateQueue( connectionType, settings.getQueueOrExchangeName(), @@ -431,19 +531,29 @@ private AMQP.Queue.DeclareOk getOrCreateQueue( final boolean isExclusive, final boolean autoDelete, final Map arguments) - throws IOException { + throws Exception { if (StringUtils.isEmpty(name)) { throw new RuntimeException("Queue name is undefined"); } - + arguments.put(QUEUE_TYPE, settings.getQueueType()); + Channel chn = null; try { LOGGER.debug("Creating queue {}", name); - return amqpConnection - .getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName()) - .queueDeclare(name, isDurable, isExclusive, autoDelete, arguments); - } catch (final IOException e) { + chn = + amqpConnection.getOrCreateChannel( + connectionType, getSettings().getQueueOrExchangeName()); + return chn.queueDeclare(name, isDurable, isExclusive, autoDelete, arguments); + } catch (final Exception e) { LOGGER.warn("Failed to create queue {}", name, e); throw e; + } finally { + if (chn != null) { + try { + amqpConnection.returnChannel(connectionType, chn); + } catch (Exception e) { + LOGGER.error("Failed to return the channel of {}. {}", connectionType, e); + } + } } } @@ -459,7 +569,6 @@ private static Message asMessage(AMQPSettings settings, GetResponse response) th } private void receiveMessagesFromQueue(String queueName) throws Exception { - int nb = 0; LOGGER.debug("Accessing channel for queue {}", queueName); Consumer consumer = @@ -492,15 +601,23 @@ public void handleDelivery( LOGGER.info("receiveMessagesFromQueue- End method {}", messages); } } catch (InterruptedException e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); Thread.currentThread().interrupt(); } catch (Exception e) { - // + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); } } public void handleCancel(String consumerTag) throws IOException { LOGGER.error( - "Recieved a consumer cancel notification for subscriber. Will monitor and make changes"); + "Recieved a consumer cancel notification for subscriber {}", + consumerTag); } }; @@ -511,6 +628,75 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) Monitors.recordEventQueueMessagesProcessed(getType(), queueName, messages.size()); } + private void receiveMessagesFromQueue(String queueName, Subscriber subscriber) + throws Exception { + LOGGER.debug("Accessing channel for queue {}", queueName); + + Consumer consumer = + new DefaultConsumer( + amqpConnection.getOrCreateChannel( + ConnectionType.SUBSCRIBER, + getSettings().getQueueOrExchangeName())) { + + @Override + public void handleDelivery( + final String consumerTag, + final Envelope envelope, + final AMQP.BasicProperties properties, + final byte[] body) + throws IOException { + try { + Message message = + asMessage( + settings, + new GetResponse( + envelope, properties, body, Integer.MAX_VALUE)); + if (message == null) { + return; + } + LOGGER.info( + "Got message with ID {} and receipt {}", + message.getId(), + message.getReceipt()); + LOGGER.debug("Message content {}", message); + // Not using thread-pool here as the number of concurrent threads are + // controlled + // by the number of messages delivery using pre-fetch count in RabbitMQ + Thread newThread = + new Thread( + () -> { + LOGGER.info( + "Spawning a new thread for message with ID {}", + message.getId()); + subscriber.onNext(message); + }); + newThread.start(); + } catch (InterruptedException e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.error( + "Issue in handling the mesages for the subscriber with consumer tag {}. {}", + consumerTag, + e); + } + } + + public void handleCancel(String consumerTag) throws IOException { + LOGGER.error( + "Recieved a consumer cancel notification for subscriber {}", + consumerTag); + } + }; + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .basicConsume(queueName, false, consumer); + } + protected void receiveMessages() { try { amqpConnection @@ -522,11 +708,14 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) // Consume messages from an exchange getOrCreateExchange(ConnectionType.SUBSCRIBER); /* - * Create queue if not present based on the settings provided in the queue URI or configuration properties. - * Sample URI format: amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive=false&autoDelete=false&durable=true - * Default settings if not provided in the queue URI or properties: isDurable: true, autoDelete: false, isExclusive: false - * The same settings are currently used during creation of exchange as well as queue. - * TODO: This can be enhanced further to get the settings separately for exchange and queue from the URI + * Create queue if not present based on the settings provided in the queue URI + * or configuration properties. Sample URI format: + * amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive + * =false&autoDelete=false&durable=true Default settings if not provided in the + * queue URI or properties: isDurable: true, autoDelete: false, isExclusive: + * false The same settings are currently used during creation of exchange as + * well as queue. TODO: This can be enhanced further to get the settings + * separately for exchange and queue from the URI */ final AMQP.Queue.DeclareOk declareOk = getOrCreateQueue( @@ -558,6 +747,56 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) } } + protected void receiveMessages(Subscriber subscriber) { + try { + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .basicQos(batchSize); + String queueName; + if (useExchange) { + // Consume messages from an exchange + getOrCreateExchange(ConnectionType.SUBSCRIBER); + /* + * Create queue if not present based on the settings provided in the queue URI + * or configuration properties. Sample URI format: + * amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive + * =false&autoDelete=false&durable=true Default settings if not provided in the + * queue URI or properties: isDurable: true, autoDelete: false, isExclusive: + * false The same settings are currently used during creation of exchange as + * well as queue. TODO: This can be enhanced further to get the settings + * separately for exchange and queue from the URI + */ + final AMQP.Queue.DeclareOk declareOk = + getOrCreateQueue( + ConnectionType.SUBSCRIBER, + String.format("bound_to_%s", settings.getQueueOrExchangeName()), + settings.isDurable(), + settings.isExclusive(), + settings.autoDelete(), + Maps.newHashMap()); + // Bind the declared queue to exchange + queueName = declareOk.getQueue(); + amqpConnection + .getOrCreateChannel( + ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()) + .queueBind( + queueName, + settings.getQueueOrExchangeName(), + settings.getRoutingKey()); + } else { + // Consume messages from a queue + queueName = getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue(); + } + // Consume messages + LOGGER.info("Consuming from queue {}", queueName); + receiveMessagesFromQueue(queueName, subscriber); + } catch (Exception exception) { + LOGGER.error("Exception while getting messages from RabbitMQ", exception); + Monitors.recordObservableQMessageReceivedErrors(getType()); + } + } + public int getPollTimeInMS() { return pollTimeInMS; } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java index 8960febf40..3ebdd57880 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPEventQueueProperties.java @@ -16,6 +16,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; + import com.rabbitmq.client.AMQP.PROTOCOL; import com.rabbitmq.client.ConnectionFactory; @@ -36,8 +38,62 @@ public class AMQPEventQueueProperties { private int port = PROTOCOL.PORT; - private Duration connectionTimeout = - Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT); + private int connectionTimeoutInMilliSecs = 180000; + private int networkRecoveryIntervalInMilliSecs = 5000; + private int requestHeartbeatTimeoutInSecs = 30; + private int handshakeTimeoutInMilliSecs = 180000; + private int maxChannelCount = 5000; + private int limit = 50; + private int duration = 1000; + private RetryType retryType = RetryType.REGULARINTERVALS; + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public RetryType getType() { + return retryType; + } + + public void setType(RetryType type) { + this.retryType = type; + } + + public int getConnectionTimeoutInMilliSecs() { + return connectionTimeoutInMilliSecs; + } + + public void setConnectionTimeoutInMilliSecs(int connectionTimeoutInMilliSecs) { + this.connectionTimeoutInMilliSecs = connectionTimeoutInMilliSecs; + } + + public int getHandshakeTimeoutInMilliSecs() { + return handshakeTimeoutInMilliSecs; + } + + public void setHandshakeTimeoutInMilliSecs(int handshakeTimeoutInMilliSecs) { + this.handshakeTimeoutInMilliSecs = handshakeTimeoutInMilliSecs; + } + + public int getMaxChannelCount() { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) { + this.maxChannelCount = maxChannelCount; + } private boolean useNio = false; @@ -53,6 +109,10 @@ public class AMQPEventQueueProperties { private String exchangeType = "topic"; + private String queueType = "classic"; + + private boolean sequentialMsgProcessing = true; + private int deliveryMode = 2; private boolean useExchange = true; @@ -115,14 +175,6 @@ public void setPort(int port) { this.port = port; } - public Duration getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(Duration connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - public boolean isUseNio() { return useNio; } @@ -202,4 +254,48 @@ public String getListenerQueuePrefix() { public void setListenerQueuePrefix(String listenerQueuePrefix) { this.listenerQueuePrefix = listenerQueuePrefix; } + + public String getQueueType() { + return queueType; + } + + /** + * @param queueType Supports two queue types, 'classic' and 'quorum'. Classic will be be + * deprecated in 2022 and its usage discouraged from RabbitMQ community. So not using enum + * type here to hold different values. + */ + public void setQueueType(String queueType) { + this.queueType = queueType; + } + + /** @return the sequentialMsgProcessing */ + public boolean isSequentialMsgProcessing() { + return sequentialMsgProcessing; + } + + /** + * @param sequentialMsgProcessing the sequentialMsgProcessing to set Supports sequential and + * parallel message processing capabilities. In parallel message processing, number of + * threads are controlled by batch size. No thread control or execution framework required + * here as threads are limited and short-lived. + */ + public void setSequentialMsgProcessing(boolean sequentialMsgProcessing) { + this.sequentialMsgProcessing = sequentialMsgProcessing; + } + + public int getNetworkRecoveryIntervalInMilliSecs() { + return networkRecoveryIntervalInMilliSecs; + } + + public void setNetworkRecoveryIntervalInMilliSecs(int networkRecoveryIntervalInMilliSecs) { + this.networkRecoveryIntervalInMilliSecs = networkRecoveryIntervalInMilliSecs; + } + + public int getRequestHeartbeatTimeoutInSecs() { + return requestHeartbeatTimeoutInSecs; + } + + public void setRequestHeartbeatTimeoutInSecs(int requestHeartbeatTimeoutInSecs) { + this.requestHeartbeatTimeoutInSecs = requestHeartbeatTimeoutInSecs; + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java new file mode 100644 index 0000000000..3890a9980b --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/config/AMQPRetryPattern.java @@ -0,0 +1,54 @@ +/* + * Copyright 2022 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.contribs.queue.amqp.config; + +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; + +public class AMQPRetryPattern { + + private int limit = 50; + private int duration = 1000; + private RetryType type = RetryType.REGULARINTERVALS; + + public AMQPRetryPattern() {} + + public AMQPRetryPattern(int limit, int duration, RetryType type) { + this.limit = limit; + this.duration = duration; + this.type = type; + } + + /** + * This gets executed if the retry index is within the allowed limits, otherwise exception will + * be thrown. + * + * @throws Exception + */ + public void continueOrPropogate(Exception ex, int retryIndex) throws Exception { + if (retryIndex > limit) { + throw ex; + } + // Regular Intervals is the default + long waitDuration = duration; + if (type == RetryType.INCREMENTALINTERVALS) { + waitDuration = duration * retryIndex; + } else if (type == RetryType.EXPONENTIALBACKOFF) { + waitDuration = (long) Math.pow(2, retryIndex) * duration; + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java index 6ecd0b6d1e..f8f06aeceb 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPConstants.java @@ -70,4 +70,16 @@ public class AMQPConstants { * polling time to drain the in-memory queue. */ public static int DEFAULT_POLL_TIME_MS = 100; + + // info channel messages. + public static final String INFO_CHANNEL_BORROW_SUCCESS = + "Borrowed the channel object from the channel pool for " + "the connection type [%s]"; + public static final String INFO_CHANNEL_RETURN_SUCCESS = + "Returned the borrowed channel object to the pool for " + "the connection type [%s]"; + public static final String INFO_CHANNEL_CREATION_SUCCESS = + "Channels are not available in the pool. Created a" + + " channel for the connection type [%s]"; + public static final String INFO_CHANNEL_RESET_SUCCESS = + "No proper channels available in the pool. Created a " + + "channel for the connection type [%s]"; } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java index 56cd77e1d7..7ca2c51400 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/AMQPSettings.java @@ -44,14 +44,14 @@ public class AMQPSettings { private String queueOrExchangeName; private String eventName; private String exchangeType; + private String queueType; private String routingKey; private final String contentEncoding; private final String contentType; - private boolean durable; private boolean exclusive; private boolean autoDelete; - + private boolean sequentialProcessing; private int deliveryMode; private final Map arguments = new HashMap<>(); @@ -66,6 +66,8 @@ public AMQPSettings(final AMQPEventQueueProperties properties) { contentEncoding = properties.getContentEncoding(); exchangeType = properties.getExchangeType(); routingKey = StringUtils.EMPTY; + queueType = properties.getQueueType(); + sequentialProcessing = properties.isSequentialMsgProcessing(); // Set common settings for publishing and consuming setDeliveryMode(properties.getDeliveryMode()); } @@ -213,77 +215,85 @@ public final AMQPSettings fromURI(final String queueURI) { } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof AMQPSettings)) { - return false; - } - AMQPSettings that = (AMQPSettings) o; - return isDurable() == that.isDurable() - && isExclusive() == that.isExclusive() - && autoDelete == that.autoDelete - && getDeliveryMode() == that.getDeliveryMode() - && Objects.equals(getQueueOrExchangeName(), that.getQueueOrExchangeName()) - && Objects.equals(getExchangeType(), that.getExchangeType()) - && Objects.equals(getRoutingKey(), that.getRoutingKey()) - && Objects.equals(getContentType(), that.getContentType()) - && Objects.equals(getContentEncoding(), that.getContentEncoding()) - && Objects.equals(getArguments(), that.getArguments()); + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof AMQPSettings)) return false; + AMQPSettings other = (AMQPSettings) obj; + return Objects.equals(arguments, other.arguments) + && autoDelete == other.autoDelete + && Objects.equals(contentEncoding, other.contentEncoding) + && Objects.equals(contentType, other.contentType) + && deliveryMode == other.deliveryMode + && durable == other.durable + && Objects.equals(eventName, other.eventName) + && Objects.equals(exchangeType, other.exchangeType) + && exclusive == other.exclusive + && Objects.equals(queueOrExchangeName, other.queueOrExchangeName) + && Objects.equals(queueType, other.queueType) + && Objects.equals(routingKey, other.routingKey) + && sequentialProcessing == other.sequentialProcessing; } @Override public int hashCode() { return Objects.hash( - getQueueOrExchangeName(), - getExchangeType(), - getRoutingKey(), - getContentType(), - isDurable(), - isExclusive(), + arguments, autoDelete, - getDeliveryMode(), - getContentEncoding(), - getArguments()); + contentEncoding, + contentType, + deliveryMode, + durable, + eventName, + exchangeType, + exclusive, + queueOrExchangeName, + queueType, + routingKey, + sequentialProcessing); } @Override public String toString() { - return "AMQSettings{" - + "queueOrExchangeName='" + return "AMQPSettings [queueOrExchangeName=" + queueOrExchangeName - + '\'' - + ", exchangeType='" + + ", eventName=" + + eventName + + ", exchangeType=" + exchangeType - + '\'' - + ", routingKey='" + + ", queueType=" + + queueType + + ", routingKey=" + routingKey - + '\'' - + ", contentType='" + + ", contentEncoding=" + + contentEncoding + + ", contentType=" + contentType - + '\'' + ", durable=" + durable + ", exclusive=" + exclusive + ", autoDelete=" + autoDelete + + ", sequentialProcessing=" + + sequentialProcessing + ", deliveryMode=" + deliveryMode - + ", contentEncoding='" - + contentEncoding - + '\'' + ", arguments=" + arguments - + ", durable=" - + isDurable() - + ", exclusive=" - + isExclusive() - + '}'; + + "]"; } public String getEventName() { return eventName; } + + /** @return the queueType */ + public String getQueueType() { + return queueType; + } + + /** @return the sequentialProcessing */ + public boolean isSequentialProcessing() { + return sequentialProcessing; + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java new file mode 100644 index 0000000000..a8b0725766 --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/util/RetryType.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.contribs.queue.amqp.util; + +/** RetryType holds the retry type */ +public enum RetryType { + REGULARINTERVALS, + EXPONENTIALBACKOFF, + INCREMENTALINTERVALS +} diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java index a0eeb95ea4..38e9491219 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPEventQueueProviderTest.java @@ -44,8 +44,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java index 730ac65c2c..86b3ac5965 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueueTest.java @@ -32,8 +32,10 @@ import org.mockito.stubbing.OngoingStubbing; import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants; import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings; +import com.netflix.conductor.contribs.queue.amqp.util.RetryType; import com.netflix.conductor.core.events.queue.Message; import com.rabbitmq.client.AMQP; @@ -90,8 +92,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); @@ -322,26 +323,6 @@ void runObserve( observableQueue.close(); } - // Tests - - @Test - public void testGetMessagesFromExistingExchangeAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test - public void testGetMessagesFromNotExistingExchangeAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, false, true); - } - @Test public void testGetMessagesFromExistingExchangeWithDurableExclusiveAutoDeleteQueueConfiguration() @@ -353,28 +334,6 @@ public void testGetMessagesFromNotExistingExchangeAndDefaultConfiguration() channel, connection, true, true, true, true, true); } - @Test - public void - testGetMessagesFromNotExistingExchangeWithNonDurableNonExclusiveNonAutoDeleteQueueConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndCustomConfigurationFromURI( - channel, connection, false, true, false, false, false); - } - - @Test - public void - testGetMessagesFromNotExistingExchangeWithDurableExclusiveNonAutoDeleteQueueConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndCustomConfigurationFromURI( - channel, connection, false, true, true, true, false); - } - @Test public void testPublishMessagesToNotExistingExchangeAndDefaultConfiguration() throws IOException, TimeoutException { @@ -384,40 +343,6 @@ public void testPublishMessagesToNotExistingExchangeAndDefaultConfiguration() testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, false, true); } - @Test(expected = RuntimeException.class) - public void testGetMessagesFromExchangeWithBadConnection() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToExchangeWithBadConnection() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, true, true); - } - - @Test - public void testGetMessagesFromExchangeWithBadChannel() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, false); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToExchangeWithBadChannel() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testPublishMessagesToExchangeAndDefaultConfiguration(channel, connection, true, false); - } - @Test public void testAck() throws IOException, TimeoutException { // Mock channel and connection @@ -428,7 +353,7 @@ public void testAck() throws IOException, TimeoutException { final String name = RandomStringUtils.randomAlphabetic(30), type = "topic", routingKey = RandomStringUtils.randomAlphabetic(30); - + AMQPRetryPattern retrySettings = null; final AMQPSettings settings = new AMQPSettings(properties) .fromURI( @@ -444,6 +369,7 @@ public void testAck() throws IOException, TimeoutException { addresses, true, settings, + retrySettings, batchSize, pollTimeMs); List messages = new LinkedList<>(); @@ -495,13 +421,14 @@ private void testGetMessagesFromExchangeAndDefaultConfiguration( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -595,13 +522,14 @@ private void testGetMessagesFromExchangeAndCustomConfigurationFromURI( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -689,13 +617,14 @@ private void testPublishMessagesToExchangeAndDefaultConfiguration( type, routingKey, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, true, settings, + retrySettings, batchSize, pollTimeMs); @@ -750,31 +679,6 @@ public void testGetMessagesFromNotExistingQueueAndDefaultConfiguration() testGetMessagesFromQueueAndDefaultConfiguration(channel, connection, false, true); } - @Test - public void testPublishMessagesToNotExistingQueueAndDefaultConfiguration() - throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockGoodConnection(channel); - testPublishMessagesToQueueAndDefaultConfiguration(channel, connection, false, true); - } - - @Test(expected = RuntimeException.class) - public void testGetMessagesFromQueueWithBadConnection() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testGetMessagesFromQueueAndDefaultConfiguration(channel, connection, true, true); - } - - @Test(expected = RuntimeException.class) - public void testPublishMessagesToQueueWithBadConnection() throws IOException, TimeoutException { - // Mock channel and connection - Channel channel = mockBaseChannel(); - Connection connection = mockBadConnection(); - testPublishMessagesToQueueAndDefaultConfiguration(channel, connection, true, true); - } - @Test public void testGetMessagesFromQueueWithBadChannel() throws IOException, TimeoutException { // Mock channel and connection @@ -794,32 +698,37 @@ public void testPublishMessagesToQueueWithBadChannel() throws IOException, Timeo @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_empty() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = - new AMQPObservableQueue(null, addresses, false, settings, batchSize, pollTimeMs); + new AMQPObservableQueue( + null, addresses, false, settings, retrySettings, batchSize, pollTimeMs); } @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_addressEmpty() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), null, false, settings, + retrySettings, batchSize, pollTimeMs); } @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_settingsEmpty() throws IOException, TimeoutException { - AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, null, + retrySettings, batchSize, pollTimeMs); } @@ -827,12 +736,14 @@ public void testAMQPObservalbleQueue_settingsEmpty() throws IOException, Timeout @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_batchsizezero() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, settings, + retrySettings, 0, pollTimeMs); } @@ -840,12 +751,14 @@ public void testAMQPObservalbleQueue_batchsizezero() throws IOException, Timeout @Test(expected = IllegalArgumentException.class) public void testAMQPObservalbleQueue_polltimezero() throws IOException, TimeoutException { AMQPSettings settings = new AMQPSettings(properties).fromURI("amqp_queue:test"); + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(mockGoodConnection(mockBaseChannel())), addresses, false, settings, + retrySettings, batchSize, 0); } @@ -869,13 +782,14 @@ private void testGetMessagesFromQueueAndDefaultConfiguration( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); @@ -900,13 +814,14 @@ private void testGetMessagesFromQueueAndDefaultConfiguration_close( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = null; AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); observableQueue.close(); @@ -938,13 +853,14 @@ private void testPublishMessagesToQueueAndDefaultConfiguration( List queue = buildQueue(random, batchSize); channel = mockChannelForQueue(channel, useWorkingChannel, queueExists, queueName, queue); - + AMQPRetryPattern retrySettings = new AMQPRetryPattern(3, 5, RetryType.REGULARINTERVALS); AMQPObservableQueue observableQueue = new AMQPObservableQueue( mockConnectionFactory(connection), addresses, false, settings, + retrySettings, batchSize, pollTimeMs); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java index 12b8d3ef4d..91afc6e50b 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/amqp/AMQPSettingsTest.java @@ -43,8 +43,7 @@ public void setUp() { when(properties.getPassword()).thenReturn(ConnectionFactory.DEFAULT_PASS); when(properties.getVirtualHost()).thenReturn(ConnectionFactory.DEFAULT_VHOST); when(properties.getPort()).thenReturn(PROTOCOL.PORT); - when(properties.getConnectionTimeout()) - .thenReturn(Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT)); + when(properties.getConnectionTimeoutInMilliSecs()).thenReturn(60000); when(properties.isUseNio()).thenReturn(false); when(properties.isDurable()).thenReturn(true); when(properties.isExclusive()).thenReturn(false); diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java index 52e50f288a..c9422cd8c0 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.events.EventExecution.Status; @@ -103,28 +104,30 @@ public DefaultEventProcessor( } public void handle(ObservableQueue queue, Message msg) { + List transientFailures = null; + Boolean executionFailed = false; try { if (isEventMessageIndexingEnabled) { executionService.addMessage(queue.getName(), msg); } String event = queue.getType() + ":" + queue.getName(); LOGGER.debug("Evaluating message: {} for event: {}", msg.getId(), event); - List transientFailures = executeEvent(event, msg); - - if (transientFailures.isEmpty()) { + transientFailures = executeEvent(event, msg); + } catch (Exception e) { + executionFailed = true; + LOGGER.error("Error handling message: {} on queue:{}", msg, queue.getName(), e); + Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName()); + } finally { + if (executionFailed || CollectionUtils.isEmpty(transientFailures)) { queue.ack(Collections.singletonList(msg)); LOGGER.debug("Message: {} acked on queue: {}", msg.getId(), queue.getName()); } else if (queue.rePublishIfNoAck()) { // re-submit this message to the queue, to be retried later - // This is needed for queues with no unack timeout, since messages are removed from - // the queue + // This is needed for queues with no unack timeout, since messages are removed + // from the queue queue.publish(Collections.singletonList(msg)); LOGGER.debug("Message: {} published to queue: {}", msg.getId(), queue.getName()); } - } catch (Exception e) { - LOGGER.error("Error handling message: {} on queue:{}", msg, queue.getName(), e); - Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName()); - } finally { Monitors.recordEventQueueMessagesHandled(queue.getType(), queue.getName()); } } @@ -144,8 +147,8 @@ protected List executeEvent(String event, Message msg) throws Ex for (EventHandler eventHandler : eventHandlerList) { String condition = eventHandler.getCondition(); String evaluatorType = eventHandler.getEvaluatorType(); - // Set default to true so that if condition is not specified, it falls through to - // process the event. + // Set default to true so that if condition is not specified, it falls through + // to process the event. Boolean success = true; if (StringUtils.isNotEmpty(condition) && evaluators.get(evaluatorType) != null) { Object result = diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 2c18a5d4a7..0dec908c8f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -113,7 +113,7 @@ public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor work @Override public boolean isAsync() { - return true; + return false; } @VisibleForTesting diff --git a/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java b/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java index 001067d06f..793494bd92 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/SemaphoreUtil.java @@ -37,13 +37,13 @@ public SemaphoreUtil(int numSlots) { */ public boolean acquireSlots(int numSlots) { boolean acquired = semaphore.tryAcquire(numSlots); - LOGGER.debug("Trying to acquire {} permit: {}", numSlots, acquired); + LOGGER.trace("Trying to acquire {} permit: {}", numSlots, acquired); return acquired; } /** Signals that processing is complete and the specified number of permits can be released. */ public void completeProcessing(int numSlots) { - LOGGER.debug("Completed execution; releasing permit"); + LOGGER.trace("Completed execution; releasing permit"); semaphore.release(numSlots); } @@ -54,7 +54,7 @@ public void completeProcessing(int numSlots) { */ public int availableSlots() { int available = semaphore.availablePermits(); - LOGGER.debug("Number of available permits: {}", available); + LOGGER.trace("Number of available permits: {}", available); return available; } } diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy index 3b2977f509..a8a4451e64 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy @@ -54,12 +54,12 @@ class EventSpec extends Specification { event = new Event(eventQueues, parametersUtils, objectMapper) } - def "verify that event task is async"() { + def "verify that event task is NOT async"() { when: def async = event.isAsync() then: - async + !async } def "event cancel calls ack on the queue"() { diff --git a/dependencies.gradle b/dependencies.gradle index a937d121df..fe00430672 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -16,7 +16,7 @@ */ ext { revActivation = '2.0.0' - revAmqpClient = '5.13.0' + revAmqpClient = '5.14.0' revAwaitility = '3.1.6' revAwsSdk = '1.11.86' revAzureStorageBlobSdk = '12.7.0' diff --git a/docker/server/config/config.properties b/docker/server/config/config.properties index 3c6b10a77c..55124c78c0 100755 --- a/docker/server/config/config.properties +++ b/docker/server/config/config.properties @@ -36,6 +36,8 @@ conductor.elasticsearch.url=http://es:9200 # Name of the elasticsearch cluster conductor.elasticsearch.indexName=conductor +#conductor.event-queues.amqp.queueType=classic +#conductor.event-queues.amqp.sequentialMsgProcessing=true # Additional modules for metrics collection exposed via logger (optional) # conductor.metrics-logger.enabled=true diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java index 2d2538b9e3..2c0b3eadb2 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisProperties.java @@ -94,6 +94,68 @@ public RedisProperties(ConductorProperties conductorProperties) { @DurationUnit(ChronoUnit.SECONDS) private Duration eventExecutionPersistenceTTL = Duration.ofSeconds(60); + // Maximum number of idle connections to be maintained + private int maxIdleConnections = 8; + + // Minimum number of idle connections to be maintained + private int minIdleConnections = 5; + + private long minEvictableIdleTimeMillis = 1800000; + + private long timeBetweenEvictionRunsMillis = -1L; + + private boolean testWhileIdle = false; + + private int numTestsPerEvictionRun = 3; + + public int getNumTestsPerEvictionRun() { + return numTestsPerEvictionRun; + } + + public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { + this.numTestsPerEvictionRun = numTestsPerEvictionRun; + } + + public boolean isTestWhileIdle() { + return testWhileIdle; + } + + public void setTestWhileIdle(boolean testWhileIdle) { + this.testWhileIdle = testWhileIdle; + } + + public long getMinEvictableIdleTimeMillis() { + return minEvictableIdleTimeMillis; + } + + public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { + this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; + } + + public long getTimeBetweenEvictionRunsMillis() { + return timeBetweenEvictionRunsMillis; + } + + public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) { + this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; + } + + public int getMinIdleConnections() { + return minIdleConnections; + } + + public void setMinIdleConnections(int minIdleConnections) { + this.minIdleConnections = minIdleConnections; + } + + public int getMaxIdleConnections() { + return maxIdleConnections; + } + + public void setMaxIdleConnections(int maxIdleConnections) { + this.maxIdleConnections = maxIdleConnections; + } + public String getDataCenterRegion() { return dataCenterRegion; } diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java index 90d442b716..90a3fd6cd3 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java @@ -43,8 +43,15 @@ protected JedisCommands createJedisCommands( HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier) { GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); - genericObjectPoolConfig.setMinIdle(5); + genericObjectPoolConfig.setMinIdle(properties.getMinIdleConnections()); + genericObjectPoolConfig.setMaxIdle(properties.getMaxIdleConnections()); genericObjectPoolConfig.setMaxTotal(properties.getMaxConnectionsPerHost()); + genericObjectPoolConfig.setTestWhileIdle(properties.isTestWhileIdle()); + genericObjectPoolConfig.setMinEvictableIdleTimeMillis( + properties.getMinEvictableIdleTimeMillis()); + genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis( + properties.getTimeBetweenEvictionRunsMillis()); + genericObjectPoolConfig.setNumTestsPerEvictionRun(properties.getNumTestsPerEvictionRun()); log.info( "Starting conductor server using redis_sentinel and cluster " + properties.getClusterName()); diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index d3ec09e98c..69b0ebf98f 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -39,6 +39,12 @@ conductor.redis.queuesNonQuorumPort=22122 # For a single node dynomite or redis server, make sure the value below is set to same as rack specified in the "workflow.dynomite.cluster.hosts" property. conductor.redis.availabilityZone=us-east-1c +#conductor.redis.maxIdleConnections=8 +#conductor.redis.minIdleConnections=5 +#conductor.redis.minEvictableIdleTimeMillis = 1800000 +#conductor.redis.timeBetweenEvictionRunsMillis = -1L +#conductor.redis.testWhileIdle = false +#conductor.redis.numTestsPerEvictionRun = 3 #Transport address to elasticsearch conductor.elasticsearch.url=localhost:9300 @@ -52,7 +58,7 @@ conductor.elasticsearch.version=6 # Default event queue type to listen on for wait task conductor.default-event-queue.type=sqs - + #zookeeper # conductor.zookeeper-lock.connectionString=host1.2181,host2:2181,host3:2181 # conductor.zookeeper-lock.sessionTimeoutMs @@ -84,8 +90,17 @@ conductor.workflow-execution-lock.type=noop_lock #conductor.event-queues.amqp.port=5672 #conductor.event-queues.amqp.useNio=false #conductor.event-queues.amqp.batchSize=1 - #conductor.event-queues.amqp.pollTimeDuration=100ms +#conductor.event-queues.amqp.queueType=classic +#conductor.event-queues.amqp.sequentialMsgProcessing=true +#conductor.event-queues.amqp.connectionTimeoutInMilliSecs=180000 +#conductor.event-queues.amqp.networkRecoveryIntervalInMilliSecs=5000 +#conductor.event-queues.amqp.requestHeartbeatTimeoutInSecs=30 +#conductor.event-queues.amqp.handshakeTimeoutInMilliSecs=180000 +#conductor.event-queues.amqp.maxChannelCount=5000 +#conductor.event-queues.amqp.limit=50 +#conductor.event-queues.amqp.duration=1000 +#conductor.event-queues.amqp.retryType=REGULARINTERVALS #conductor.event-queues.amqp.useExchange=true( exchange or queue) #conductor.event-queues.amqp.listenerQueuePrefix=myqueue diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy index 30a64dd6d6..43c227640d 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy @@ -45,16 +45,11 @@ class EventTaskSpec extends AbstractSpecification { then: "Retrieve the workflow" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 1 + tasks.size() == 2 tasks[0].taskType == TaskType.EVENT.name() - tasks[0].status == Task.Status.SCHEDULED + tasks[0].status == Task.Status.COMPLETED } - when: "the event task is executed by issuing a system task call" - List polledTaskIds = queueDAO.pop(eventTask.taskType, 1, 200) - String eventTaskId = polledTaskIds.get(0) - asyncSystemTaskExecutor.execute(eventTask, eventTaskId) - then: "Retrieve the workflow" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING