Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
AMQP enhancements for resiliency and concurrency (#2587)
Browse files Browse the repository at this point in the history
* AMQP messages will generate the observable change

* Reverting unnecessary changes

* Capturing the only changes required for AMQP messagres reading

* Capturing the only changes required for AMQP messagres reading

* Support for sequential and parallel processing of messages

* Channel reusability changes

* Channel reusability changes

* Subscriber channel caching

* Max channel changes

* Connection and opertion retries

* Channel usage for exchanges and publishers

* Acknowledge the message even on failures

* Resolved conflicts

* Apply coding style changes using spotless

* Retry on publish and ack failures

* Externalizing the redis config

* Externalizing the redis config

* Make Event System Task Synchronous

Event is a task that publishes messages to an external eventing system.
The publishing is not a long running task, and publishing message to an async system using async task is not optimal.

marking event as sync, improves the overall performance and has negligible overhead given the task completes quite quick.

* Update integration tests to reflect Event is synchronous

* Getting the latest changes

Co-authored-by: Thangella, Venkat <venkat.thangella@netapp.com>
Co-authored-by: Viren Baraiya <virenx@gmail.com>
  • Loading branch information
3 people committed Feb 23, 2022
1 parent c3e85a5 commit 9ffcabc
Show file tree
Hide file tree
Showing 20 changed files with 988 additions and 437 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import org.springframework.boot.context.properties.ConfigurationProperties;

import com.netflix.conductor.contribs.queue.amqp.util.RetryType;

import com.rabbitmq.client.AMQP.PROTOCOL;
import com.rabbitmq.client.ConnectionFactory;

Expand All @@ -36,8 +38,62 @@ public class AMQPEventQueueProperties {

private int port = PROTOCOL.PORT;

private Duration connectionTimeout =
Duration.ofMillis(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT);
private int connectionTimeoutInMilliSecs = 180000;
private int networkRecoveryIntervalInMilliSecs = 5000;
private int requestHeartbeatTimeoutInSecs = 30;
private int handshakeTimeoutInMilliSecs = 180000;
private int maxChannelCount = 5000;
private int limit = 50;
private int duration = 1000;
private RetryType retryType = RetryType.REGULARINTERVALS;

public int getLimit() {
return limit;
}

public void setLimit(int limit) {
this.limit = limit;
}

public int getDuration() {
return duration;
}

public void setDuration(int duration) {
this.duration = duration;
}

public RetryType getType() {
return retryType;
}

public void setType(RetryType type) {
this.retryType = type;
}

public int getConnectionTimeoutInMilliSecs() {
return connectionTimeoutInMilliSecs;
}

public void setConnectionTimeoutInMilliSecs(int connectionTimeoutInMilliSecs) {
this.connectionTimeoutInMilliSecs = connectionTimeoutInMilliSecs;
}

public int getHandshakeTimeoutInMilliSecs() {
return handshakeTimeoutInMilliSecs;
}

public void setHandshakeTimeoutInMilliSecs(int handshakeTimeoutInMilliSecs) {
this.handshakeTimeoutInMilliSecs = handshakeTimeoutInMilliSecs;
}

public int getMaxChannelCount() {
return maxChannelCount;
}

public void setMaxChannelCount(int maxChannelCount) {
this.maxChannelCount = maxChannelCount;
}

private boolean useNio = false;

Expand All @@ -53,6 +109,10 @@ public class AMQPEventQueueProperties {

private String exchangeType = "topic";

private String queueType = "classic";

private boolean sequentialMsgProcessing = true;

private int deliveryMode = 2;

private boolean useExchange = true;
Expand Down Expand Up @@ -115,14 +175,6 @@ public void setPort(int port) {
this.port = port;
}

public Duration getConnectionTimeout() {
return connectionTimeout;
}

public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

public boolean isUseNio() {
return useNio;
}
Expand Down Expand Up @@ -202,4 +254,48 @@ public String getListenerQueuePrefix() {
public void setListenerQueuePrefix(String listenerQueuePrefix) {
this.listenerQueuePrefix = listenerQueuePrefix;
}

public String getQueueType() {
return queueType;
}

/**
* @param queueType Supports two queue types, 'classic' and 'quorum'. Classic will be be
* deprecated in 2022 and its usage discouraged from RabbitMQ community. So not using enum
* type here to hold different values.
*/
public void setQueueType(String queueType) {
this.queueType = queueType;
}

/** @return the sequentialMsgProcessing */
public boolean isSequentialMsgProcessing() {
return sequentialMsgProcessing;
}

/**
* @param sequentialMsgProcessing the sequentialMsgProcessing to set Supports sequential and
* parallel message processing capabilities. In parallel message processing, number of
* threads are controlled by batch size. No thread control or execution framework required
* here as threads are limited and short-lived.
*/
public void setSequentialMsgProcessing(boolean sequentialMsgProcessing) {
this.sequentialMsgProcessing = sequentialMsgProcessing;
}

public int getNetworkRecoveryIntervalInMilliSecs() {
return networkRecoveryIntervalInMilliSecs;
}

public void setNetworkRecoveryIntervalInMilliSecs(int networkRecoveryIntervalInMilliSecs) {
this.networkRecoveryIntervalInMilliSecs = networkRecoveryIntervalInMilliSecs;
}

public int getRequestHeartbeatTimeoutInSecs() {
return requestHeartbeatTimeoutInSecs;
}

public void setRequestHeartbeatTimeoutInSecs(int requestHeartbeatTimeoutInSecs) {
this.requestHeartbeatTimeoutInSecs = requestHeartbeatTimeoutInSecs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.queue.amqp.config;

import com.netflix.conductor.contribs.queue.amqp.util.RetryType;

public class AMQPRetryPattern {

private int limit = 50;
private int duration = 1000;
private RetryType type = RetryType.REGULARINTERVALS;

public AMQPRetryPattern() {}

public AMQPRetryPattern(int limit, int duration, RetryType type) {
this.limit = limit;
this.duration = duration;
this.type = type;
}

/**
* This gets executed if the retry index is within the allowed limits, otherwise exception will
* be thrown.
*
* @throws Exception
*/
public void continueOrPropogate(Exception ex, int retryIndex) throws Exception {
if (retryIndex > limit) {
throw ex;
}
// Regular Intervals is the default
long waitDuration = duration;
if (type == RetryType.INCREMENTALINTERVALS) {
waitDuration = duration * retryIndex;
} else if (type == RetryType.EXPONENTIALBACKOFF) {
waitDuration = (long) Math.pow(2, retryIndex) * duration;
}
try {
Thread.sleep(waitDuration);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,16 @@ public class AMQPConstants {
* polling time to drain the in-memory queue.
*/
public static int DEFAULT_POLL_TIME_MS = 100;

// info channel messages.
public static final String INFO_CHANNEL_BORROW_SUCCESS =
"Borrowed the channel object from the channel pool for " + "the connection type [%s]";
public static final String INFO_CHANNEL_RETURN_SUCCESS =
"Returned the borrowed channel object to the pool for " + "the connection type [%s]";
public static final String INFO_CHANNEL_CREATION_SUCCESS =
"Channels are not available in the pool. Created a"
+ " channel for the connection type [%s]";
public static final String INFO_CHANNEL_RESET_SUCCESS =
"No proper channels available in the pool. Created a "
+ "channel for the connection type [%s]";
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public class AMQPSettings {
private String queueOrExchangeName;
private String eventName;
private String exchangeType;
private String queueType;
private String routingKey;
private final String contentEncoding;
private final String contentType;

private boolean durable;
private boolean exclusive;
private boolean autoDelete;

private boolean sequentialProcessing;
private int deliveryMode;

private final Map<String, Object> arguments = new HashMap<>();
Expand All @@ -66,6 +66,8 @@ public AMQPSettings(final AMQPEventQueueProperties properties) {
contentEncoding = properties.getContentEncoding();
exchangeType = properties.getExchangeType();
routingKey = StringUtils.EMPTY;
queueType = properties.getQueueType();
sequentialProcessing = properties.isSequentialMsgProcessing();
// Set common settings for publishing and consuming
setDeliveryMode(properties.getDeliveryMode());
}
Expand Down Expand Up @@ -213,77 +215,85 @@ public final AMQPSettings fromURI(final String queueURI) {
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AMQPSettings)) {
return false;
}
AMQPSettings that = (AMQPSettings) o;
return isDurable() == that.isDurable()
&& isExclusive() == that.isExclusive()
&& autoDelete == that.autoDelete
&& getDeliveryMode() == that.getDeliveryMode()
&& Objects.equals(getQueueOrExchangeName(), that.getQueueOrExchangeName())
&& Objects.equals(getExchangeType(), that.getExchangeType())
&& Objects.equals(getRoutingKey(), that.getRoutingKey())
&& Objects.equals(getContentType(), that.getContentType())
&& Objects.equals(getContentEncoding(), that.getContentEncoding())
&& Objects.equals(getArguments(), that.getArguments());
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof AMQPSettings)) return false;
AMQPSettings other = (AMQPSettings) obj;
return Objects.equals(arguments, other.arguments)
&& autoDelete == other.autoDelete
&& Objects.equals(contentEncoding, other.contentEncoding)
&& Objects.equals(contentType, other.contentType)
&& deliveryMode == other.deliveryMode
&& durable == other.durable
&& Objects.equals(eventName, other.eventName)
&& Objects.equals(exchangeType, other.exchangeType)
&& exclusive == other.exclusive
&& Objects.equals(queueOrExchangeName, other.queueOrExchangeName)
&& Objects.equals(queueType, other.queueType)
&& Objects.equals(routingKey, other.routingKey)
&& sequentialProcessing == other.sequentialProcessing;
}

@Override
public int hashCode() {
return Objects.hash(
getQueueOrExchangeName(),
getExchangeType(),
getRoutingKey(),
getContentType(),
isDurable(),
isExclusive(),
arguments,
autoDelete,
getDeliveryMode(),
getContentEncoding(),
getArguments());
contentEncoding,
contentType,
deliveryMode,
durable,
eventName,
exchangeType,
exclusive,
queueOrExchangeName,
queueType,
routingKey,
sequentialProcessing);
}

@Override
public String toString() {
return "AMQSettings{"
+ "queueOrExchangeName='"
return "AMQPSettings [queueOrExchangeName="
+ queueOrExchangeName
+ '\''
+ ", exchangeType='"
+ ", eventName="
+ eventName
+ ", exchangeType="
+ exchangeType
+ '\''
+ ", routingKey='"
+ ", queueType="
+ queueType
+ ", routingKey="
+ routingKey
+ '\''
+ ", contentType='"
+ ", contentEncoding="
+ contentEncoding
+ ", contentType="
+ contentType
+ '\''
+ ", durable="
+ durable
+ ", exclusive="
+ exclusive
+ ", autoDelete="
+ autoDelete
+ ", sequentialProcessing="
+ sequentialProcessing
+ ", deliveryMode="
+ deliveryMode
+ ", contentEncoding='"
+ contentEncoding
+ '\''
+ ", arguments="
+ arguments
+ ", durable="
+ isDurable()
+ ", exclusive="
+ isExclusive()
+ '}';
+ "]";
}

public String getEventName() {
return eventName;
}

/** @return the queueType */
public String getQueueType() {
return queueType;
}

/** @return the sequentialProcessing */
public boolean isSequentialProcessing() {
return sequentialProcessing;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.queue.amqp.util;

/** RetryType holds the retry type */
public enum RetryType {
REGULARINTERVALS,
EXPONENTIALBACKOFF,
INCREMENTALINTERVALS
}
Loading

0 comments on commit 9ffcabc

Please sign in to comment.