diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml
index 44c5f8d020771..6f31487758a84 100644
--- a/eng/jacoco-test-coverage/pom.xml
+++ b/eng/jacoco-test-coverage/pom.xml
@@ -59,7 +59,7 @@
com.azureazure-core-amqp
- 1.5.0-beta.1
+ 1.5.0com.azure
diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt
index fc0ff09a11df7..013ac426e68f5 100644
--- a/eng/versioning/version_client.txt
+++ b/eng/versioning/version_client.txt
@@ -8,7 +8,7 @@ com.azure:azure-ai-anomalydetector;3.0.0-beta.1;3.0.0-beta.2
com.azure:azure-ai-formrecognizer;3.0.0;3.1.0-beta.1
com.azure:azure-ai-textanalytics;5.0.0;5.1.0-beta.1
com.azure:azure-core;1.8.0;1.8.1
-com.azure:azure-core-amqp;1.4.0;1.5.0-beta.1
+com.azure:azure-core-amqp;1.4.0;1.5.0
com.azure:azure-core-experimental;1.0.0-beta.3;1.0.0-beta.4
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-http-netty;1.6.0;1.6.1
@@ -135,7 +135,7 @@ unreleased_com.azure:azure-core-test;1.4.2
unreleased_com.azure:azure-core-http-netty;1.6.1
unreleased_com.azure:azure-core-http-okhttp;1.3.1
unreleased_com.azure:azure-identity;1.2.0-beta.1
-unreleased_com.azure:azure-core-amqp;1.5.0-beta.1
+unreleased_com.azure:azure-core-amqp;1.5.0
unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.5
unreleased_com.azure:azure-security-keyvault-keys;4.3.0-beta.1
diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md
index 37453e692edf1..9ff0794f16467 100644
--- a/sdk/core/azure-core-amqp/CHANGELOG.md
+++ b/sdk/core/azure-core-amqp/CHANGELOG.md
@@ -1,6 +1,9 @@
# Release History
-## 1.5.0-beta.1 (Unreleased)
+## 1.5.0 (2020-09-10)
+- Remove unused and duplicate logic for Handlers.getErrors().
+- Close children sessions and links when a connection is disposed.
+- Added AMQP Message envelope which can be accessed using `AmqpAnnotatedMessage`.
## 1.4.0 (2020-08-11)
diff --git a/sdk/core/azure-core-amqp/README.md b/sdk/core/azure-core-amqp/README.md
index 46ef182c058aa..35b96b72deeea 100644
--- a/sdk/core/azure-core-amqp/README.md
+++ b/sdk/core/azure-core-amqp/README.md
@@ -16,7 +16,7 @@ own AMQP client library that abstracts from the underlying transport library's i
com.azureazure-core-amqp
- 1.4.0
+ 1.5.0
```
[//]: # ({x-version-update-end})
diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml
index a1242f343b3ce..d2d4c6bb96460 100644
--- a/sdk/core/azure-core-amqp/pom.xml
+++ b/sdk/core/azure-core-amqp/pom.xml
@@ -14,7 +14,7 @@
com.azureazure-core-amqp
- 1.5.0-beta.1
+ 1.5.0jarMicrosoft Azure Java Core AMQP Library
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
index 4f250ca9719f1..3cc986e779ef8 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
@@ -93,7 +93,36 @@ public enum AmqpMessageConstant {
/**
* The name of the entity that published a message.
*/
- PUBLISHER_ANNOTATION_NAME("x-opt-publisher");
+ PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),
+ /**
+ * The name representing scheduled enqueue time.
+ */
+ SCHEDULED_ENQUEUE_UTC_TIME_NAME("x-opt-scheduled-enqueue-time"),
+ /**
+ * The identifier associated with a given via-partition.
+ */
+ VIA_PARTITION_KEY_ANNOTATION_NAME("x-opt-via-partition-key"),
+ /**
+ * The identifier for locked until.
+ */
+ LOCKED_UNTIL_KEY_ANNOTATION_NAME("x-opt-locked-until"),
+ /**
+ * The identifier for deadletter source.
+ */
+ DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME("x-opt-deadletter-source"),
+ /**
+ * The name representing enqueue sequence number.
+ * This one appears to always be 0, but is always returned with each message.
+ */
+ ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-enqueue-sequence-number"),
+ /**
+ * The identifier for deadletter description.
+ */
+ DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME("DeadLetterErrorDescription"),
+ /**
+ * The identifier for deadletter reason.
+ */
+ DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");
private static final Map RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java
index 0b266ef518f40..db3daea8af6c4 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java
@@ -23,11 +23,11 @@ public class AmqpRetryOptions {
* Creates an instance with the default retry options set.
*/
public AmqpRetryOptions() {
- maxRetries = 3;
- delay = Duration.ofMillis(800);
- maxDelay = Duration.ofMinutes(1);
- tryTimeout = Duration.ofMinutes(1);
- retryMode = AmqpRetryMode.EXPONENTIAL;
+ this.maxRetries = 3;
+ this.delay = Duration.ofMillis(800);
+ this.maxDelay = Duration.ofMinutes(1);
+ this.tryTimeout = Duration.ofMinutes(1);
+ this.retryMode = AmqpRetryMode.EXPONENTIAL;
}
/**
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
index 8d67a2f86907d..c60fc0aa9a665 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
@@ -12,15 +12,16 @@
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
-import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
@@ -35,6 +36,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
+
public class ReactorConnection implements AmqpConnection {
private static final String CBS_SESSION_NAME = "cbs-session";
private static final String CBS_ADDRESS = "$cbs";
@@ -43,12 +46,10 @@ public class ReactorConnection implements AmqpConnection {
private final ClientLogger logger = new ClientLogger(ReactorConnection.class);
private final ConcurrentMap sessionMap = new ConcurrentHashMap<>();
- private final AtomicBoolean hasConnection = new AtomicBoolean();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final DirectProcessor shutdownSignals = DirectProcessor.create();
- private final ReplayProcessor endpointStates =
- ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
- private FluxSink endpointStatesSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
+ private final FluxSink shutdownSignalsSink = shutdownSignals.sink();
+ private final ReplayProcessor endpointStates;
private final String connectionId;
private final Mono connectionMono;
@@ -58,7 +59,6 @@ public class ReactorConnection implements AmqpConnection {
private final MessageSerializer messageSerializer;
private final ConnectionOptions connectionOptions;
private final ReactorProvider reactorProvider;
- private final Disposable.Composite subscriptions;
private final AmqpRetryPolicy retryPolicy;
private final SenderSettleMode senderSettleMode;
private final ReceiverSettleMode receiverSettleMode;
@@ -86,8 +86,8 @@ public class ReactorConnection implements AmqpConnection {
*/
public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider,
ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider,
- MessageSerializer messageSerializer, String product, String clientVersion,
- SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
+ MessageSerializer messageSerializer, String product, String clientVersion, SenderSettleMode senderSettleMode,
+ ReceiverSettleMode receiverSettleMode) {
this.connectionOptions = connectionOptions;
this.reactorProvider = reactorProvider;
@@ -103,26 +103,14 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
this.senderSettleMode = senderSettleMode;
this.receiverSettleMode = receiverSettleMode;
- this.connectionMono = Mono.fromCallable(this::getOrCreateConnection)
- .doOnSubscribe(c -> hasConnection.set(true));
-
- this.subscriptions = Disposables.composite(
- this.handler.getEndpointStates().subscribe(
- state -> {
- logger.verbose("connectionId[{}]: Connection state: {}", connectionId, state);
- endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state));
- }, error -> {
- logger.error("connectionId[{}] Error occurred in connection endpoint.", connectionId, error);
- endpointStatesSink.error(error);
- }, () -> {
- endpointStatesSink.next(AmqpEndpointState.CLOSED);
- endpointStatesSink.complete();
- }),
-
- this.handler.getErrors().subscribe(error -> {
- logger.error("connectionId[{}] Error occurred in connection handler.", connectionId, error);
- endpointStatesSink.error(error);
- }));
+ this.connectionMono = Mono.fromCallable(this::getOrCreateConnection);
+
+ this.endpointStates = this.handler.getEndpointStates()
+ .takeUntilOther(shutdownSignals)
+ .map(state -> {
+ logger.verbose("connectionId[{}]: State {}", connectionId, state);
+ return AmqpEndpointStateUtil.getConnectionState(state);
+ }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
}
/**
@@ -148,14 +136,12 @@ public Mono getClaimsBasedSecurityNode() {
"connectionId[%s]: Connection is disposed. Cannot get CBS node.", connectionId))));
}
- final Mono cbsNodeMono = RetryUtil.withRetry(
- getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
- connectionOptions.getRetry().getTryTimeout(), retryPolicy)
+ final Mono cbsNodeMono =
+ RetryUtil.withRetry(getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
+ connectionOptions.getRetry().getTryTimeout(), retryPolicy)
.then(Mono.fromCallable(this::getOrCreateCBSNode));
- return hasConnection.get()
- ? cbsNodeMono
- : connectionMono.then(cbsNodeMono);
+ return connectionMono.then(cbsNodeMono);
}
@Override
@@ -249,17 +235,7 @@ protected AmqpSession createSession(String sessionName, Session session, Session
*/
@Override
public boolean removeSession(String sessionName) {
- if (sessionName == null) {
- return false;
- }
-
- final SessionSubscription removed = sessionMap.remove(sessionName);
-
- if (removed != null) {
- removed.dispose();
- }
-
- return removed != null;
+ return removeSession(sessionName, null);
}
@Override
@@ -272,18 +248,23 @@ public boolean isDisposed() {
*/
@Override
public void dispose() {
+ dispose(null);
+ shutdownSignalsSink.next(new AmqpShutdownSignal(false, true,
+ "Disposed by client."));
+ }
+
+ void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}
- logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId);
- subscriptions.dispose();
- endpointStatesSink.complete();
+ logger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", connectionId,
+ errorCondition != null ? errorCondition : NOT_APPLICABLE);
final String[] keys = sessionMap.keySet().toArray(new String[0]);
for (String key : keys) {
logger.info("connectionId[{}]: Removing session '{}'", connectionId, key);
- removeSession(key);
+ removeSession(key, errorCondition);
}
if (connection != null) {
@@ -331,6 +312,20 @@ protected Mono createRequestResponseChannel(String sessi
new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName))));
}
+ private boolean removeSession(String sessionName, ErrorCondition errorCondition) {
+ if (sessionName == null) {
+ return false;
+ }
+
+ final SessionSubscription removed = sessionMap.remove(sessionName);
+
+ if (removed != null) {
+ removed.dispose(errorCondition);
+ }
+
+ return removed != null;
+ }
+
private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
if (cbsChannel == null) {
logger.info("Setting CBS channel.");
@@ -380,6 +375,7 @@ public void onConnectionError(Throwable exception) {
getId(), getFullyQualifiedNamespace(), exception.getMessage());
endpointStates.onError(exception);
+ ReactorConnection.this.dispose();
}
@Override
@@ -393,16 +389,12 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
"onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]",
getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal);
- if (!endpointStatesSink.isCancelled()) {
- endpointStatesSink.next(AmqpEndpointState.CLOSED);
- endpointStatesSink.complete();
- }
-
- dispose();
+ dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), shutdownSignal.toString()));
+ shutdownSignalsSink.next(shutdownSignal);
}
}
- private static final class SessionSubscription implements Disposable {
+ private static final class SessionSubscription {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AmqpSession session;
private final Disposable subscription;
@@ -412,22 +404,23 @@ private SessionSubscription(AmqpSession session, Disposable subscription) {
this.subscription = subscription;
}
- public Disposable getSubscription() {
- return subscription;
- }
-
public AmqpSession getSession() {
return session;
}
- @Override
- public void dispose() {
+ void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}
+ if (session instanceof ReactorSession) {
+ final ReactorSession reactorSession = (ReactorSession) session;
+ reactorSession.dispose(errorCondition);
+ } else {
+ session.dispose();
+ }
+
subscription.dispose();
- session.dispose();
}
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java
index 5e0fcd0440343..673d6deea1131 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java
@@ -7,14 +7,14 @@
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
-import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import java.io.IOException;
@@ -36,13 +36,11 @@ public class ReactorReceiver implements AmqpReceiveLink {
private final ReceiveLinkHandler handler;
private final TokenManager tokenManager;
private final ReactorDispatcher dispatcher;
- private final Disposable.Composite subscriptions;
+ private final Disposable subscriptions;
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final EmitterProcessor messagesProcessor;
private final ClientLogger logger = new ClientLogger(ReactorReceiver.class);
- private final ReplayProcessor endpointStates =
- ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
- private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
+ private final ReplayProcessor endpointStates;
private final AtomicReference> creditSupplier = new AtomicReference<>();
@@ -69,43 +67,28 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl
}
})
.subscribeWith(EmitterProcessor.create());
-
- this.subscriptions = Disposables.composite(
- this.handler.getEndpointStates().subscribe(
- state -> {
- logger.verbose("Connection state: {}", state);
- endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state));
- }, error -> {
- logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in connection.",
- handler.getConnectionId(), receiver.getName(), entityPath, error);
- endpointStateSink.error(error);
- dispose();
- }, () -> {
- endpointStateSink.next(AmqpEndpointState.CLOSED);
- dispose();
- }),
-
- this.handler.getErrors().subscribe(error -> {
- logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in link.",
- handler.getConnectionId(), receiver.getName(), entityPath, error);
- endpointStateSink.error(error);
- dispose();
- }),
-
- this.tokenManager.getAuthorizationResults().subscribe(
- response -> {
- logger.verbose("Token refreshed: {}", response);
- hasAuthorized.set(true);
- }, error -> {
- logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]",
- handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage());
- hasAuthorized.set(false);
- }, () -> hasAuthorized.set(false)));
+ this.endpointStates = this.handler.getEndpointStates()
+ .map(state -> {
+ logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(),
+ entityPath, getLinkName(), state);
+ return AmqpEndpointStateUtil.getConnectionState(state);
+ })
+ .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
+
+ this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe(
+ response -> {
+ logger.verbose("Token refreshed: {}", response);
+ hasAuthorized.set(true);
+ }, error -> {
+ logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]",
+ handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage());
+ hasAuthorized.set(false);
+ }, () -> hasAuthorized.set(false));
}
@Override
public Flux getEndpointStates() {
- return endpointStates;
+ return endpointStates.distinct();
}
@Override
@@ -162,7 +145,6 @@ public void dispose() {
}
subscriptions.dispose();
- endpointStateSink.complete();
messagesProcessor.onComplete();
tokenManager.close();
receiver.close();
@@ -178,6 +160,41 @@ public void dispose() {
}
}
+ /**
+ * Disposes of the sender when an exception is encountered.
+ *
+ * @param condition Error condition associated with close operation.
+ */
+ void dispose(ErrorCondition condition) {
+ if (isDisposed.getAndSet(true)) {
+ return;
+ }
+
+ logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}",
+ handler.getConnectionId(), entityPath, getLinkName(), condition);
+
+ if (receiver.getLocalState() != EndpointState.CLOSED) {
+ receiver.close();
+
+ if (receiver.getCondition() == null) {
+ receiver.setCondition(condition);
+ }
+ }
+
+ try {
+ dispatcher.invoke(() -> {
+ receiver.free();
+ handler.close();
+ });
+ } catch (IOException e) {
+ logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e);
+ handler.close();
+ }
+
+ messagesProcessor.onComplete();
+ tokenManager.close();
+ }
+
protected Message decodeDelivery(Delivery delivery) {
final int messageSize = delivery.pending();
final byte[] buffer = new byte[messageSize];
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
index 066c0196626b7..0d11b882ea4b0 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
@@ -21,6 +21,7 @@
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
@@ -29,7 +30,6 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@@ -51,6 +51,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES;
+import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
import static com.azure.core.amqp.implementation.ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -74,9 +75,7 @@ class ReactorSender implements AmqpSendLink {
private final PriorityQueue pendingSendsQueue =
new PriorityQueue<>(1000, new DeliveryTagComparator());
private final ClientLogger logger = new ClientLogger(ReactorSender.class);
- private final ReplayProcessor endpointStates =
- ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
- private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
+ private final ReplayProcessor endpointStates;
private final TokenManager tokenManager;
private final MessageSerializer messageSerializer;
@@ -101,43 +100,34 @@ class ReactorSender implements AmqpSendLink {
this.retry = retry;
this.timeout = timeout;
+ this.endpointStates = this.handler.getEndpointStates()
+ .map(state -> {
+ logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(),
+ entityPath, getLinkName(), state);
+ this.hasConnected.set(state == EndpointState.ACTIVE);
+ return AmqpEndpointStateUtil.getConnectionState(state);
+ }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
+
this.subscriptions = Disposables.composite(
this.handler.getDeliveredMessages().subscribe(this::processDeliveredMessage),
this.handler.getLinkCredits().subscribe(credit -> {
- logger.verbose("Credits on link: {}", credit);
+ logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Credits on link: {}",
+ handler.getConnectionId(), entityPath, getLinkName(), credit);
this.scheduleWorkOnDispatcher();
- }),
-
- this.handler.getEndpointStates().subscribe(
- state -> {
- logger.verbose("[{}] Connection state: {}", entityPath, state);
- this.hasConnected.set(state == EndpointState.ACTIVE);
- endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state));
- }, error -> {
- logger.error("[{}] Error occurred in sender endpoint handler.", entityPath, error);
- endpointStateSink.error(error);
- }, () -> {
- endpointStateSink.next(AmqpEndpointState.CLOSED);
- endpointStateSink.complete();
- hasConnected.set(false);
- }),
-
- this.handler.getErrors().subscribe(error -> {
- logger.error("[{}] Error occurred in sender error handler.", entityPath, error);
- endpointStateSink.error(error);
})
);
if (tokenManager != null) {
this.subscriptions.add(this.tokenManager.getAuthorizationResults().subscribe(
response -> {
- logger.verbose("Token refreshed: {}", response);
+ logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Token refreshed: {}",
+ handler.getConnectionId(), entityPath, getLinkName(), response);
hasAuthorized.set(true);
},
error -> {
- logger.info("clientId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]",
- handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage());
+ logger.info("connectionId[{}], entityPath[{}], linkName[{}]: tokenRenewalFailure[{}]",
+ handler.getConnectionId(), entityPath, getLinkName(), error.getMessage());
hasAuthorized.set(false);
}, () -> hasAuthorized.set(false)));
}
@@ -293,13 +283,35 @@ public boolean isDisposed() {
@Override
public void dispose() {
+ dispose(null);
+ }
+
+ /**
+ * Disposes of the sender when an exception is encountered.
+ *
+ * @param errorCondition Error condition associated with close operation.
+ */
+ void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}
subscriptions.dispose();
- endpointStateSink.complete();
tokenManager.close();
+
+ if (sender.getLocalState() == EndpointState.CLOSED) {
+ return;
+ }
+
+ logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}",
+ handler.getConnectionId(), entityPath, getLinkName(),
+ errorCondition != null ? errorCondition : NOT_APPLICABLE);
+
+ if (errorCondition != null && sender.getCondition() == null) {
+ sender.setCondition(errorCondition);
+ }
+
+ sender.close();
}
@Override
@@ -311,15 +323,9 @@ public Mono send(byte[] bytes, int arrayOffset, int messageFormat
}
private Mono validateEndpoint() {
- return Mono.defer(() -> {
- if (hasConnected.get()) {
- return Mono.empty();
- } else {
- return RetryUtil.withRetry(
- handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry)
- .then();
- }
- });
+ return Mono.defer(() -> RetryUtil.withRetry(
+ handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry)
+ .then());
}
/**
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
index 6330477d472e4..c05e70c5e5498 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
@@ -17,16 +17,16 @@
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import reactor.core.Disposable;
-import reactor.core.Disposables;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
@@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
+
/**
* Represents an AMQP session using proton-j reactor.
*/
@@ -49,9 +51,7 @@ public class ReactorSession implements AmqpSession {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final ClientLogger logger = new ClientLogger(ReactorSession.class);
- private final ReplayProcessor endpointStates =
- ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
- private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
+ private final ReplayProcessor endpointStates;
private final Session session;
private final SessionHandler sessionHandler;
@@ -61,14 +61,13 @@ public class ReactorSession implements AmqpSession {
private final MessageSerializer messageSerializer;
private final Duration openTimeout;
- private final Disposable.Composite subscriptions;
private final ReactorHandlerProvider handlerProvider;
private final Mono cbsNodeSupplier;
private final AtomicReference> coordinatorLink = new AtomicReference<>();
private final AtomicReference transactionCoordinator = new AtomicReference<>();
- private AmqpRetryPolicy retryPolicy;
+ private final AmqpRetryPolicy retryPolicy;
/**
* Creates a new AMQP session using proton-j.
@@ -98,27 +97,13 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses
this.messageSerializer = messageSerializer;
this.openTimeout = openTimeout;
this.retryPolicy = retryPolicy;
-
- this.subscriptions = Disposables.composite(
- this.sessionHandler.getEndpointStates().subscribe(
- state -> {
- logger.verbose("Connection state: {}", state);
- endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state));
- }, error -> {
- logger.error("[{}] Error occurred in session endpoint handler.", sessionName, error);
- endpointStateSink.error(error);
- dispose();
- }, () -> {
- endpointStateSink.next(AmqpEndpointState.CLOSED);
- endpointStateSink.complete();
- dispose();
- }),
-
- this.sessionHandler.getErrors().subscribe(error -> {
- logger.error("[{}] Error occurred in session error handler.", sessionName, error);
- endpointStateSink.error(error);
- dispose();
- }));
+ this.endpointStates = sessionHandler.getEndpointStates()
+ .map(state -> {
+ logger.verbose("connectionId[{}], sessionName[{}]: State ", sessionHandler.getConnectionId(),
+ sessionName, state);
+ return AmqpEndpointStateUtil.getConnectionState(state);
+ })
+ .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
session.open();
}
@@ -142,20 +127,27 @@ public boolean isDisposed() {
*/
@Override
public void dispose() {
+ dispose(null);
+ }
+
+ void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}
- logger.info("sessionId[{}]: Disposing of session.", sessionName);
+ logger.info("connectionId[{}], sessionId[{}], errorCondition[{}]: Disposing of session.",
+ sessionHandler.getConnectionId(), sessionName, errorCondition != null ? errorCondition : NOT_APPLICABLE);
- session.close();
- subscriptions.dispose();
+ if (session.getLocalState() != EndpointState.CLOSED) {
+ session.close();
- openReceiveLinks.forEach((key, link) -> link.dispose());
- openReceiveLinks.clear();
+ if (session.getCondition() == null) {
+ session.setCondition(errorCondition);
+ }
+ }
- openSendLinks.forEach((key, link) -> link.dispose());
- openSendLinks.clear();
+ openReceiveLinks.forEach((key, link) -> link.dispose(errorCondition));
+ openSendLinks.forEach((key, link) -> link.dispose(errorCondition));
}
/**
@@ -174,7 +166,6 @@ public Duration getOperationTimeout() {
return openTimeout;
}
-
/**
* {@inheritDoc}
*/
@@ -283,7 +274,7 @@ private Mono createCoordinatorSendLink(Duration timeout, AmqpRetry
} else {
logger.info("linkName[{}]: Another coordinator send link exists. Disposing of new one.",
TRANSACTION_LINK_NAME);
- linkSubscription.dispose();
+ linkSubscription.dispose(null);
}
sink.success(coordinatorLink.get().getLink());
@@ -333,7 +324,7 @@ private boolean removeLink(ConcurrentMap removed = openLinks.remove(key);
if (removed != null) {
- removed.dispose();
+ removed.dispose(null);
}
return removed != null;
@@ -361,8 +352,8 @@ private boolean removeLink(ConcurrentMap createConsumer(String linkName, String entityPath, Duration timeout,
- AmqpRetryPolicy retry, Map sourceFilters,
- Map receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode,
+ AmqpRetryPolicy retry, Map sourceFilters, Map receiverProperties,
+ Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode,
ReceiverSettleMode receiverSettleMode) {
if (isDisposed()) {
@@ -445,7 +436,7 @@ protected Mono createProducer(String linkName, String entityPath, Dura
return RetryUtil.withRetry(
getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE),
- timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> {
+ timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> {
try {
// We have to invoke this in the same thread or else proton-j will not properly link up the created
// sender because the link names are not unique. Link name == entity path.
@@ -572,7 +563,7 @@ private LinkSubscription getSubscription(String linkName, Strin
return new LinkSubscription<>(reactorReceiver, subscription);
}
- private static final class LinkSubscription implements Disposable {
+ private static final class LinkSubscription {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final T link;
private final Disposable subscription;
@@ -582,22 +573,26 @@ private LinkSubscription(T link, Disposable subscription) {
this.subscription = subscription;
}
- public Disposable getSubscription() {
- return subscription;
- }
-
public T getLink() {
return link;
}
- @Override
- public void dispose() {
+ void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}
+ if (link instanceof ReactorReceiver) {
+ final ReactorReceiver reactorReceiver = (ReactorReceiver) link;
+ reactorReceiver.dispose(errorCondition);
+ } else if (link instanceof ReactorSender) {
+ final ReactorSender reactorSender = (ReactorSender) link;
+ reactorSender.dispose(errorCondition);
+ } else {
+ link.dispose();
+ }
+
subscription.dispose();
- link.dispose();
}
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java
index 0bb755be2a841..d2f7be85eb828 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java
@@ -138,12 +138,10 @@ protected RequestResponseChannel(String connectionId, String fullyQualifiedNames
receiveLinkHandler.getEndpointStates().subscribe(
state -> endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
this::handleError, this::dispose),
- receiveLinkHandler.getErrors().subscribe(this::handleError),
sendLinkHandler.getEndpointStates().subscribe(state ->
endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)),
- this::handleError, this::dispose),
- sendLinkHandler.getErrors().subscribe(this::handleError)
+ this::handleError, this::dispose)
);
//@formatter:on
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java
index 0858cd22bf442..4c73984d1f3ad 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java
@@ -152,7 +152,6 @@ public void onTransportError(Event event) {
if (connection != null) {
notifyErrorContext(connection, condition);
- onNext(connection.getRemoteState());
}
// onTransportError event is not handled by the global IO Handler for cleanup
@@ -172,7 +171,6 @@ public void onTransportClosed(Event event) {
if (connection != null) {
notifyErrorContext(connection, condition);
- onNext(connection.getRemoteState());
}
}
@@ -208,8 +206,6 @@ public void onConnectionLocalClose(Event event) {
transport.unbind(); // we proactively dispose IO even if service fails to close
}
}
-
- onNext(connection.getRemoteState());
}
@Override
@@ -218,9 +214,11 @@ public void onConnectionRemoteClose(Event event) {
final ErrorCondition error = connection.getRemoteCondition();
logErrorCondition("onConnectionRemoteClose", connection, error);
-
- onNext(connection.getRemoteState());
- notifyErrorContext(connection, error);
+ if (error == null) {
+ onNext(connection.getRemoteState());
+ } else {
+ notifyErrorContext(connection, error);
+ }
}
@Override
@@ -262,7 +260,7 @@ private void notifyErrorContext(Connection connection, ErrorCondition condition)
final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(),
condition.getDescription(), getErrorContext());
- onNext(exception);
+ onError(exception);
}
private void logErrorCondition(String eventName, Connection connection, ErrorCondition error) {
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java
index d000bdb447173..1258d76987835 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java
@@ -9,6 +9,8 @@
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.impl.IOHandler;
+import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
+
public class CustomIOHandler extends IOHandler {
private final ClientLogger logger = new ClientLogger(CustomIOHandler.class);
private final String connectionId;
@@ -23,7 +25,7 @@ public void onTransportClosed(Event event) {
final Connection connection = event.getConnection();
logger.info("onTransportClosed connectionId[{}], hostname[{}]",
- connectionId, (connection != null ? connection.getHostname() : "n/a"));
+ connectionId, (connection != null ? connection.getHostname() : NOT_APPLICABLE));
if (transport != null && connection != null && connection.getTransport() != null) {
transport.unbind();
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java
index 23293da62c2f9..75b7ec0d330d3 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java
@@ -23,8 +23,7 @@ public class DispatchHandler extends BaseHandler {
* @param work The work to run on the {@link Reactor}.
*/
public DispatchHandler(Runnable work) {
- Objects.requireNonNull(work);
- this.work = work;
+ this.work = Objects.requireNonNull(work, "'work' cannot be null.");
}
/**
@@ -32,7 +31,7 @@ public DispatchHandler(Runnable work) {
*/
@Override
public void onTimerTask(Event e) {
- logger.verbose("Running task for event: %s", e);
+ logger.verbose("Running task for event: {}", e);
this.work.run();
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java
index c68f3598a81ff..4ddbde2218a9b 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java
@@ -8,16 +8,15 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
-import reactor.core.publisher.UnicastProcessor;
import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
public abstract class Handler extends BaseHandler implements Closeable {
+ private final AtomicBoolean isTerminal = new AtomicBoolean();
private final ReplayProcessor endpointStateProcessor =
ReplayProcessor.cacheLastOrDefault(EndpointState.UNINITIALIZED);
- private final UnicastProcessor errorContextProcessor = UnicastProcessor.create();
private final FluxSink endpointSink = endpointStateProcessor.sink();
- private final FluxSink errorSink = errorContextProcessor.sink();
private final String connectionId;
private final String hostname;
@@ -38,25 +37,26 @@ public Flux getEndpointStates() {
return endpointStateProcessor.distinct();
}
- public Flux getErrors() {
- return errorContextProcessor;
- }
-
void onNext(EndpointState state) {
endpointSink.next(state);
+ }
- if (state == EndpointState.CLOSED) {
- endpointSink.complete();
+ void onError(Throwable error) {
+ if (isTerminal.getAndSet(true)) {
+ return;
}
- }
- void onNext(Throwable context) {
- errorSink.next(context);
+ endpointSink.next(EndpointState.CLOSED);
+ endpointSink.error(error);
}
@Override
public void close() {
+ if (isTerminal.getAndSet(true)) {
+ return;
+ }
+
+ endpointSink.next(EndpointState.CLOSED);
endpointSink.complete();
- errorSink.complete();
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java
index e347bcc293bf2..8ea445cb91ed2 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java
@@ -5,7 +5,6 @@
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.LinkErrorContext;
-import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -14,11 +13,11 @@
import org.apache.qpid.proton.engine.Link;
import static com.azure.core.amqp.implementation.AmqpErrorCode.TRACKING_ID_PROPERTY;
+import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;
abstract class LinkHandler extends Handler {
-
private final String entityPath;
- ClientLogger logger;
+ final ClientLogger logger;
LinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) {
super(connectionId, hostname);
@@ -32,9 +31,10 @@ public void onLinkLocalClose(Event event) {
final ErrorCondition condition = link.getCondition();
logger.info("onLinkLocalClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]",
- getConnectionId(), link.getName(),
- condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
- condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);
+ getConnectionId(),
+ link.getName(),
+ condition != null ? condition.getCondition() : NOT_APPLICABLE,
+ condition != null ? condition.getDescription() : NOT_APPLICABLE);
}
@Override
@@ -44,8 +44,8 @@ public void onLinkRemoteClose(Event event) {
logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]",
getConnectionId(), link.getName(),
- condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
- condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);
+ condition != null ? condition.getCondition() : NOT_APPLICABLE,
+ condition != null ? condition.getDescription() : NOT_APPLICABLE);
handleRemoteLinkClosed(event);
}
@@ -57,15 +57,18 @@ public void onLinkRemoteDetach(Event event) {
logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]",
getConnectionId(), link.getName(),
- condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
- condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);
+ condition != null ? condition.getCondition() : NOT_APPLICABLE,
+ condition != null ? condition.getDescription() : NOT_APPLICABLE);
handleRemoteLinkClosed(event);
}
@Override
public void onLinkFinal(Event event) {
- logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), event.getLink().getName());
+ final String linkName = event != null && event.getLink() != null
+ ? event.getLink().getName()
+ : NOT_APPLICABLE;
+ logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), linkName);
close();
}
@@ -80,22 +83,6 @@ public AmqpErrorContext getErrorContext(Link link) {
return new LinkErrorContext(getHostname(), entityPath, referenceId, link.getCredit());
}
- private void processOnClose(Link link, ErrorCondition condition) {
- logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]",
- getConnectionId(), link.getName(),
- condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
- condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);
-
- if (condition != null && condition.getCondition() != null) {
- final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(),
- condition.getDescription(), getErrorContext(link));
-
- onNext(exception);
- }
-
- onNext(EndpointState.CLOSED);
- }
-
private void handleRemoteLinkClosed(final Event event) {
final Link link = event.getLink();
final ErrorCondition condition = link.getRemoteCondition();
@@ -105,6 +92,18 @@ private void handleRemoteLinkClosed(final Event event) {
link.close();
}
- processOnClose(link, condition);
+ logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]",
+ getConnectionId(), link.getName(),
+ condition != null ? condition.getCondition() : NOT_APPLICABLE,
+ condition != null ? condition.getDescription() : NOT_APPLICABLE);
+
+ if (condition != null && condition.getCondition() != null) {
+ final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(),
+ condition.getDescription(), getErrorContext(link));
+
+ onError(exception);
+ } else {
+ close();
+ }
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java
index 0017fed355f93..f52756a50d9a4 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java
@@ -4,9 +4,6 @@
package com.azure.core.amqp.implementation.handler;
import com.azure.core.util.logging.ClientLogger;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
@@ -17,20 +14,25 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReceiveLinkHandler extends LinkHandler {
private final String linkName;
- private AtomicBoolean isFirstResponse = new AtomicBoolean(true);
+ private final AtomicBoolean isFirstResponse = new AtomicBoolean(true);
private final DirectProcessor deliveries;
- private FluxSink deliverySink;
- private Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final FluxSink deliverySink;
+ private final Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final String entityPath;
public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
super(connectionId, hostname, entityPath, new ClientLogger(ReceiveLinkHandler.class));
this.deliveries = DirectProcessor.create();
this.deliverySink = deliveries.sink(FluxSink.OverflowStrategy.BUFFER);
this.linkName = linkName;
+ this.entityPath = entityPath;
}
public String getLinkName() {
@@ -38,11 +40,9 @@ public String getLinkName() {
}
public Flux getDeliveredMessages() {
- return deliveries
- .doOnNext(this::removeQueuedDelivery);
+ return deliveries.doOnNext(delivery -> queuedDeliveries.remove(delivery));
}
-
@Override
public void close() {
deliverySink.complete();
@@ -60,26 +60,28 @@ public void close() {
public void onLinkLocalOpen(Event event) {
final Link link = event.getLink();
if (link instanceof Receiver) {
- logger.info("onLinkLocalOpen connectionId[{}], linkName[{}], localSource[{}]",
- getConnectionId(), link.getName(), link.getSource());
+ logger.info("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localSource[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getSource());
}
}
@Override
public void onLinkRemoteOpen(Event event) {
final Link link = event.getLink();
- if (link instanceof Receiver) {
- if (link.getRemoteSource() != null) {
- logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]",
- getConnectionId(), link.getName(), link.getRemoteSource());
+ if (!(link instanceof Receiver)) {
+ return;
+ }
- if (isFirstResponse.getAndSet(false)) {
- onNext(EndpointState.ACTIVE);
- }
- } else {
- logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]",
- getConnectionId(), link.getName());
+ if (link.getRemoteSource() != null) {
+ logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteSource[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getRemoteSource());
+
+ if (isFirstResponse.getAndSet(false)) {
+ onNext(EndpointState.ACTIVE);
}
+ } else {
+ logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], action[waitingForError]",
+ getConnectionId(), entityPath, link.getName());
}
}
@@ -102,9 +104,9 @@ public void onDelivery(Event event) {
// before we fix proton-j - this work around ensures that we ignore the duplicate Delivery event
if (delivery.isSettled()) {
if (link != null) {
- logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}],"
- + " remoteCondition[{}], delivery.isSettled[{}]",
- getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(),
+ logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}],"
+ + " remoteCredit[{}], remoteCondition[{}], delivery.isSettled[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(),
link.getRemoteCondition(), delivery.isSettled());
} else {
logger.warning("connectionId[{}], delivery.isSettled[{}]", getConnectionId(), delivery.isSettled());
@@ -126,20 +128,16 @@ public void onDelivery(Event event) {
}
if (link != null) {
- logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}],"
- + " remoteCondition[{}], delivery.isPartial[{}]",
- getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(),
- delivery.isPartial());
+ logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}],"
+ + "remoteCredit[{}], remoteCondition[{}], delivery.isPartial[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(),
+ link.getRemoteCondition(), delivery.isPartial());
}
}
@Override
public void onLinkRemoteClose(Event event) {
- super.onLinkRemoteClose(event);
deliverySink.complete();
- }
-
- private void removeQueuedDelivery(Delivery delivery) {
- queuedDeliveries.remove(delivery);
+ super.onLinkRemoteClose(event);
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java
index 1d92def80e0f7..25e1fd3535192 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java
@@ -18,16 +18,22 @@
import java.util.concurrent.atomic.AtomicBoolean;
public class SendLinkHandler extends LinkHandler {
- private final String senderName;
+ private final String linkName;
+ private final String entityPath;
private final AtomicBoolean isFirstFlow = new AtomicBoolean(true);
private final UnicastProcessor creditProcessor = UnicastProcessor.create();
private final DirectProcessor deliveryProcessor = DirectProcessor.create();
private final FluxSink creditSink = creditProcessor.sink();
private final FluxSink deliverySink = deliveryProcessor.sink();
- public SendLinkHandler(String connectionId, String hostname, String senderName, String entityPath) {
+ public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class));
- this.senderName = senderName;
+ this.linkName = linkName;
+ this.entityPath = entityPath;
+ }
+
+ public String getLinkName() {
+ return linkName;
}
public Flux getLinkCredits() {
@@ -49,27 +55,29 @@ public void close() {
public void onLinkLocalOpen(Event event) {
final Link link = event.getLink();
if (link instanceof Sender) {
- logger.verbose("onLinkLocalOpen connectionId[{}], linkName[{}], localTarget[{}]",
- getConnectionId(), link.getName(), link.getTarget());
+ logger.verbose("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localTarget[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getTarget());
}
}
@Override
public void onLinkRemoteOpen(Event event) {
final Link link = event.getLink();
- if (link instanceof Sender) {
- if (link.getRemoteTarget() != null) {
- logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[{}]",
- getConnectionId(), link.getName(), link.getRemoteTarget());
-
- if (isFirstFlow.getAndSet(false)) {
- onNext(EndpointState.ACTIVE);
- }
- } else {
- logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[null], remoteSource[null], "
- + "action[waitingForError]",
- getConnectionId(), link.getName());
+ if (!(link instanceof Sender)) {
+ return;
+ }
+
+ if (link.getRemoteTarget() != null) {
+ logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[{}]",
+ getConnectionId(), entityPath, link.getName(), link.getRemoteTarget());
+
+ if (isFirstFlow.getAndSet(false)) {
+ onNext(EndpointState.ACTIVE);
}
+ } else {
+ logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[null],"
+ + " remoteSource[null], action[waitingForError]",
+ getConnectionId(), entityPath, link.getName());
}
}
@@ -82,8 +90,8 @@ public void onLinkFlow(Event event) {
final Sender sender = event.getSender();
creditSink.next(sender.getRemoteCredit());
- logger.verbose("onLinkFlow connectionId[{}], linkName[{}], unsettled[{}], credit[{}]",
- getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getCredit());
+ logger.verbose("onLinkFlow connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]",
+ getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getCredit());
}
@Override
@@ -93,9 +101,9 @@ public void onDelivery(Event event) {
while (delivery != null) {
Sender sender = (Sender) delivery.getLink();
- logger.verbose("onDelivery connectionId[{}], linkName[{}], unsettled[{}], credit[{}], deliveryState[{}], "
- + "delivery.isBuffered[{}], delivery.id[{}]",
- getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(),
+ logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}],"
+ + " deliveryState[{}], delivery.isBuffered[{}], delivery.id[{}]",
+ getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(),
delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(),
StandardCharsets.UTF_8));
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java
index 4c9b47d2beb62..0fb9aee7786b4 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java
@@ -63,20 +63,23 @@ public void onSessionLocalOpen(Event e) {
getConnectionId(), this.entityName, ioException.getMessage());
final Throwable exception = new AmqpException(false, message, ioException, getErrorContext());
- onNext(exception);
+ onError(exception);
}
}
@Override
public void onSessionRemoteOpen(Event e) {
final Session session = e.getSession();
-
- logger.info(
- "onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}], sessionOutgoingWindow[{}]",
- getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow());
-
if (session.getLocalState() == EndpointState.UNINITIALIZED) {
+ logger.warning("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}],"
+ + " sessionOutgoingWindow[{}] endpoint was uninitialised.",
+ getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow());
+
session.open();
+ } else {
+ logger.info("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}],"
+ + " sessionOutgoingWindow[{}]",
+ getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow());
}
onNext(EndpointState.ACTIVE);
@@ -117,26 +120,25 @@ public void onSessionRemoteClose(Event e) {
session.close();
}
- onNext(EndpointState.CLOSED);
-
- if (condition != null) {
+ if (condition == null) {
+ onNext(EndpointState.CLOSED);
+ } else {
final String id = getConnectionId();
final AmqpErrorContext context = getErrorContext();
final Exception exception;
if (condition.getCondition() == null) {
- exception = new AmqpException(false,
- String.format(Locale.US,
+ exception = new AmqpException(false, String.format(Locale.US,
"onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", id, entityName,
condition),
context);
} else {
- exception = ExceptionUtil.toException(condition.getCondition().toString(),
- String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s]", id,
+ exception = ExceptionUtil.toException(condition.getCondition().toString(), String.format(Locale.US,
+ "onSessionRemoteClose connectionId[%s], entityName[%s]", id,
entityName),
context);
}
- onNext(exception);
+ onError(exception);
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java
new file mode 100644
index 0000000000000..a07e930123180
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java
@@ -0,0 +1,123 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The representation of message as defined by AMQP protocol.
+ *
+ * @see
+ * Amqp Message Format.
+ */
+public final class AmqpAnnotatedMessage {
+ private final AmqpMessageBody amqpMessageBody;
+ private final Map applicationProperties;
+ private final Map deliveryAnnotations;
+ private final Map messageAnnotations;
+ private final Map footer;
+ private final AmqpMessageHeader header;
+ private final AmqpMessageProperties properties;
+
+ /**
+ * Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpMessageBody}.
+ *
+ * @param body to be set on amqp message.
+ *
+ * @throws NullPointerException if {@code body} is null.
+ */
+ public AmqpAnnotatedMessage(AmqpMessageBody body) {
+ amqpMessageBody = Objects.requireNonNull(body, "'body' cannot be null.");
+
+ applicationProperties = new HashMap<>();
+ deliveryAnnotations = new HashMap<>();
+ messageAnnotations = new HashMap<>();
+ footer = new HashMap<>();
+ header = new AmqpMessageHeader();
+ properties = new AmqpMessageProperties();
+ }
+
+ /**
+ * Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpAnnotatedMessage} instance.
+ *
+ * @param message used to create another instance of {@link AmqpAnnotatedMessage}.
+ *
+ * @throws NullPointerException if {@code message} or {@link AmqpAnnotatedMessage#getBody() body} is null.
+ */
+ public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) {
+ Objects.requireNonNull(message, "'message' cannot be null.");
+ amqpMessageBody = Objects.requireNonNull(message.getBody(), "'message.body' cannot be null.");
+ applicationProperties = new HashMap<>(message.getApplicationProperties());
+ deliveryAnnotations = new HashMap<>(message.getDeliveryAnnotations());
+ messageAnnotations = new HashMap<>(message.getMessageAnnotations());
+ footer = new HashMap<>(message.getFooter());
+ header = new AmqpMessageHeader(message.getHeader());
+ properties = new AmqpMessageProperties(message.getProperties());
+ }
+
+ /**
+ * Gets the {@link Map} of application properties.
+ *
+ * @return The application properties.
+ */
+ public Map getApplicationProperties() {
+ return applicationProperties;
+ }
+
+ /**
+ * Gets the {@link AmqpMessageBody} of an amqp message.
+ *
+ * @return the {@link AmqpMessageBody} object.
+ */
+ public AmqpMessageBody getBody() {
+ return amqpMessageBody;
+ }
+
+ /**
+ * Gets the {@link Map} representation of delivery annotations defined on an amqp message.
+ *
+ * @return the {@link Map} representation of delivery annotations.
+ */
+ public Map getDeliveryAnnotations() {
+ return deliveryAnnotations;
+ }
+
+ /**
+ * Gets the {@link Map} representation of footer defined on an amqp message.
+ *
+ * @return the {@link Map} representation of footer.
+ */
+ public Map getFooter() {
+ return footer;
+ }
+
+ /**
+ * Gets the {@link AmqpMessageHeader} defined on an amqp message.
+ *
+ * @return the {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader getHeader() {
+ return header;
+ }
+
+ /**
+ * Gets the {@link Map} representation of message annotations defined on an amqp message.
+ *
+ * @return the {@link Map} representation of message annotations.
+ */
+ public Map getMessageAnnotations() {
+ return messageAnnotations;
+ }
+
+ /**
+ * Gets the {@link AmqpMessageProperties} defined on an amqp message.
+ *
+ * @return the {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties getProperties() {
+ return properties;
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java
new file mode 100644
index 0000000000000..80c66af356087
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+/**
+ * All AmqpBodyType available for AMQP Message.
+ */
+public enum AmqpBodyType {
+ /**
+ * Represent Amqp Data type
+ */
+ DATA,
+ /**
+ * Represent Amqp Value type
+ */
+ VALUE,
+ /**
+ * Represent Amqp Sequence type
+ */
+ SEQUENCE;
+
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java
new file mode 100644
index 0000000000000..a71b65f45d618
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import com.azure.core.util.IterableStream;
+
+import java.util.Objects;
+
+/**
+ * This is amqp message body which represents {@link AmqpBodyType#DATA} type.
+ */
+public final class AmqpDataBody implements AmqpMessageBody {
+ private final IterableStream data;
+
+ /**
+ * Creates instance of {@link AmqpDataBody} with given {@link Iterable} of {@link BinaryData}.
+ *
+ * @param data to be set on amqp body.
+ *
+ * @throws NullPointerException if {@code data} is null.
+ */
+ public AmqpDataBody(Iterable data) {
+ Objects.requireNonNull(data, "'data' cannot be null.");
+ this.data = new IterableStream<>(data);
+ }
+
+ @Override
+ public AmqpBodyType getBodyType() {
+ return AmqpBodyType.DATA;
+ }
+
+ /**
+ * Gets {@link BinaryData} set on this {@link AmqpDataBody}.
+ *
+ * @return data set on {@link AmqpDataBody}.
+ */
+ public IterableStream getData() {
+ return data;
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java
new file mode 100644
index 0000000000000..a984fb566720d
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java
@@ -0,0 +1,16 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+/**
+ * Interface representing Amqp Message Body.
+ */
+public interface AmqpMessageBody {
+ /**
+ * Type representing various supported amqp body types.
+ *
+ * @return The {@link AmqpBodyType}.
+ */
+ AmqpBodyType getBodyType();
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java
new file mode 100644
index 0000000000000..e15a482558f3a
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java
@@ -0,0 +1,138 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import com.azure.core.annotation.Fluent;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * The representation of message header as defined by AMQP protocol.
+ * @see
+ * Amqp Message Format.
+ */
+@Fluent
+public class AmqpMessageHeader {
+
+ private Long deliveryCount;
+ private Boolean durable;
+ private Boolean firstAcquirer;
+ private Short priority;
+ private Duration timeToLive;
+
+ AmqpMessageHeader() {
+ // This class does not have any public constructors, and is not able to be instantiated using 'new'.
+ }
+
+ /**
+ * The constructor is used to clone the values.
+ */
+ AmqpMessageHeader(AmqpMessageHeader header) {
+ super();
+ Objects.requireNonNull(header, "'header' cannot be null.");
+ deliveryCount = header.getDeliveryCount();
+ durable = header.isDurable();
+ firstAcquirer = header.isFirstAcquirer();
+ timeToLive = header.getTimeToLive();
+ priority = header.getPriority();
+ }
+
+ /**
+ * Gets the delivery count from amqp message header.
+ *
+ * @return the delivery count value.
+ */
+ public Long getDeliveryCount() {
+ return deliveryCount;
+ }
+
+ /**
+ * Sets the given {@code deliveryCount} value on {@link AmqpMessageHeader} object.
+ * @param deliveryCount to be set.
+ *
+ * @return updated {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader setDeliveryCount(Long deliveryCount) {
+ this.deliveryCount = deliveryCount;
+ return this;
+ }
+
+ /**
+ * Gets durable boolean flag from amqp message header.
+ * @return the durable flag.
+ */
+ public Boolean isDurable() {
+ return durable;
+ }
+
+ /**
+ * Sets the given {@code durable} value on {@link AmqpMessageHeader} object.
+ * @param durable to set on {@link AmqpMessageHeader}.
+ *
+ * @return updated {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader setDurable(Boolean durable) {
+ this.durable = durable;
+ return this;
+ }
+
+ /**
+ * Gets boolean flag for {@code firstAcquirer} from amqp message header.
+ * @return the {@code firstAcquirer} value.
+ */
+ public Boolean isFirstAcquirer() {
+ return this.firstAcquirer;
+ }
+
+ /**
+ * Sets the given {@code firstAcquirer} value on {@link AmqpMessageHeader} object.
+ * @param firstAcquirer to set on {@link AmqpMessageHeader}.
+ *
+ * @return updated {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader setFirstAcquirer(Boolean firstAcquirer) {
+ this.firstAcquirer = firstAcquirer;
+ return this;
+ }
+
+ /**
+ * Gets the priority on {@code amqpMessage} from amqp message header.
+ * @return the {@code priority} value.
+ */
+ public Short getPriority() {
+ return priority;
+ }
+
+ /**
+ * Sets the given {@code priority} value on {@link AmqpMessageHeader} object.
+ * @param priority to set on {@link AmqpMessageHeader}.
+ *
+ * @return updated {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader setPriority(Short priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ /**
+ * Gets {@code timeToLive} from amqp message header.
+ *
+ * @return the {@code timeToLive} value.
+ */
+ public Duration getTimeToLive() {
+ return timeToLive;
+ }
+
+ /**
+ * Sets the given {@code timeToLive} value on {@link AmqpMessageHeader} object.
+ * @param timeToLive to set on {@link AmqpMessageHeader}.
+ *
+ * @return updated {@link AmqpMessageHeader} object.
+ */
+ public AmqpMessageHeader setTimeToLive(Duration timeToLive) {
+ this.timeToLive = timeToLive;
+ return this;
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java
new file mode 100644
index 0000000000000..29e7595e5b417
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java
@@ -0,0 +1,331 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import com.azure.core.annotation.Fluent;
+
+import java.time.OffsetDateTime;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * The representation of message properties as defined by AMQP protocol.
+ *
+ * @see
+ * Amqp Message Format.
+ */
+@Fluent
+public class AmqpMessageProperties {
+
+ private OffsetDateTime absoluteExpiryTime;
+ private String contentEncoding;
+ private String contentType;
+ private String correlationId;
+ private OffsetDateTime creationTime;
+ private String groupId;
+ private Long groupSequence;
+ private String messageId;
+ private String replyToGroupId;
+ private String replyTo;
+ private String to;
+ private String subject;
+ private byte[] userId;
+
+ AmqpMessageProperties() {
+ // This class does not have any public constructors, and is not able to be instantiated using 'new'.
+ }
+
+ /**
+ * The constructor is used to clone the values.
+ */
+ AmqpMessageProperties(AmqpMessageProperties properties) {
+ super();
+ Objects.requireNonNull(properties, "'properties' cannot be null.");
+ absoluteExpiryTime = properties.getAbsoluteExpiryTime();
+ contentEncoding = properties.getContentEncoding();
+ contentType = properties.getContentType();
+ correlationId = properties.getCorrelationId();
+ creationTime = properties.getCreationTime();
+ groupId = properties.getGroupId();
+ groupSequence = properties.getGroupSequence();
+ messageId = properties.getMessageId();
+ replyToGroupId = properties.getReplyToGroupId();
+ replyTo = properties.getReplyTo();
+ to = properties.getTo();
+ subject = properties.getSubject();
+ userId = properties.getUserId();
+ }
+
+ /**
+ * Gets {@code absoluteExpiryTime} from amqp message properties.
+ *
+ * @return the {@code absoluteExpiryTime} value.
+ */
+ public OffsetDateTime getAbsoluteExpiryTime() {
+ return absoluteExpiryTime;
+ }
+
+ /**
+ * Sets the given {@code absoluteExpiryTime} value on {@link AmqpMessageProperties} object.
+ *
+ * @param absoluteExpiryTime to be set.
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setAbsoluteExpiryTime(OffsetDateTime absoluteExpiryTime) {
+ this.absoluteExpiryTime = absoluteExpiryTime;
+ return this;
+ }
+
+ /**
+ * Gets AbsoluteExpiryTime from amqp message properties.
+ *
+ * @return the {@code absoluteExpiryTime} value.
+ */
+ public String getContentEncoding() {
+ return contentEncoding;
+ }
+
+ /**
+ * Sets the given {@code contentEncoding} value on {@link AmqpMessageProperties} object.
+ *
+ * @param contentEncoding to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setContentEncoding(String contentEncoding) {
+ this.contentEncoding = contentEncoding;
+ return this;
+ }
+
+ /**
+ * Gets {@code contentType} from amqp message properties.
+ *
+ * @return the {@code contentType} value.
+ */
+ public String getContentType() {
+ return contentType;
+ }
+
+ /**
+ * Sets the given {@code contentType} value on {@link AmqpMessageProperties} object.
+ *
+ * @param contentType to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setContentType(String contentType) {
+ this.contentType = contentType;
+ return this;
+ }
+
+ /**
+ * Gets {@code correlationId} from amqp message properties.
+ *
+ * @return the {@code correlationId} value.
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ /**
+ * Sets the given {@code correlationId} value on {@link AmqpMessageProperties} object.
+ *
+ * @param correlationId to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setCorrelationId(String correlationId) {
+ this.correlationId = correlationId;
+ return this;
+ }
+
+ /**
+ * Gets {@code creationTime} from amqp message properties.
+ *
+ * @return the {@code creationTime} value.
+ */
+ public OffsetDateTime getCreationTime() {
+ return creationTime;
+ }
+
+ /**
+ * Sets the given {@code creationTime} value on {@link AmqpMessageProperties} object.
+ *
+ * @param creationTime to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setCreationTime(OffsetDateTime creationTime) {
+ this.creationTime = creationTime;
+ return this;
+ }
+
+ /**
+ * Gets {@code groupId} from amqp message properties.
+ *
+ * @return the {@code groupId} value.
+ */
+ public String getGroupId() {
+ return groupId;
+ }
+
+ /**
+ * Sets the given {@code groupId} value on {@link AmqpMessageProperties} object.
+ *
+ * @param groupId to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setGroupId(String groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ /**
+ * Gets {@code groupSequence} from amqp message properties.
+ *
+ * @return the {@code groupSequence} value.
+ */
+ public Long getGroupSequence() {
+ return groupSequence;
+ }
+
+ /**
+ * Sets the given {@code groupSequence} value on {@link AmqpMessageProperties} object.
+ *
+ * @param groupSequence to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setGroupSequence(Long groupSequence) {
+ this.groupSequence = groupSequence;
+ return this;
+ }
+
+ /**
+ * Gets {@code messageId} from amqp message properties.
+ *
+ * @return the {@code messageId} value.
+ */
+ public String getMessageId() {
+ return messageId;
+ }
+
+ /**
+ * Sets the given {@code messageId} value on {@link AmqpMessageProperties} object.
+ *
+ * @param messageId to be set .
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setMessageId(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ /**
+ * Gets {@code replyTo} from amqp message properties.
+ *
+ * @return The {@code replyTo} value.
+ */
+ public String getReplyTo() {
+ return replyTo;
+ }
+
+ /**
+ * Sets the given {@code replyTo} value on {@link AmqpMessageProperties} object.
+ *
+ * @param replyTo to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setReplyTo(String replyTo) {
+ this.replyTo = replyTo;
+ return this;
+ }
+
+ /**
+ * Gets {@code replyToGroupId} from amqp message properties.
+ *
+ * @return The {@code replyToGroupId} value.
+ */
+ public String getReplyToGroupId() {
+ return replyToGroupId;
+ }
+
+ /**
+ * Sets the given {@code replyToGroupId} value on {@link AmqpMessageProperties} object.
+ *
+ * @param replyToGroupId to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setReplyToGroupId(String replyToGroupId) {
+ this.replyToGroupId = replyToGroupId;
+ return this;
+ }
+
+ /**
+ * Gets {@code subject} from amqp message properties.
+ *
+ * @return the {@code subject} value.
+ */
+ public String getSubject() {
+ return subject;
+ }
+
+ /**
+ * Sets the given {@code subject} value on {@link AmqpMessageProperties} object.
+ *
+ * @param subject to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setSubject(String subject) {
+ this.subject = subject;
+ return this;
+ }
+
+ /**
+ * Gets {@code to} from amqp message properties.
+ *
+ * @return the {@code to} value.
+ */
+ public String getTo() {
+ return to;
+ }
+
+ /**
+ * Sets the given {@code to} value on {@link AmqpMessageProperties} object.
+ *
+ * @param to to be set.
+ *
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setTo(String to) {
+ this.to = to;
+ return this;
+ }
+
+ /**
+ * Gets {@code userId} from amqp message properties.
+ *
+ * @return the {@code userId} value.
+ */
+ public byte[] getUserId() {
+ return userId != null ? Arrays.copyOf(userId, userId.length) : new byte[0];
+ }
+
+ /**
+ * Sets the given {@code userId} value on {@link AmqpMessageProperties} object.
+ *
+ * @param userId to be set .
+ * @return updated {@link AmqpMessageProperties} object.
+ */
+ public AmqpMessageProperties setUserId(byte[] userId) {
+ this.userId = userId != null ? Arrays.copyOf(userId, userId.length) : new byte[0];
+ return this;
+ }
+
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java
new file mode 100644
index 0000000000000..218e1eed10a36
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java
@@ -0,0 +1,34 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Binary representation of amqp message body.
+ */
+public final class BinaryData {
+ private final byte[] data;
+
+ /**
+ * Create {@link BinaryData} instance with given byte array data.
+ *
+ * @param data to use.
+ */
+ public BinaryData(byte[] data) {
+ Objects.requireNonNull(data, "'data' cannot be null.");
+ this.data = Arrays.copyOf(data, data.length);
+ }
+
+ /**
+ * Gets the data.
+ *
+ * @return byte array representing {@link BinaryData}.
+ */
+
+ public byte[] getData() {
+ return Arrays.copyOf(data, data.length);
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java
new file mode 100644
index 0000000000000..49c5ed1490ff0
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java
@@ -0,0 +1,7 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+/**
+ * Package containing classes related to AMQP models classes.
+ */
+package com.azure.core.amqp.models;
diff --git a/sdk/core/azure-core-amqp/src/main/java/module-info.java b/sdk/core/azure-core-amqp/src/main/java/module-info.java
index 77497a6a08af5..3edfcafb02114 100644
--- a/sdk/core/azure-core-amqp/src/main/java/module-info.java
+++ b/sdk/core/azure-core-amqp/src/main/java/module-info.java
@@ -9,6 +9,7 @@
requires transitive org.apache.qpid.proton.j;
exports com.azure.core.amqp;
+ exports com.azure.core.amqp.models;
exports com.azure.core.amqp.exception;
// FIXME this should not be a long-term solution
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java
index 5c670c80d1836..e00b4c43d926e 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java
@@ -248,9 +248,7 @@ void initialConnectionState() {
// Assert
StepVerifier.create(connection.getEndpointStates())
.expectNext(AmqpEndpointState.UNINITIALIZED)
- .then(() -> {
- connection.dispose();
- })
+ .then(() -> connection.dispose())
.verifyComplete();
}
@@ -362,9 +360,4 @@ void cannotCreateResourcesOnFailure() {
verify(transport, times(1)).unbind();
}
-
- @Test
- void cannotCreateSessionWhenDisposed() {
-
- }
}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java
index cfbc5b53546c0..b60f00bf991ec 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java
@@ -144,7 +144,6 @@ void updateEndpointState() {
.expectNext(AmqpEndpointState.ACTIVE)
.then(() -> receiverHandler.close())
.expectNext(AmqpEndpointState.CLOSED)
- .then(() -> reactorReceiver.dispose())
.verifyComplete();
}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java
index 356bd1d5e0e97..c26b73e7c5e55 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java
@@ -3,28 +3,11 @@
package com.azure.core.amqp.implementation;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.ExponentialAmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -52,6 +35,23 @@
import reactor.core.publisher.ReplayProcessor;
import reactor.test.StepVerifier;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests for {@link ReactorSender}
*/
@@ -98,7 +98,6 @@ public void setup() throws IOException {
FluxSink sink1 = endpointStateReplayProcessor.sink();
sink1.next(EndpointState.ACTIVE);
- when(handler.getErrors()).thenReturn(Flux.empty());
when(tokenManager.getAuthorizationResults()).thenReturn(Flux.just(AmqpResponseCode.ACCEPTED));
when(sender.getCredit()).thenReturn(100);
when(sender.advance()).thenReturn(true);
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java
index e88388d2e44dd..061d2fa25c358 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java
@@ -4,15 +4,23 @@
package com.azure.core.amqp.implementation;
import com.azure.core.amqp.AmqpEndpointState;
+import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpResponseCode;
+import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Record;
+import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -20,12 +28,20 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -39,52 +55,61 @@ public class ReactorSessionTest {
private SessionHandler handler;
private ReactorSession reactorSession;
- private AmqpRetryPolicy retryPolicy;
@Mock
private Session session;
@Mock
private Reactor reactor;
@Mock
- private Selectable selectable;
- @Mock
private Event event;
@Mock
+ private Receiver receiver;
+ @Mock
+ private Sender sender;
+ @Mock
+ private Record record;
+ @Mock
private ClaimsBasedSecurityNode cbsNode;
@Mock
private MessageSerializer serializer;
@Mock
private ReactorProvider reactorProvider;
+ @Mock
+ private ReactorHandlerProvider reactorHandlerProvider;
+ @Mock
+ private ReactorDispatcher reactorDispatcher;
+ @Mock
+ private TokenManagerProvider tokenManagerProvider;
+
+ private Mono cbsNodeSupplier;
@BeforeEach
public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
- when(reactor.selectable()).thenReturn(selectable);
- when(event.getSession()).thenReturn(session);
- ReactorDispatcher dispatcher = new ReactorDispatcher(reactor);
- this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, dispatcher, Duration.ofSeconds(60));
+ this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, Duration.ofSeconds(60));
+ this.cbsNodeSupplier = Mono.just(cbsNode);
when(reactorProvider.getReactor()).thenReturn(reactor);
- when(reactorProvider.getReactorDispatcher()).thenReturn(dispatcher);
-
- MockReactorHandlerProvider handlerProvider = new MockReactorHandlerProvider(reactorProvider, null, handler, null, null);
- AzureTokenManagerProvider azureTokenManagerProvider = new AzureTokenManagerProvider(
- CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST, "a-test-scope");
- this.retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions());
- this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, handlerProvider,
- Mono.just(cbsNode), azureTokenManagerProvider, serializer, TIMEOUT, retryPolicy);
+ when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher);
+ when(event.getSession()).thenReturn(session);
+ when(sender.attachments()).thenReturn(record);
+ when(receiver.attachments()).thenReturn(record);
+
+ doAnswer(invocation -> {
+ final Runnable runnable = invocation.getArgument(0);
+ runnable.run();
+ return null;
+ }).when(reactorDispatcher).invoke(any());
+
+ AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions());
+ this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, reactorHandlerProvider,
+ cbsNodeSupplier, tokenManagerProvider, serializer, TIMEOUT, retryPolicy);
}
@AfterEach
public void teardown() {
- session = null;
- reactor = null;
- selectable = null;
- event = null;
- cbsNode = null;
-
Mockito.framework().clearInlineMocks();
}
@@ -108,7 +133,6 @@ public void verifyEndpointStates() {
.expectNext(AmqpEndpointState.ACTIVE)
.then(() -> handler.close())
.expectNext(AmqpEndpointState.CLOSED)
- .then(() -> reactorSession.dispose())
.expectComplete()
.verify(Duration.ofSeconds(10));
}
@@ -116,6 +140,92 @@ public void verifyEndpointStates() {
@Test
public void verifyDispose() {
reactorSession.dispose();
- Assertions.assertTrue(reactorSession.isDisposed());
+ assertTrue(reactorSession.isDisposed());
+ }
+
+ /**
+ * Verifies that we can create the producer.
+ */
+ @Test
+ void createProducer() {
+ // Arrange
+ final String linkName = "test-link-name";
+ final String entityPath = "test-entity-path";
+ final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class);
+ final Map linkProperties = new HashMap<>();
+ final Duration timeout = Duration.ofSeconds(30);
+ final TokenManager tokenManager = mock(TokenManager.class);
+ final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath);
+
+ when(session.sender(linkName)).thenReturn(sender);
+ when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager);
+ when(tokenManager.authorize()).thenReturn(Mono.just(1000L));
+ when(tokenManager.getAuthorizationResults())
+ .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED)));
+ when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath))
+ .thenReturn(sendLinkHandler);
+
+ StepVerifier.create(
+ reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, linkProperties))
+ .then(() -> handler.onSessionRemoteOpen(event))
+ .thenAwait(Duration.ofSeconds(2))
+ .assertNext(producer -> assertTrue(producer instanceof ReactorSender))
+ .verifyComplete();
+
+ final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy,
+ linkProperties)
+ .block(TIMEOUT);
+
+ assertNotNull(sendLink);
+ }
+
+ /**
+ * Verifies that we can create the producer.
+ */
+ @Test
+ void createProducerAgainAfterException() {
+ // Arrange
+ final String linkName = "test-link-name";
+ final String entityPath = "test-entity-path";
+ final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class);
+ final Map linkProperties = new HashMap<>();
+ final Duration timeout = Duration.ofSeconds(30);
+ final TokenManager tokenManager = mock(TokenManager.class);
+ final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath);
+
+ final Event closeSendEvent = mock(Event.class);
+ when(closeSendEvent.getLink()).thenReturn(sender);
+
+ final ErrorCondition errorCondition = new ErrorCondition(
+ Symbol.valueOf(AmqpErrorCondition.SERVER_BUSY_ERROR.getErrorCondition()), "test-busy");
+ when(sender.getRemoteCondition()).thenReturn(errorCondition);
+
+ when(session.sender(linkName)).thenReturn(sender);
+ when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager);
+ when(tokenManager.authorize()).thenReturn(Mono.just(1000L));
+ when(tokenManager.getAuthorizationResults())
+ .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED)));
+ when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath))
+ .thenReturn(sendLinkHandler);
+
+ handler.onSessionRemoteOpen(event);
+
+ final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy,
+ linkProperties)
+ .block(TIMEOUT);
+
+ assertNotNull(sendLink);
+ assertTrue(sendLink instanceof AmqpSendLink);
+
+ // Act
+ sendLinkHandler.onLinkRemoteClose(closeSendEvent);
+ }
+
+ @Test
+ void createConsumer() {
+ // Arrange
+ final String linkName = "test-link-name";
+ final String entityPath = "test-entity-path";
+ final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class);
}
}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java
index 80cfc91f29852..665bbeada14d6 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java
@@ -31,7 +31,6 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.DirectProcessor;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.test.StepVerifier;
@@ -121,11 +120,9 @@ void beforeEach() {
FluxSink sink1 = endpointStateReplayProcessor.sink();
sink1.next(EndpointState.ACTIVE);
when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor);
- when(receiveLinkHandler.getErrors()).thenReturn(Flux.never());
when(receiveLinkHandler.getDeliveredMessages()).thenReturn(deliveryProcessor);
when(sendLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor);
- when(sendLinkHandler.getErrors()).thenReturn(Flux.never());
}
@AfterEach
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java
index 78895e08989aa..c0a73a451ae21 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java
@@ -24,14 +24,7 @@ public void initialHandlerState() {
StepVerifier.create(handler.getEndpointStates())
.expectNext(EndpointState.UNINITIALIZED)
.then(handler::close)
- .verifyComplete();
- }
-
- @Test
- public void initialErrors() {
- // Act & Assert
- StepVerifier.create(handler.getErrors())
- .then(handler::close)
+ .expectNext(EndpointState.CLOSED)
.verifyComplete();
}
@@ -44,6 +37,7 @@ public void propagatesStates() {
.expectNext(EndpointState.ACTIVE)
.then(() -> handler.onNext(EndpointState.ACTIVE))
.then(handler::close)
+ .expectNext(EndpointState.CLOSED)
.verifyComplete();
}
@@ -54,11 +48,55 @@ public void propagatesErrors() {
final Throwable exception = new AmqpException(false, "Some test message.", context);
// Act & Assert
- StepVerifier.create(handler.getErrors())
- .then(() -> handler.onNext(exception))
- .expectNext(exception)
- .then(handler::close)
- .verifyComplete();
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> handler.onError(exception))
+ .expectNext(EndpointState.CLOSED)
+ .expectErrorMatches(e -> e.equals(exception))
+ .verify();
+ }
+
+ @Test
+ public void propagatesErrorsOnce() {
+ // Arrange
+ final AmqpErrorContext context = new AmqpErrorContext("test namespace.");
+ final Throwable exception = new AmqpException(false, "Some test message.", context);
+ final Throwable exception2 = new AmqpException(false, "Some test message2.", context);
+
+ // Act & Assert
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> {
+ handler.onError(exception);
+ handler.onError(exception2);
+ })
+ .expectNext(EndpointState.CLOSED)
+ .expectErrorMatches(e -> e.equals(exception))
+ .verify();
+
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.CLOSED)
+ .expectErrorMatches(e -> e.equals(exception))
+ .verify();
+ }
+
+ @Test
+ public void completesOnce() {
+ // Act & Assert
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> handler.onNext(EndpointState.ACTIVE))
+ .expectNext(EndpointState.ACTIVE)
+ .then(() -> handler.close())
+ .expectNext(EndpointState.CLOSED)
+ .expectComplete()
+ .verify();
+
+ // The last state is always replayed before it is closed.
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.CLOSED)
+ .expectComplete()
+ .verify();
}
private static class TestHandler extends Handler {
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java
index f1ff19e8f33c3..d0060c447f36a 100644
--- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java
@@ -28,6 +28,7 @@
import static com.azure.core.amqp.exception.AmqpErrorCondition.LINK_STOLEN;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -51,7 +52,6 @@ class LinkHandlerTest {
private final String description = "test-description";
private final LinkHandler handler = new MockLinkHandler(CONNECTION_ID, HOSTNAME, ENTITY_PATH, logger);
-
@BeforeAll
static void beforeAll() {
StepVerifier.setDefaultTimeout(Duration.ofSeconds(30));
@@ -147,18 +147,17 @@ void onLinkRemoteClose() {
when(session.getLocalState()).thenReturn(EndpointState.ACTIVE);
// Act
- StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates())
- .expectNext(EndpointState.CLOSED);
-
- StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors())
- .assertNext(error -> {
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> handler.onLinkRemoteClose(event))
+ .expectNext(EndpointState.CLOSED)
+ .expectErrorSatisfies(error -> {
Assertions.assertTrue(error instanceof AmqpException);
AmqpException exception = (AmqpException) error;
Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition());
- });
-
- handler.onLinkRemoteClose(event);
+ })
+ .verify();
// Assert
verify(link).setCondition(errorCondition);
@@ -166,17 +165,13 @@ void onLinkRemoteClose() {
verify(session, never()).setCondition(errorCondition);
verify(session, never()).close();
-
- endpointState.thenCancel().verify();
- throwableStep.then(() -> handler.close())
- .verifyComplete();
}
/**
- * Verifies that it does not close the link when the link is already in a closed endpoint state.
+ * Verifies that an error is propagated if there is an error condition on close.
*/
@Test
- void onLinkRemoteCloseNoException() {
+ void onLinkRemoteCloseWithErrorCondition() {
// Arrange
final ErrorCondition errorCondition = new ErrorCondition(symbol, description);
@@ -185,19 +180,17 @@ void onLinkRemoteCloseNoException() {
when(link.getLocalState()).thenReturn(EndpointState.CLOSED);
// Act & Assert
- StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates())
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> handler.onLinkRemoteClose(event))
.expectNext(EndpointState.CLOSED)
- .expectNoEvent(Duration.ofSeconds(2));
-
- StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors())
- .assertNext(error -> {
+ .expectErrorSatisfies(error -> {
Assertions.assertTrue(error instanceof AmqpException);
AmqpException exception = (AmqpException) error;
Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition());
- });
-
- handler.onLinkRemoteClose(event);
+ })
+ .verify();
// Assert
verify(link, never()).setCondition(errorCondition);
@@ -205,11 +198,39 @@ void onLinkRemoteCloseNoException() {
verify(session, never()).setCondition(errorCondition);
verify(session, never()).close();
+ }
- endpointState.thenCancel().verify();
- throwableStep.thenCancel().verify();
+ /**
+ * Verifies that no error is propagated. And it is closed instead.
+ */
+ @Test
+ void onLinkRemoteCloseNoErrorCondition() {
+ // Arrange
+ final ErrorCondition errorCondition = new ErrorCondition(null, description);
+ final Event finalEvent = mock(Event.class);
+
+ when(link.getRemoteCondition()).thenReturn(errorCondition);
+ when(link.getSession()).thenReturn(session);
+ when(link.getLocalState()).thenReturn(EndpointState.CLOSED);
+
+ // Act & Assert
+ StepVerifier.create(handler.getEndpointStates())
+ .expectNext(EndpointState.UNINITIALIZED)
+ .then(() -> handler.onLinkRemoteClose(event))
+ .expectNext(EndpointState.CLOSED)
+ .then(() -> handler.onLinkFinal(finalEvent))
+ .expectComplete()
+ .verify();
+
+ // Assert
+ verify(link, never()).setCondition(errorCondition);
+ verify(link, never()).close();
+
+ verify(session, never()).setCondition(errorCondition);
+ verify(session, never()).close();
}
+
private static final class MockLinkHandler extends LinkHandler {
MockLinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) {
super(connectionId, hostname, entityPath, logger);
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java
new file mode 100644
index 0000000000000..aea81dc2a7fa2
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java
@@ -0,0 +1,185 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import com.azure.core.util.logging.ClientLogger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+
+/**
+ * Test class for {@link AmqpAnnotatedMessage}
+ */
+public class AmqpAnnotatedMessageTest {
+
+ private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8);
+ private static final BinaryData DATA_BYTES = new BinaryData(CONTENTS_BYTES);
+ private final ClientLogger logger = new ClientLogger(AmqpAnnotatedMessageTest.class);
+
+ /**
+ * Verifies we correctly set values via copy constructor for {@link AmqpAnnotatedMessage} and create new
+ * instances of the properties.
+ */
+ @Test
+ public void copyConstructorTest() {
+ // Arrange
+ final int expectedBinaryDataSize = 1;
+ List expectedBinaryData = new ArrayList<>();
+ expectedBinaryData.add(DATA_BYTES);
+
+ final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData);
+ final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody);
+ final Map expectedMessageAnnotations = expected.getMessageAnnotations();
+ expectedMessageAnnotations.put("ma-1", "ma-value1");
+
+ final Map expectedDeliveryAnnotations = expected.getDeliveryAnnotations();
+ expectedDeliveryAnnotations.put("da-1", "da-value1");
+
+ final Map expectedApplicationProperties = expected.getApplicationProperties();
+ expectedApplicationProperties.put("ap-1", "ap-value1");
+
+ final Map expectedFooter = expected.getFooter();
+ expectedFooter.put("foo-1", "foo-value1");
+
+ final AmqpMessageProperties expectedMessageProperties = expected.getProperties();
+ expectedMessageProperties.setGroupSequence(2L);
+ expectedMessageProperties.setContentEncoding("content-enc");
+ expectedMessageProperties.setReplyToGroupId("a");
+ expectedMessageProperties.setReplyTo("b");
+ expectedMessageProperties.setCorrelationId("c");
+ expectedMessageProperties.setSubject("d");
+ expectedMessageProperties.setMessageId("id");
+
+ final AmqpMessageHeader expectedMessageHeader = expected.getHeader();
+ expectedMessageHeader.setDeliveryCount(5L);
+ expectedMessageHeader.setTimeToLive(Duration.ofSeconds(20));
+ expectedMessageHeader.setPriority(Short.valueOf("4"));
+
+ final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected);
+
+ // Act
+ // Now update the values after we have created AmqpAnnotatedMessage using copy constructor.
+ expectedDeliveryAnnotations.remove("da-1");
+ expectedApplicationProperties.put("ap-2", "ap-value2");
+ expectedFooter.remove("foo-1");
+ expected.getHeader().setDeliveryCount(Long.valueOf(100));
+ expectedBinaryData = new ArrayList<>();
+
+ // Assert
+ // Ensure the memory references are not same.
+ assertNotSame(expected.getProperties(), actual.getProperties());
+ assertNotSame(expected.getApplicationProperties(), actual.getApplicationProperties());
+ assertNotSame(expected.getDeliveryAnnotations(), actual.getDeliveryAnnotations());
+ assertNotSame(expected.getFooter(), actual.getFooter());
+ assertNotSame(expected.getHeader(), actual.getHeader());
+ assertNotSame(expected.getMessageAnnotations(), actual.getMessageAnnotations());
+ assertNotSame(expected.getProperties().getUserId(), actual.getProperties().getUserId());
+ assertNotSame(expected.getHeader().getDeliveryCount(), actual.getHeader().getDeliveryCount());
+
+ assertEquals(1, actual.getDeliveryAnnotations().size());
+ assertEquals(1, actual.getApplicationProperties().size());
+ assertEquals(1, actual.getFooter().size());
+
+ assertEquals(expectedMessageProperties.getGroupSequence(), actual.getProperties().getGroupSequence());
+ assertEquals(expectedMessageProperties.getContentEncoding(), actual.getProperties().getContentEncoding());
+ assertEquals(expectedMessageProperties.getReplyToGroupId(), actual.getProperties().getReplyToGroupId());
+ assertEquals(expectedMessageProperties.getReplyTo(), actual.getProperties().getReplyTo());
+ assertEquals(expectedMessageProperties.getCorrelationId(), actual.getProperties().getCorrelationId());
+ assertEquals(expectedMessageProperties.getSubject(), actual.getProperties().getSubject());
+ assertEquals(expectedMessageProperties.getMessageId(), actual.getProperties().getMessageId());
+
+ assertEquals(expectedMessageHeader.getTimeToLive(), actual.getHeader().getTimeToLive());
+ assertEquals(expectedMessageHeader.getPriority(), actual.getHeader().getPriority());
+
+ assertMessageBody(expectedBinaryDataSize, CONTENTS_BYTES, actual);
+ }
+
+ /**
+ * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}.
+ */
+ @Test
+ public void constructorValidValues() {
+ // Arrange
+ final List expectedBinaryData = Collections.singletonList(DATA_BYTES);
+ final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData);
+
+ // Act
+ final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(amqpDataBody);
+
+ // Assert
+ assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual);
+ }
+
+ /**
+ * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}.
+ */
+ @Test
+ public void constructorAmqpValidValues() {
+ // Arrange
+ final List expectedBinaryData = Collections.singletonList(DATA_BYTES);
+ final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData);
+ final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody);
+
+ // Act
+ final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected);
+
+ // Assert
+ assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual);
+ }
+
+ /**
+ * Verifies {@link AmqpAnnotatedMessage} constructor for null values.
+ */
+ @Test
+ public void constructorNullValidValues() {
+ // Arrange
+ final AmqpDataBody body = null;
+
+ // Act & Assert
+ Assertions.assertThrows(NullPointerException.class, () -> new AmqpAnnotatedMessage(body));
+ }
+
+ private void assertMessageCreation(AmqpBodyType expectedType, int expectedMessageSize, AmqpAnnotatedMessage actual) {
+ assertEquals(expectedType, actual.getBody().getBodyType());
+ assertNotNull(actual.getProperties());
+ assertNotNull(actual.getHeader());
+ assertNotNull(actual.getFooter());
+ assertNotNull(actual.getApplicationProperties());
+ assertNotNull(actual.getDeliveryAnnotations());
+ assertNotNull(actual.getMessageAnnotations());
+ assertNotNull(actual.getApplicationProperties());
+
+ // Validate Message Body
+ assertNotNull(actual.getBody());
+ assertMessageBody(expectedMessageSize, CONTENTS_BYTES, actual);
+ }
+
+ private void assertMessageBody(int expectedMessageSize, byte[] expectedbody, AmqpAnnotatedMessage actual) {
+ final AmqpBodyType actualType = actual.getBody().getBodyType();
+ switch (actualType) {
+ case DATA:
+ List actualData = ((AmqpDataBody) actual.getBody()).getData().stream().collect(Collectors.toList());
+ assertEquals(expectedMessageSize, actualData.size());
+ assertArrayEquals(expectedbody, actualData.get(0).getData());
+ break;
+ case VALUE:
+ case SEQUENCE:
+ throw logger.logExceptionAsError(new UnsupportedOperationException("type not supported yet :" + actualType));
+ default:
+ throw logger.logExceptionAsError(new IllegalStateException("Invalid type :" + actualType));
+ }
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java
new file mode 100644
index 0000000000000..1b9b6ecd752f4
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java
@@ -0,0 +1,54 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for {@link AmqpDataBody}.
+ */
+public class AmqpDataBodyTest {
+
+ /**
+ * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}.
+ */
+ @Test
+ public void constructorValidValues() {
+ // Arrange
+ final List expectedDataList = new ArrayList<>();
+ expectedDataList.add(new BinaryData("some data 1".getBytes()));
+ expectedDataList.add(new BinaryData("some data 2".getBytes()));
+
+ // Act
+ final AmqpDataBody actual = new AmqpDataBody(expectedDataList);
+
+ // Assert
+ assertEquals(AmqpBodyType.DATA, actual.getBodyType());
+
+ // Validate Message Body
+ final List dataList = actual.getData().stream().collect(Collectors.toList());
+ assertEquals(expectedDataList.size(), dataList.size());
+ assertArrayEquals(expectedDataList.toArray(), dataList.toArray());
+ }
+
+ /**
+ * Verifies {@link BinaryData} constructor for null values.
+ */
+ @Test
+ public void constructorNullValidValues() {
+ // Arrange
+ final List listBinaryData = null;
+
+ // Act & Assert
+ Assertions.assertThrows(NullPointerException.class, () -> new AmqpDataBody(listBinaryData));
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java
new file mode 100644
index 0000000000000..ba52eb9c996c6
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.models;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+/**
+ * Test for {@link BinaryData}.
+ */
+public class BinaryDataTest {
+
+ private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8);
+
+ /**
+ * Verifies we correctly set values via constructor for {@link BinaryData}.
+ */
+ @Test
+ public void constructorValidValues() {
+ // Arrange & Act
+ final BinaryData actual = new BinaryData(CONTENTS_BYTES);
+
+ // Assert
+ assertArrayEquals(CONTENTS_BYTES, actual.getData());
+ }
+
+ /**
+ * Verifies {@link BinaryData} constructor for null valeus.
+ */
+ @Test
+ public void constructorNullValidValues() {
+ // Arrange, Act & Assert
+ Assertions.assertThrows(NullPointerException.class, () -> new BinaryData(null));
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
index a84ca5e9e0761..46d349f82914f 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
+++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
@@ -42,7 +42,7 @@
com.azureazure-core-amqp
- 1.5.0-beta.1
+ 1.5.0
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index f752e7a06cfa6..1c222fc6348db 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -659,5 +659,4 @@ private ProxyOptions getProxyOptions(ProxyAuthenticationType authentication, Str
coreProxyOptions.getAddress()), coreProxyOptions.getUsername(), coreProxyOptions.getPassword());
}
}
-
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java
index aa41cdd1b7ce3..c673fd322176e 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java
@@ -127,7 +127,7 @@ void stopAllPartitionPumps() {
*/
void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) {
if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
- logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId());
+ logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId());
return;
}
diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/README.md b/sdk/eventhubs/microsoft-azure-eventhubs/README.md
index 95b9c61dbed2f..1df6ef7457e4b 100644
--- a/sdk/eventhubs/microsoft-azure-eventhubs/README.md
+++ b/sdk/eventhubs/microsoft-azure-eventhubs/README.md
@@ -1,7 +1,7 @@
# Azure Event Hubs (Track 1) client library for Java
-
Microsoft Azure Event Hubs Client for Java
+
Microsoft Azure Event Hubs Client for Java
Azure Event Hubs is a hyper-scale data ingestion service, fully-managed by Microsoft, that enables you to collect, store
and process trillions of events from websites, apps, IoT devices, and any stream of data.
@@ -15,7 +15,7 @@ general and for an overview of Event Hubs Client for Java.
- An **Event Hub producer** is a source of telemetry data, diagnostics information, usage logs, or other log data, as
part of an embedded device solution, a mobile device application, a game title running on a console or other device,
- some client or server based business solution, or a web site.
+ some client or server based business solution, or a website.
- An **Event Hub consumer** picks up such information from the Event Hub and processes it. Processing may involve
aggregation, complex computation, and filtering. Processing may also involve distribution or storage of the
@@ -29,13 +29,15 @@ general and for an overview of Event Hubs Client for Java.
- A **consumer group** is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each
have a separate view of the event stream, and to read the stream independently at their own pace and from their own
- position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that
- there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of
+ position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended
+ there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all
the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate
events.
-For more concepts and deeper discussion, see: [Event Hubs Features][event_hubs_features]. Also, the concepts for AMQP
-are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0][oasis_amqp_v1].
+For more concepts and deeper discussion, see:
+[Event Hubs Features](https://docs.microsoft.com/azure/event-hubs/event-hubs-features). Also, the concepts for AMQP
+are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version
+1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html).
### Referencing the library
@@ -50,12 +52,12 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK
|--------|------------------|
|azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs)
-```XML
-
- com.microsoft.azure
- azure-eventhubs
- 2.3.1
-
+```xml
+
+ com.microsoft.azure
+ azure-eventhubs
+ 2.3.1
+
```
#### Microsoft Azure EventHubs Java Event Processor Host library
@@ -80,14 +82,14 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries.
First, if you experience any issues with the runtime behavior of the Azure Event Hubs service, please consider filing a
support request right away. Your options for [getting support are enumerated
here](https://azure.microsoft.com/support/options/). In the Azure portal, you can file a support request from the "Help
-and support" menu in the upper right hand corner of the page.
+and support" menu in the upper right-hand corner of the page.
If you find issues in this library or have suggestions for improvement of code or documentation, you can [file an issue
in the project's GitHub repository](https://github.com/Azure/azure-sdk-for-java/issues) or send across a pull request -
see our [Contribution Guidelines](../azure-messaging-eventhubs/CONTRIBUTING.md).
Issues related to runtime behavior of the service, such as sporadic exceptions or apparent service-side performance or
-reliability issues can not be handled here.
+reliability issues cannot be handled here.
Generally, if you want to discuss Azure Event Hubs or this client library with the community and the maintainers, you
can turn to [stackoverflow.com under the #azure-eventhub tag](http://stackoverflow.com/questions/tagged/azure-eventhub)
diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
index c6e7114bad31a..609ebe8b9d750 100644
--- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
+++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
@@ -1,11 +1,13 @@
# Release History
## 7.0.0-beta.6 (Unreleased)
+- Exposing Amqp Message envelope in form of `AmqpAnnotatedMessage` as a property of `ServiceBusReceivedMessage` and `ServiceBusMessage`.
- Removed `ServiceBusReceiverClientBuilder.maxAutoLockRenewalDuration`. Use method `getAutoRenewMessageLock` of classes `ServiceBusReceiverClient`
and `ServiceBusReceiverAsyncClient` to lock messages and sessions.
- Updated datetime related APIs to use `java.time.OffsetDateTime` instead of `java.time.Instant`.
- Removed `scheduledMessageCount` from `SubscriptionRuntimeInfo` and added it to `TopicRuntimeInfo`.
- Changed `QueueRuntimeInfo`, `TopicRuntimeInfo` and `SubscriptionRuntimeInfo` to `QueueRuntimeProperties`, `TopicRuntimeProperties`
and `SubscriptionRuntimeProperties` respectively.
+
## 7.0.0-beta.5 (2020-08-11)
- Remove public constructor for QueueDescription, TopicDescription, SubscriptionDescription.
diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml
index 816ceae3df891..081a69fd90a35 100644
--- a/sdk/servicebus/azure-messaging-servicebus/pom.xml
+++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml
@@ -47,7 +47,7 @@
com.azureazure-core-amqp
- 1.5.0-beta.1
+ 1.5.0com.azure
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java
index aa54261b7dbd4..1b64b77a28978 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java
@@ -3,16 +3,35 @@
package com.azure.messaging.servicebus;
+import com.azure.core.amqp.AmqpMessageConstant;
+import com.azure.core.amqp.models.AmqpAnnotatedMessage;
+import com.azure.core.amqp.models.AmqpBodyType;
+import com.azure.core.amqp.models.AmqpDataBody;
+import com.azure.core.amqp.models.BinaryData;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Date;
import java.util.Map;
import java.util.Objects;
+import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME;
+
/**
* The data structure encapsulating the message being sent-to Service Bus.
*
@@ -22,7 +41,7 @@
* AMQP 1.0 specification
*
*
- *
{@link #getBody()} - if AMQPMessage.Body has Data section
*
*
@@ -34,21 +53,11 @@
* @see ServiceBusMessageBatch
*/
public class ServiceBusMessage {
- private final Map properties = new HashMap<>();
- private final byte[] body;
+ private final AmqpAnnotatedMessage amqpAnnotatedMessage;
+ private final ClientLogger logger = new ClientLogger(ServiceBusMessage.class);
+
+ private final byte[] binaryData;
private Context context;
- private String contentType;
- private String correlationId;
- private String label;
- private String messageId;
- private String partitionKey;
- private String replyTo;
- private String replyToSessionId;
- private OffsetDateTime scheduledEnqueueTime;
- private String sessionId;
- private Duration timeToLive;
- private String to;
- private String viaPartitionKey;
/**
* Creates a {@link ServiceBusMessage} with a {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} encoded body.
@@ -69,8 +78,10 @@ public ServiceBusMessage(String body) {
* @throws NullPointerException if {@code body} is {@code null}.
*/
public ServiceBusMessage(byte[] body) {
- this.body = Objects.requireNonNull(body, "'body' cannot be null.");
+ this.binaryData = Objects.requireNonNull(body, "'body' cannot be null.");
this.context = Context.NONE;
+ this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList(
+ new BinaryData(binaryData))));
}
/**
@@ -82,32 +93,43 @@ public ServiceBusMessage(byte[] body) {
* @throws NullPointerException if {@code receivedMessage} is {@code null}.
*/
public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
- this.body = receivedMessage.getBody();
+ Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null.");
+
+ this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(receivedMessage.getAmqpAnnotatedMessage());
this.context = Context.NONE;
- setMessageId(receivedMessage.getMessageId());
- setScheduledEnqueueTime(receivedMessage.getScheduledEnqueueTime());
- setContentType(receivedMessage.getContentType());
- setCorrelationId(receivedMessage.getCorrelationId());
- setLabel(receivedMessage.getLabel());
- setPartitionKey(receivedMessage.getPartitionKey());
- setReplyTo(receivedMessage.getReplyTo());
- setReplyToSessionId(receivedMessage.getReplyToSessionId());
- setTimeToLive(receivedMessage.getTimeToLive());
- setTo(receivedMessage.getTo());
- setSessionId(receivedMessage.getSessionId());
- setViaPartitionKey(receivedMessage.getViaPartitionKey());
+ this.binaryData = receivedMessage.getBody();
+
+ // clean up data which user is not allowed to set.
+ amqpAnnotatedMessage.getHeader().setDeliveryCount(null);
+
+ removeValues(amqpAnnotatedMessage.getMessageAnnotations(), LOCKED_UNTIL_KEY_ANNOTATION_NAME,
+ SEQUENCE_NUMBER_ANNOTATION_NAME, DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME,
+ ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME, ENQUEUED_TIME_UTC_ANNOTATION_NAME);
+
+ removeValues(amqpAnnotatedMessage.getApplicationProperties(), DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME,
+ DEAD_LETTER_REASON_ANNOTATION_NAME);
+
+ }
+
+ /**
+ * Gets the {@link AmqpAnnotatedMessage}.
+ *
+ * @return the amqp message.
+ */
+ public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
+ return amqpAnnotatedMessage;
}
/**
* Gets the set of free-form {@link ServiceBusMessage} properties which may be used for passing metadata associated
- * with the {@link ServiceBusMessage} during Service Bus operations. A common use-case for {@code properties()} is
- * to associate serialization hints for the {@link #getBody()} as an aid to consumers who wish to deserialize the
- * binary data.
+ * with the {@link ServiceBusMessage} during Service Bus operations. A common use-case for
+ * {@code getApplicationProperties()} is to associate serialization hints for the {@link #getBody()} as an aid to
+ * consumers who wish to deserialize the binary data.
*
* @return Application properties associated with this {@link ServiceBusMessage}.
*/
- public Map getProperties() {
- return properties;
+ public Map getApplicationProperties() {
+ return amqpAnnotatedMessage.getApplicationProperties();
}
/**
@@ -115,14 +137,25 @@ public Map getProperties() {
*
*
* If the means for deserializing the raw data is not apparent to consumers, a common technique is to make use of
- * {@link #getProperties()} when creating the event, to associate serialization hints as an aid to consumers who
- * wish to deserialize the binary data.
+ * {@link #getApplicationProperties()} when creating the event, to associate serialization hints as an aid to
+ * consumers who wish to deserialize the binary data.
*
*
* @return A byte array representing the data.
*/
public byte[] getBody() {
- return Arrays.copyOf(body, body.length);
+ final AmqpBodyType type = amqpAnnotatedMessage.getBody().getBodyType();
+ switch (type) {
+ case DATA:
+ return Arrays.copyOf(binaryData, binaryData.length);
+ case SEQUENCE:
+ case VALUE:
+ throw logger.logExceptionAsError(new UnsupportedOperationException("Not supported AmqpBodyType: "
+ + type.toString()));
+ default:
+ throw logger.logExceptionAsError(new IllegalArgumentException("Unknown AmqpBodyType: "
+ + type.toString()));
+ }
}
/**
@@ -131,7 +164,7 @@ public byte[] getBody() {
* @return the contentType of the {@link ServiceBusMessage}.
*/
public String getContentType() {
- return contentType;
+ return amqpAnnotatedMessage.getProperties().getContentType();
}
/**
@@ -142,7 +175,7 @@ public String getContentType() {
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setContentType(String contentType) {
- this.contentType = contentType;
+ amqpAnnotatedMessage.getProperties().setContentType(contentType);
return this;
}
@@ -158,7 +191,7 @@ public ServiceBusMessage setContentType(String contentType) {
* Routing and Correlation
*/
public String getCorrelationId() {
- return correlationId;
+ return amqpAnnotatedMessage.getProperties().getCorrelationId();
}
/**
@@ -170,28 +203,28 @@ public String getCorrelationId() {
* @see #getCorrelationId()
*/
public ServiceBusMessage setCorrelationId(String correlationId) {
- this.correlationId = correlationId;
+ amqpAnnotatedMessage.getProperties().setCorrelationId(correlationId);
return this;
}
/**
- * Gets the label for the message.
+ * Gets the subject for the message.
*
- * @return The label for the message.
+ * @return The subject for the message.
*/
- public String getLabel() {
- return label;
+ public String getSubject() {
+ return amqpAnnotatedMessage.getProperties().getSubject();
}
/**
- * Sets the label for the message.
+ * Sets the subject for the message.
*
- * @param label The label to set.
+ * @param subject The subject to set.
*
* @return The updated {@link ServiceBusMessage} object.
*/
- public ServiceBusMessage setLabel(String label) {
- this.label = label;
+ public ServiceBusMessage setSubject(String subject) {
+ amqpAnnotatedMessage.getProperties().setSubject(subject);
return this;
}
@@ -199,7 +232,7 @@ public ServiceBusMessage setLabel(String label) {
* @return Id of the {@link ServiceBusMessage}.
*/
public String getMessageId() {
- return messageId;
+ return amqpAnnotatedMessage.getProperties().getMessageId();
}
/**
@@ -210,7 +243,7 @@ public String getMessageId() {
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setMessageId(String messageId) {
- this.messageId = messageId;
+ amqpAnnotatedMessage.getProperties().setMessageId(messageId);
return this;
}
@@ -228,7 +261,7 @@ public ServiceBusMessage setMessageId(String messageId) {
* entities
*/
public String getPartitionKey() {
- return partitionKey;
+ return (String) amqpAnnotatedMessage.getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue());
}
/**
@@ -240,7 +273,7 @@ public String getPartitionKey() {
* @see #getPartitionKey()
*/
public ServiceBusMessage setPartitionKey(String partitionKey) {
- this.partitionKey = partitionKey;
+ amqpAnnotatedMessage.getMessageAnnotations().put(PARTITION_KEY_ANNOTATION_NAME.getValue(), partitionKey);
return this;
}
@@ -256,7 +289,7 @@ public ServiceBusMessage setPartitionKey(String partitionKey) {
* Routing and Correlation
*/
public String getReplyTo() {
- return replyTo;
+ return amqpAnnotatedMessage.getProperties().getReplyTo();
}
/**
@@ -268,7 +301,7 @@ public String getReplyTo() {
* @see #getReplyTo()
*/
public ServiceBusMessage setReplyTo(String replyTo) {
- this.replyTo = replyTo;
+ amqpAnnotatedMessage.getProperties().setReplyTo(replyTo);
return this;
}
@@ -278,7 +311,7 @@ public ServiceBusMessage setReplyTo(String replyTo) {
* @return "To" property value of this message
*/
public String getTo() {
- return to;
+ return amqpAnnotatedMessage.getProperties().getTo();
}
/**
@@ -294,7 +327,7 @@ public String getTo() {
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setTo(String to) {
- this.to = to;
+ amqpAnnotatedMessage.getProperties().setTo(to);
return this;
}
@@ -311,7 +344,7 @@ public ServiceBusMessage setTo(String to) {
* @see Message Expiration
*/
public Duration getTimeToLive() {
- return timeToLive;
+ return amqpAnnotatedMessage.getHeader().getTimeToLive();
}
/**
@@ -323,7 +356,7 @@ public Duration getTimeToLive() {
* @see #getTimeToLive()
*/
public ServiceBusMessage setTimeToLive(Duration timeToLive) {
- this.timeToLive = timeToLive;
+ amqpAnnotatedMessage.getHeader().setTimeToLive(timeToLive);
return this;
}
@@ -341,11 +374,16 @@ public ServiceBusMessage setTimeToLive(Duration timeToLive) {
* Timestamps
*/
public OffsetDateTime getScheduledEnqueueTime() {
- return scheduledEnqueueTime;
+ Object value = amqpAnnotatedMessage.getMessageAnnotations().get(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue());
+ return value != null
+ ? ((Date) value).toInstant().atOffset(ZoneOffset.UTC)
+ : null;
}
/**
- * Sets the scheduled enqueue time of this message.
+ * Sets the scheduled enqueue time of this message. A {@code null} will not be set. If this value needs to be unset
+ * it could be done by value removing from {@link AmqpAnnotatedMessage#getMessageAnnotations()} using key
+ * {@link AmqpMessageConstant#SCHEDULED_ENQUEUE_UTC_TIME_NAME}.
*
* @param scheduledEnqueueTime the datetime at which this message should be enqueued in Azure Service Bus.
*
@@ -353,7 +391,10 @@ public OffsetDateTime getScheduledEnqueueTime() {
* @see #getScheduledEnqueueTime()
*/
public ServiceBusMessage setScheduledEnqueueTime(OffsetDateTime scheduledEnqueueTime) {
- this.scheduledEnqueueTime = scheduledEnqueueTime;
+ if (scheduledEnqueueTime != null) {
+ amqpAnnotatedMessage.getMessageAnnotations().put(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue(),
+ scheduledEnqueueTime);
+ }
return this;
}
@@ -368,7 +409,7 @@ public ServiceBusMessage setScheduledEnqueueTime(OffsetDateTime scheduledEnqueue
* Routing and Correlation
*/
public String getReplyToSessionId() {
- return replyToSessionId;
+ return amqpAnnotatedMessage.getProperties().getReplyToGroupId();
}
/**
@@ -379,7 +420,7 @@ public String getReplyToSessionId() {
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setReplyToSessionId(String replyToSessionId) {
- this.replyToSessionId = replyToSessionId;
+ amqpAnnotatedMessage.getProperties().setReplyToGroupId(replyToSessionId);
return this;
}
@@ -395,7 +436,7 @@ public ServiceBusMessage setReplyToSessionId(String replyToSessionId) {
* and Send Via
*/
public String getViaPartitionKey() {
- return viaPartitionKey;
+ return (String) amqpAnnotatedMessage.getMessageAnnotations().get(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue());
}
/**
@@ -407,7 +448,7 @@ public String getViaPartitionKey() {
* @see #getViaPartitionKey()
*/
public ServiceBusMessage setViaPartitionKey(String viaPartitionKey) {
- this.viaPartitionKey = viaPartitionKey;
+ amqpAnnotatedMessage.getMessageAnnotations().put(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue(), viaPartitionKey);
return this;
}
@@ -417,7 +458,7 @@ public ServiceBusMessage setViaPartitionKey(String viaPartitionKey) {
* @return Session Id of the {@link ServiceBusMessage}.
*/
public String getSessionId() {
- return sessionId;
+ return amqpAnnotatedMessage.getProperties().getGroupId();
}
/**
@@ -428,7 +469,7 @@ public String getSessionId() {
* @return The updated {@link ServiceBusMessage}.
*/
public ServiceBusMessage setSessionId(String sessionId) {
- this.sessionId = sessionId;
+ amqpAnnotatedMessage.getProperties().setGroupId(sessionId);
return this;
}
@@ -458,4 +499,13 @@ public ServiceBusMessage addContext(String key, Object value) {
return this;
}
+
+ /*
+ * Gets value from given map.
+ */
+ private void removeValues(Map dataMap, AmqpMessageConstant... keys) {
+ for (AmqpMessageConstant key : keys) {
+ dataMap.remove(key.getValue());
+ }
+ }
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java
index d4c94b1dcb775..57d5f363fc8da 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java
@@ -144,7 +144,8 @@ private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage)
Context eventSpanContext = tracerProvider.startSpan(serviceBusMessage.getContext(), ProcessKind.MESSAGE);
Optional