Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulling changes in for azure-core-amqp release #15068

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.5.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.5.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ com.azure:azure-ai-anomalydetector;3.0.0-beta.1;3.0.0-beta.2
com.azure:azure-ai-formrecognizer;3.0.0;3.1.0-beta.1
com.azure:azure-ai-textanalytics;5.0.0;5.1.0-beta.1
com.azure:azure-core;1.8.0;1.8.1
com.azure:azure-core-amqp;1.4.0;1.5.0-beta.1
com.azure:azure-core-amqp;1.4.0;1.5.0
com.azure:azure-core-experimental;1.0.0-beta.3;1.0.0-beta.4
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-http-netty;1.6.0;1.6.1
Expand Down Expand Up @@ -135,7 +135,7 @@ unreleased_com.azure:azure-core-test;1.4.2
unreleased_com.azure:azure-core-http-netty;1.6.1
unreleased_com.azure:azure-core-http-okhttp;1.3.1
unreleased_com.azure:azure-identity;1.2.0-beta.1
unreleased_com.azure:azure-core-amqp;1.5.0-beta.1
unreleased_com.azure:azure-core-amqp;1.5.0
unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.5
unreleased_com.azure:azure-security-keyvault-keys;4.3.0-beta.1

Expand Down
5 changes: 4 additions & 1 deletion sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Release History

## 1.5.0-beta.1 (Unreleased)
## 1.5.0 (2020-09-10)
- Remove unused and duplicate logic for Handlers.getErrors().
- Close children sessions and links when a connection is disposed.
- Added AMQP Message envelope which can be accessed using `AmqpAnnotatedMessage`.

## 1.4.0 (2020-08-11)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ own AMQP client library that abstracts from the underlying transport library's i
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.4.0</version>
<version>1.5.0</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.5.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.5.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,36 @@ public enum AmqpMessageConstant {
/**
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher");
PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),
/**
* The name representing scheduled enqueue time.
*/
SCHEDULED_ENQUEUE_UTC_TIME_NAME("x-opt-scheduled-enqueue-time"),
/**
* The identifier associated with a given via-partition.
*/
VIA_PARTITION_KEY_ANNOTATION_NAME("x-opt-via-partition-key"),
/**
* The identifier for locked until.
*/
LOCKED_UNTIL_KEY_ANNOTATION_NAME("x-opt-locked-until"),
/**
* The identifier for deadletter source.
*/
DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME("x-opt-deadletter-source"),
/**
* The name representing enqueue sequence number.
* This one appears to always be 0, but is always returned with each message.
*/
ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-enqueue-sequence-number"),
/**
* The identifier for deadletter description.
*/
DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME("DeadLetterErrorDescription"),
/**
* The identifier for deadletter reason.
*/
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");

private static final Map<String, AmqpMessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public class AmqpRetryOptions {
* Creates an instance with the default retry options set.
*/
public AmqpRetryOptions() {
maxRetries = 3;
delay = Duration.ofMillis(800);
maxDelay = Duration.ofMinutes(1);
tryTimeout = Duration.ofMinutes(1);
retryMode = AmqpRetryMode.EXPONENTIAL;
this.maxRetries = 3;
this.delay = Duration.ofMillis(800);
this.maxDelay = Duration.ofMinutes(1);
this.tryTimeout = Duration.ofMinutes(1);
this.retryMode = AmqpRetryMode.EXPONENTIAL;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand All @@ -35,6 +36,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;

public class ReactorConnection implements AmqpConnection {
private static final String CBS_SESSION_NAME = "cbs-session";
private static final String CBS_ADDRESS = "$cbs";
Expand All @@ -43,12 +46,10 @@ public class ReactorConnection implements AmqpConnection {
private final ClientLogger logger = new ClientLogger(ReactorConnection.class);
private final ConcurrentMap<String, SessionSubscription> sessionMap = new ConcurrentHashMap<>();

private final AtomicBoolean hasConnection = new AtomicBoolean();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final DirectProcessor<AmqpShutdownSignal> shutdownSignals = DirectProcessor.create();
private final ReplayProcessor<AmqpEndpointState> endpointStates =
ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
private FluxSink<AmqpEndpointState> endpointStatesSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
private final FluxSink<AmqpShutdownSignal> shutdownSignalsSink = shutdownSignals.sink();
private final ReplayProcessor<AmqpEndpointState> endpointStates;

private final String connectionId;
private final Mono<Connection> connectionMono;
Expand All @@ -58,7 +59,6 @@ public class ReactorConnection implements AmqpConnection {
private final MessageSerializer messageSerializer;
private final ConnectionOptions connectionOptions;
private final ReactorProvider reactorProvider;
private final Disposable.Composite subscriptions;
private final AmqpRetryPolicy retryPolicy;
private final SenderSettleMode senderSettleMode;
private final ReceiverSettleMode receiverSettleMode;
Expand Down Expand Up @@ -86,8 +86,8 @@ public class ReactorConnection implements AmqpConnection {
*/
public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider,
ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider,
MessageSerializer messageSerializer, String product, String clientVersion,
SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
MessageSerializer messageSerializer, String product, String clientVersion, SenderSettleMode senderSettleMode,
ReceiverSettleMode receiverSettleMode) {

this.connectionOptions = connectionOptions;
this.reactorProvider = reactorProvider;
Expand All @@ -103,26 +103,14 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
this.senderSettleMode = senderSettleMode;
this.receiverSettleMode = receiverSettleMode;

this.connectionMono = Mono.fromCallable(this::getOrCreateConnection)
.doOnSubscribe(c -> hasConnection.set(true));

this.subscriptions = Disposables.composite(
this.handler.getEndpointStates().subscribe(
state -> {
logger.verbose("connectionId[{}]: Connection state: {}", connectionId, state);
endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state));
}, error -> {
logger.error("connectionId[{}] Error occurred in connection endpoint.", connectionId, error);
endpointStatesSink.error(error);
}, () -> {
endpointStatesSink.next(AmqpEndpointState.CLOSED);
endpointStatesSink.complete();
}),

this.handler.getErrors().subscribe(error -> {
logger.error("connectionId[{}] Error occurred in connection handler.", connectionId, error);
endpointStatesSink.error(error);
}));
this.connectionMono = Mono.fromCallable(this::getOrCreateConnection);

this.endpointStates = this.handler.getEndpointStates()
.takeUntilOther(shutdownSignals)
.map(state -> {
logger.verbose("connectionId[{}]: State {}", connectionId, state);
return AmqpEndpointStateUtil.getConnectionState(state);
}).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
}

/**
Expand All @@ -148,14 +136,12 @@ public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
"connectionId[%s]: Connection is disposed. Cannot get CBS node.", connectionId))));
}

final Mono<ClaimsBasedSecurityNode> cbsNodeMono = RetryUtil.withRetry(
getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
connectionOptions.getRetry().getTryTimeout(), retryPolicy)
final Mono<ClaimsBasedSecurityNode> cbsNodeMono =
RetryUtil.withRetry(getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
connectionOptions.getRetry().getTryTimeout(), retryPolicy)
.then(Mono.fromCallable(this::getOrCreateCBSNode));

return hasConnection.get()
? cbsNodeMono
: connectionMono.then(cbsNodeMono);
return connectionMono.then(cbsNodeMono);
}

@Override
Expand Down Expand Up @@ -249,17 +235,7 @@ protected AmqpSession createSession(String sessionName, Session session, Session
*/
@Override
public boolean removeSession(String sessionName) {
if (sessionName == null) {
return false;
}

final SessionSubscription removed = sessionMap.remove(sessionName);

if (removed != null) {
removed.dispose();
}

return removed != null;
return removeSession(sessionName, null);
}

@Override
Expand All @@ -272,18 +248,23 @@ public boolean isDisposed() {
*/
@Override
public void dispose() {
dispose(null);
shutdownSignalsSink.next(new AmqpShutdownSignal(false, true,
"Disposed by client."));
}

void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}

logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId);
subscriptions.dispose();
endpointStatesSink.complete();
logger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", connectionId,
errorCondition != null ? errorCondition : NOT_APPLICABLE);

final String[] keys = sessionMap.keySet().toArray(new String[0]);
for (String key : keys) {
logger.info("connectionId[{}]: Removing session '{}'", connectionId, key);
removeSession(key);
removeSession(key, errorCondition);
}

if (connection != null) {
Expand Down Expand Up @@ -331,6 +312,20 @@ protected Mono<RequestResponseChannel> createRequestResponseChannel(String sessi
new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName))));
}

private boolean removeSession(String sessionName, ErrorCondition errorCondition) {
if (sessionName == null) {
return false;
}

final SessionSubscription removed = sessionMap.remove(sessionName);

if (removed != null) {
removed.dispose(errorCondition);
}

return removed != null;
}

private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
if (cbsChannel == null) {
logger.info("Setting CBS channel.");
Expand Down Expand Up @@ -380,6 +375,7 @@ public void onConnectionError(Throwable exception) {
getId(), getFullyQualifiedNamespace(), exception.getMessage());

endpointStates.onError(exception);
ReactorConnection.this.dispose();
}

@Override
Expand All @@ -393,16 +389,12 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
"onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]",
getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal);

if (!endpointStatesSink.isCancelled()) {
endpointStatesSink.next(AmqpEndpointState.CLOSED);
endpointStatesSink.complete();
}

dispose();
dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), shutdownSignal.toString()));
shutdownSignalsSink.next(shutdownSignal);
}
}

private static final class SessionSubscription implements Disposable {
private static final class SessionSubscription {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AmqpSession session;
private final Disposable subscription;
Expand All @@ -412,22 +404,23 @@ private SessionSubscription(AmqpSession session, Disposable subscription) {
this.subscription = subscription;
}

public Disposable getSubscription() {
return subscription;
}

public AmqpSession getSession() {
return session;
}

@Override
public void dispose() {
void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}

if (session instanceof ReactorSession) {
final ReactorSession reactorSession = (ReactorSession) session;
reactorSession.dispose(errorCondition);
} else {
session.dispose();
}

subscription.dispose();
session.dispose();
}
}
}
Loading