diff --git a/eng/code-quality-reports/src/main/resources/revapi/revapi.json b/eng/code-quality-reports/src/main/resources/revapi/revapi.json index 26e55d5d88a7c..fa2c1d3d77397 100644 --- a/eng/code-quality-reports/src/main/resources/revapi/revapi.json +++ b/eng/code-quality-reports/src/main/resources/revapi/revapi.json @@ -269,6 +269,92 @@ "code": "java.annotation.added", "new": "class com.azure.storage.blob.models.PageList", "justification": "Annotation required to resolve deserialization bug." + },{ + "code": "java.method.returnTypeChanged", + "old": "method com.azure.core.util.IterableStream com.azure.core.amqp.models.AmqpDataBody::getData()", + "new": "method java.util.List com.azure.core.amqp.models.AmqpDataBody::getData()", + "justification": "Updated " + }, + { + "code": "java.class.removed", + "old": "class com.azure.core.amqp.models.AmqpDataBody", + "justification": "Renamed as AmqpMessageBody." + }, + { + "code": "java.class.kindChanged", + "old": "interface com.azure.core.amqp.models.AmqpMessageBody", + "new": "class com.azure.core.amqp.models.AmqpMessageBody", + "justification": "AmqpMessageBody is class representing all the new AMQP data types." + }, + { + "code": "java.class.nowFinal", + "old": "interface com.azure.core.amqp.models.AmqpMessageBody", + "new": "class com.azure.core.amqp.models.AmqpMessageBody", + "justification": "Made it final." + }, + { + "code": "java.class.removed", + "old": "enum com.azure.core.amqp.models.AmqpBodyType", + "justification": "Because It is renamed to AmqpMessageBodyType" + }, + { + "code": "java.method.returnTypeChanged", + "old": "method com.azure.core.amqp.models.AmqpBodyType com.azure.core.amqp.models.AmqpMessageBody::getBodyType()", + "new": "method com.azure.core.amqp.models.AmqpMessageBodyType com.azure.core.amqp.models.AmqpMessageBody::getBodyType()", + "justification": "Renamed to match AmqpMessage prefix." + }, + { + "code": "java.method.returnTypeChanged", + "old": "method java.lang.String com.azure.core.amqp.models.AmqpMessageProperties::getCorrelationId()", + "new": "method com.azure.core.amqp.models.AmqpMessageId com.azure.core.amqp.models.AmqpMessageProperties::getCorrelationId()", + "justification": "New return type AmqpMessageId." + }, + { + "code": "java.method.returnTypeChanged", + "old": "method java.lang.String com.azure.core.amqp.models.AmqpMessageProperties::getMessageId()", + "new": "method com.azure.core.amqp.models.AmqpMessageId com.azure.core.amqp.models.AmqpMessageProperties::getMessageId()", + "justification": "New return type." + }, + { + "code": "java.method.returnTypeChanged", + "old": "method java.lang.String com.azure.core.amqp.models.AmqpMessageProperties::getReplyTo()", + "new": "method com.azure.core.amqp.models.AmqpAddress com.azure.core.amqp.models.AmqpMessageProperties::getReplyTo()", + "justification": "New return type." + }, + { + "code": "java.method.returnTypeChanged", + "old": "method java.lang.String com.azure.core.amqp.models.AmqpMessageProperties::getTo()", + "new": "method com.azure.core.amqp.models.AmqpAddress com.azure.core.amqp.models.AmqpMessageProperties::getTo()", + "justification": "New return type." + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setCorrelationId(===java.lang.String===)", + "new": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setCorrelationId(===com.azure.core.amqp.models.AmqpMessageId===)", + "justification": "Introduced new type AmqpMessageId." + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setMessageId(===java.lang.String===)", + "new": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setMessageId(===com.azure.core.amqp.models.AmqpMessageId===)", + "justification":"Introduced new type AmqpMessageId." + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setReplyTo(===java.lang.String===)", + "new": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setReplyTo(===com.azure.core.amqp.models.AmqpAddress===)", + "justification": "Introduced new type AmqpAddress." + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setTo(===java.lang.String===)", + "new": "parameter com.azure.core.amqp.models.AmqpMessageProperties com.azure.core.amqp.models.AmqpMessageProperties::setTo(===com.azure.core.amqp.models.AmqpAddress===)", + "justification": "Introduced new type AmqpAddress." + }, + { + "code": "java.method.removed", + "old": "method void com.azure.core.amqp.models.AmqpAnnotatedMessage::(com.azure.core.amqp.models.AmqpAnnotatedMessage)", + "justification": "Removed copy constructor, It is not required for Service bus message." } ] } diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index eb0ead6266a73..5f2bbda2112b8 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -2297,7 +2297,7 @@ - + @@ -2430,4 +2430,12 @@ + + + + + + diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 8afac97e57369..23fc1076eabcc 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -177,6 +177,7 @@ com.microsoft:microsoft-opentelemetry-exporter-azuremonitor;1.0.0-beta.1;1.0.0-b # unreleased_:;dependency-version # note: The unreleased dependencies will not be manipulated with the automatic PR creation code. unreleased_com.azure:azure-core-experimental;1.0.0-beta.9 +unreleased_com.azure:azure-core-amqp;1.7.0-beta.3 unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.7 # Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAddress.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAddress.java new file mode 100644 index 0000000000000..4d2e33a2f5ef4 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAddress.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import java.util.Objects; + +/** + * This represents amqp address information. This will be used in populating information like 'To', 'ReplyTo' etc. + * + *

Create and retrieve address

+ * {@codesnippet com.azure.core.amqp.models.AmqpAddress.createAndGet} + * + * @see + * Address type Format. + */ +public final class AmqpAddress { + + private final String address; + + /** + * Creates the {@link AmqpAddress} with given {@code address}. + * + * @param address The address to set for this instance. + * @throws NullPointerException if {@code address} is null. + */ + public AmqpAddress(String address) { + this.address = Objects.requireNonNull(address, "'address' cannot be null."); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return address.hashCode(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + + if (this.getClass() != other.getClass()) { + return false; + } + + if (this == other) { + return true; + } + + return Objects.equals(address, other.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return this.address; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java index a07e930123180..6e2d2a8b394e7 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java @@ -10,8 +10,9 @@ /** * The representation of message as defined by AMQP protocol. * - * @see - * Amqp Message Format. + * @see + * Amqp Message Format + * @see AmqpMessageBody */ public final class AmqpAnnotatedMessage { private final AmqpMessageBody amqpMessageBody; @@ -31,7 +32,6 @@ public final class AmqpAnnotatedMessage { */ public AmqpAnnotatedMessage(AmqpMessageBody body) { amqpMessageBody = Objects.requireNonNull(body, "'body' cannot be null."); - applicationProperties = new HashMap<>(); deliveryAnnotations = new HashMap<>(); messageAnnotations = new HashMap<>(); @@ -40,24 +40,6 @@ public AmqpAnnotatedMessage(AmqpMessageBody body) { properties = new AmqpMessageProperties(); } - /** - * Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpAnnotatedMessage} instance. - * - * @param message used to create another instance of {@link AmqpAnnotatedMessage}. - * - * @throws NullPointerException if {@code message} or {@link AmqpAnnotatedMessage#getBody() body} is null. - */ - public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) { - Objects.requireNonNull(message, "'message' cannot be null."); - amqpMessageBody = Objects.requireNonNull(message.getBody(), "'message.body' cannot be null."); - applicationProperties = new HashMap<>(message.getApplicationProperties()); - deliveryAnnotations = new HashMap<>(message.getDeliveryAnnotations()); - messageAnnotations = new HashMap<>(message.getMessageAnnotations()); - footer = new HashMap<>(message.getFooter()); - header = new AmqpMessageHeader(message.getHeader()); - properties = new AmqpMessageProperties(message.getProperties()); - } - /** * Gets the {@link Map} of application properties. * @@ -71,6 +53,7 @@ public Map getApplicationProperties() { * Gets the {@link AmqpMessageBody} of an amqp message. * * @return the {@link AmqpMessageBody} object. + * @see AmqpMessageBody */ public AmqpMessageBody getBody() { return amqpMessageBody; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java deleted file mode 100644 index 80c66af356087..0000000000000 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp.models; - -/** - * All AmqpBodyType available for AMQP Message. - */ -public enum AmqpBodyType { - /** - * Represent Amqp Data type - */ - DATA, - /** - * Represent Amqp Value type - */ - VALUE, - /** - * Represent Amqp Sequence type - */ - SEQUENCE; - -} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java deleted file mode 100644 index c26eb2f1e09ae..0000000000000 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp.models; - -import com.azure.core.util.IterableStream; - -import java.util.Objects; - -/** - * This is amqp message body which represents {@link AmqpBodyType#DATA} type. - */ -public final class AmqpDataBody implements AmqpMessageBody { - private final IterableStream data; - - /** - * Creates instance of {@link AmqpDataBody} with given {@link Iterable} of byte array. - * - * @param data to be set on amqp body. - * - * @throws NullPointerException if {@code data} is null. - */ - public AmqpDataBody(Iterable data) { - Objects.requireNonNull(data, "'data' cannot be null."); - this.data = new IterableStream<>(data); - } - - @Override - public AmqpBodyType getBodyType() { - return AmqpBodyType.DATA; - } - - /** - * Gets byte array set on this {@link AmqpDataBody}. - * - * @return data set on {@link AmqpDataBody}. - */ - public IterableStream getData() { - return data; - } -} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java index a984fb566720d..583755fa9396a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java @@ -3,14 +3,114 @@ package com.azure.core.amqp.models; +import com.azure.core.util.IterableStream; +import com.azure.core.util.logging.ClientLogger; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + /** - * Interface representing Amqp Message Body. + * This class encapsulates the body of a message. The {@link AmqpMessageBodyType} map to an AMQP specification message + * body types. Current implementation only support {@link AmqpMessageBodyType#DATA DATA} AMQP data type. Track this + * issue to find out support for + * other AMQP types. + * + *

Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. Get methods not + * corresponding to the type of the body throws exception.

+ * + *

How to check for {@link AmqpMessageBodyType}

+ * {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType} + * + * @see AmqpMessageBodyType */ -public interface AmqpMessageBody { +public final class AmqpMessageBody { + private final ClientLogger logger = new ClientLogger(AmqpMessageBody.class); + private AmqpMessageBodyType bodyType; + + // We expect user to call `getFirstData()` more because we support one byte[] as present. + // This the priority here to store payload as `byte[] data` and + private byte[] data; + private List dataList; + + private AmqpMessageBody() { + // private constructor so no one outside can create instance of this except classes im this package. + } + + /** + * Creates instance of {@link AmqpMessageBody} with given byte array. + * + * @param data used to create another instance of {@link AmqpMessageBody}. + * + * @return AmqpMessageBody Newly created instance. + * + * @throws NullPointerException if {@code data} is null. + */ + public static AmqpMessageBody fromData(byte[] data) { + Objects.requireNonNull(data, "'data' cannot be null."); + AmqpMessageBody body = new AmqpMessageBody(); + body.bodyType = AmqpMessageBodyType.DATA; + body.data = data; + return body; + } + + /** + * Gets the {@link AmqpMessageBodyType} of the message. + *

How to check for {@link AmqpMessageBodyType}

+ * {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType} + * @return AmqpBodyType type of the message. + */ + public AmqpMessageBodyType getBodyType() { + return bodyType; + } + + /** + * Gets an {@link IterableStream} of byte array containing only first byte array set on this + * {@link AmqpMessageBody}. This library only support one byte array at present, so the returned list will have only + * one element. + *

Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. Get methods not + * corresponding to the type of the body throws exception.

+ * + *

How to check for {@link AmqpMessageBodyType}

+ * {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType} + * @return data set on {@link AmqpMessageBody}. + * + * @throws IllegalArgumentException If {@link AmqpMessageBodyType} is not {@link AmqpMessageBodyType#DATA DATA}. + */ + public IterableStream getData() { + if (bodyType != AmqpMessageBodyType.DATA) { + throw logger.logExceptionAsError(new IllegalArgumentException( + "This method can only be called for AMQP Data body type at present. Track this issue, " + + "https://github.com/Azure/azure-sdk-for-java/issues/17614 for other body type support in " + + "future.")); + } + if (dataList == null) { + dataList = Collections.singletonList(data); + } + return new IterableStream<>(dataList); + } + /** - * Type representing various supported amqp body types. + * Gets first byte array set on this {@link AmqpMessageBody}. This library only support one byte array on Amqp + * Message at present. + *

Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. Get methods not + * corresponding to the type of the body throws exception.

+ * + *

How to check for {@link AmqpMessageBodyType}

+ * {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType} + * @return data set on {@link AmqpMessageBody}. * - * @return The {@link AmqpBodyType}. + * @throws IllegalArgumentException If {@link AmqpMessageBodyType} is not {@link AmqpMessageBodyType#DATA DATA}. + * @see + * Amqp Message Format. */ - AmqpBodyType getBodyType(); + public byte[] getFirstData() { + if (bodyType != AmqpMessageBodyType.DATA) { + throw logger.logExceptionAsError(new IllegalArgumentException( + "This method can only be called for AMQP Data body type at present. Track this issue, " + + "https://github.com/Azure/azure-sdk-for-java/issues/17614 for other body type support in " + + "future.")); + } + return data; + } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBodyType.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBodyType.java new file mode 100644 index 0000000000000..55119fd7edb33 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBodyType.java @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +/** + * Represents all valid {@link AmqpMessageBodyType} for an AMQP Message. Current SDK only support + * {@link AmqpMessageBodyType#DATA DATA} AMQP data type. Track this issue + * to find out support for other AMQP types. + * + *

Types of Amqp message body

+ * + + */ +public enum AmqpMessageBodyType { + /** + * Message content is byte array, equivalent to AMQP Data. + */ + DATA, + /** + * Message content is a single object, equivalent to AMQP Value. The object must be of a type supported by AMQP. + */ + VALUE, + /** + * Message content is a list of objects, equivalent to AMQP Sequence. Each object must be of a type supported + * by AMQP. + */ + SEQUENCE +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java index e15a482558f3a..cc26df5f755f3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java @@ -6,11 +6,10 @@ import com.azure.core.annotation.Fluent; import java.time.Duration; -import java.util.Objects; /** * The representation of message header as defined by AMQP protocol. - * @see + * @see * Amqp Message Format. */ @Fluent @@ -26,19 +25,6 @@ public class AmqpMessageHeader { // This class does not have any public constructors, and is not able to be instantiated using 'new'. } - /** - * The constructor is used to clone the values. - */ - AmqpMessageHeader(AmqpMessageHeader header) { - super(); - Objects.requireNonNull(header, "'header' cannot be null."); - deliveryCount = header.getDeliveryCount(); - durable = header.isDurable(); - firstAcquirer = header.isFirstAcquirer(); - timeToLive = header.getTimeToLive(); - priority = header.getPriority(); - } - /** * Gets the delivery count from amqp message header. * diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageId.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageId.java new file mode 100644 index 0000000000000..7145716328487 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageId.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import java.util.Objects; + +/** + * This represents Message id. Amqp specification support message id in various types. This class only implements + * {@link String} representation at present. + * + *

Create and retrieve message id

+ * {@codesnippet com.azure.core.amqp.models.AmqpMessageId.createAndGet} + * + * @see + * String + */ +public final class AmqpMessageId { + private final String messageId; + + /** + * Creates the {@link AmqpMessageId} with given {@code messageId}. + * + * @param messageId representing id of the message. + * @throws NullPointerException if {@code messageId} is null. + */ + public AmqpMessageId(String messageId) { + this.messageId = Objects.requireNonNull(messageId, "'messageId' cannot be null."); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return messageId.hashCode(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + + if (this.getClass() != other.getClass()) { + return false; + } + + if (this == other) { + return true; + } + + return messageId.equals(other.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return this.messageId; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java index 29e7595e5b417..db716e4338406 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java @@ -7,12 +7,11 @@ import java.time.OffsetDateTime; import java.util.Arrays; -import java.util.Objects; /** * The representation of message properties as defined by AMQP protocol. * - * @see + * @see * Amqp Message Format. */ @Fluent @@ -21,14 +20,14 @@ public class AmqpMessageProperties { private OffsetDateTime absoluteExpiryTime; private String contentEncoding; private String contentType; - private String correlationId; + private AmqpMessageId correlationId; private OffsetDateTime creationTime; private String groupId; private Long groupSequence; - private String messageId; + private AmqpMessageId messageId; private String replyToGroupId; - private String replyTo; - private String to; + private AmqpAddress replyTo; + private AmqpAddress to; private String subject; private byte[] userId; @@ -36,27 +35,6 @@ public class AmqpMessageProperties { // This class does not have any public constructors, and is not able to be instantiated using 'new'. } - /** - * The constructor is used to clone the values. - */ - AmqpMessageProperties(AmqpMessageProperties properties) { - super(); - Objects.requireNonNull(properties, "'properties' cannot be null."); - absoluteExpiryTime = properties.getAbsoluteExpiryTime(); - contentEncoding = properties.getContentEncoding(); - contentType = properties.getContentType(); - correlationId = properties.getCorrelationId(); - creationTime = properties.getCreationTime(); - groupId = properties.getGroupId(); - groupSequence = properties.getGroupSequence(); - messageId = properties.getMessageId(); - replyToGroupId = properties.getReplyToGroupId(); - replyTo = properties.getReplyTo(); - to = properties.getTo(); - subject = properties.getSubject(); - userId = properties.getUserId(); - } - /** * Gets {@code absoluteExpiryTime} from amqp message properties. * @@ -124,7 +102,7 @@ public AmqpMessageProperties setContentType(String contentType) { * * @return the {@code correlationId} value. */ - public String getCorrelationId() { + public AmqpMessageId getCorrelationId() { return correlationId; } @@ -135,7 +113,7 @@ public String getCorrelationId() { * * @return updated {@link AmqpMessageProperties} object. */ - public AmqpMessageProperties setCorrelationId(String correlationId) { + public AmqpMessageProperties setCorrelationId(AmqpMessageId correlationId) { this.correlationId = correlationId; return this; } @@ -208,7 +186,7 @@ public AmqpMessageProperties setGroupSequence(Long groupSequence) { * * @return the {@code messageId} value. */ - public String getMessageId() { + public AmqpMessageId getMessageId() { return messageId; } @@ -219,7 +197,7 @@ public String getMessageId() { * * @return updated {@link AmqpMessageProperties} object. */ - public AmqpMessageProperties setMessageId(String messageId) { + public AmqpMessageProperties setMessageId(AmqpMessageId messageId) { this.messageId = messageId; return this; } @@ -229,7 +207,7 @@ public AmqpMessageProperties setMessageId(String messageId) { * * @return The {@code replyTo} value. */ - public String getReplyTo() { + public AmqpAddress getReplyTo() { return replyTo; } @@ -240,7 +218,7 @@ public String getReplyTo() { * * @return updated {@link AmqpMessageProperties} object. */ - public AmqpMessageProperties setReplyTo(String replyTo) { + public AmqpMessageProperties setReplyTo(AmqpAddress replyTo) { this.replyTo = replyTo; return this; } @@ -292,7 +270,7 @@ public AmqpMessageProperties setSubject(String subject) { * * @return the {@code to} value. */ - public String getTo() { + public AmqpAddress getTo() { return to; } @@ -303,7 +281,7 @@ public String getTo() { * * @return updated {@link AmqpMessageProperties} object. */ - public AmqpMessageProperties setTo(String to) { + public AmqpMessageProperties setTo(AmqpAddress to) { this.to = to; return this; } diff --git a/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java new file mode 100644 index 0000000000000..a37e5b9a10c6d --- /dev/null +++ b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +/** + * Class contains sample code snippets that will be used in javadocs. + */ +public class AmqpAnnotatedMessageJavaDocCodeSamples { + /** + * Get message body from {@link AmqpAnnotatedMessage}. + */ + public void checkBodyType() { + AmqpAnnotatedMessage amqpAnnotatedMessage = null; + // BEGIN: com.azure.core.amqp.models.AmqpBodyType.checkBodyType + // If client do not check `AmqpMessageBody.getBodyType()` and payload is not of type `AmqpMessageBodyType.DATA`, + // calling `getFirstData()` or `getData()` on `AmqpMessageBody` will throw Runtime exception. + // https://github.com/Azure/azure-sdk-for-java/issues/17614 : This issue tracks additional AMQP body type + // support in future. + + byte[] payload = null; + AmqpMessageBodyType bodyType = amqpAnnotatedMessage.getBody().getBodyType(); + switch (bodyType) { + case DATA: + payload = amqpAnnotatedMessage.getBody().getFirstData(); + break; + case SEQUENCE: + case VALUE: + throw new RuntimeException("Body type not supported yet."); + default: + throw new RuntimeException("Body type not valid."); + } + System.out.println(new String(payload)); + // END: com.azure.core.amqp.models.AmqpBodyType.checkBodyType + } + + public void address() { + // BEGIN: com.azure.core.amqp.models.AmqpAddress.createAndGet + AmqpAddress amqpAddress = new AmqpAddress("my-address"); + // Retrieve Adderss + String address = amqpAddress.toString(); + System.out.println("Address " + address); + // END: com.azure.core.amqp.models.AmqpAddress.createAndGet + } + + /** + * Get message body from {@link AmqpMessageId}. + */ + public void messageId() { + // BEGIN: com.azure.core.amqp.models.AmqpMessageId.createAndGet + AmqpMessageId messageId = new AmqpMessageId("my-message-id"); + // Retrieve Message id + String id = messageId.toString(); + System.out.println("Message Id " + id); + // END: com.azure.core.amqp.models.AmqpMessageId.createAndGet + } + +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAddressTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAddressTest.java new file mode 100644 index 0000000000000..05d8e50c2bf81 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAddressTest.java @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test for {@link AmqpAddress}. + */ +public class AmqpAddressTest { + + /** + * Verifies {@link AmqpAddress} constructor for null values. + */ + @Test + public void constructorNullValue() { + // Arrange, Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> new AmqpAddress(null)); + } + + /** + * Verifies {@link AmqpAddress} with same value makes instances equal. + */ + @Test + public void equalityTest() { + // Arrange + final AmqpAddress address1 = new AmqpAddress("a"); + final AmqpAddress address2 = new AmqpAddress("a"); + + // Act & Assert + Assertions.assertEquals(address1, address2); + } + + /** + * Verifies {@link AmqpAddress} , two different values are not equal. + */ + @Test + public void nonEqualityTest() { + // Arrange + final AmqpAddress address1 = new AmqpAddress("a"); + final AmqpAddress address2 = new AmqpAddress("a1"); + + // Act & Assert + Assertions.assertNotEquals(address1, address2); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java index ce6f84bba3ea3..435386d93d0ce 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java @@ -8,17 +8,10 @@ import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; /** * Test class for {@link AmqpAnnotatedMessage} @@ -28,115 +21,19 @@ public class AmqpAnnotatedMessageTest { private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8); private final ClientLogger logger = new ClientLogger(AmqpAnnotatedMessageTest.class); - /** - * Verifies we correctly set values via copy constructor for {@link AmqpAnnotatedMessage} and create new - * instances of the properties. - */ - @Test - public void copyConstructorTest() { - // Arrange - final int expectedBinaryDataSize = 1; - List expectedBinaryData = new ArrayList<>(); - expectedBinaryData.add(CONTENTS_BYTES); - - final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); - final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody); - final Map expectedMessageAnnotations = expected.getMessageAnnotations(); - expectedMessageAnnotations.put("ma-1", "ma-value1"); - - final Map expectedDeliveryAnnotations = expected.getDeliveryAnnotations(); - expectedDeliveryAnnotations.put("da-1", "da-value1"); - - final Map expectedApplicationProperties = expected.getApplicationProperties(); - expectedApplicationProperties.put("ap-1", "ap-value1"); - - final Map expectedFooter = expected.getFooter(); - expectedFooter.put("foo-1", "foo-value1"); - - final AmqpMessageProperties expectedMessageProperties = expected.getProperties(); - expectedMessageProperties.setGroupSequence(2L); - expectedMessageProperties.setContentEncoding("content-enc"); - expectedMessageProperties.setReplyToGroupId("a"); - expectedMessageProperties.setReplyTo("b"); - expectedMessageProperties.setCorrelationId("c"); - expectedMessageProperties.setSubject("d"); - expectedMessageProperties.setMessageId("id"); - - final AmqpMessageHeader expectedMessageHeader = expected.getHeader(); - expectedMessageHeader.setDeliveryCount(5L); - expectedMessageHeader.setTimeToLive(Duration.ofSeconds(20)); - expectedMessageHeader.setPriority(Short.valueOf("4")); - - final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected); - - // Act - // Now update the values after we have created AmqpAnnotatedMessage using copy constructor. - expectedDeliveryAnnotations.remove("da-1"); - expectedApplicationProperties.put("ap-2", "ap-value2"); - expectedFooter.remove("foo-1"); - expected.getHeader().setDeliveryCount(Long.valueOf(100)); - expectedBinaryData = new ArrayList<>(); - - // Assert - // Ensure the memory references are not same. - assertNotSame(expected.getProperties(), actual.getProperties()); - assertNotSame(expected.getApplicationProperties(), actual.getApplicationProperties()); - assertNotSame(expected.getDeliveryAnnotations(), actual.getDeliveryAnnotations()); - assertNotSame(expected.getFooter(), actual.getFooter()); - assertNotSame(expected.getHeader(), actual.getHeader()); - assertNotSame(expected.getMessageAnnotations(), actual.getMessageAnnotations()); - assertNotSame(expected.getProperties().getUserId(), actual.getProperties().getUserId()); - assertNotSame(expected.getHeader().getDeliveryCount(), actual.getHeader().getDeliveryCount()); - - assertEquals(1, actual.getDeliveryAnnotations().size()); - assertEquals(1, actual.getApplicationProperties().size()); - assertEquals(1, actual.getFooter().size()); - - assertEquals(expectedMessageProperties.getGroupSequence(), actual.getProperties().getGroupSequence()); - assertEquals(expectedMessageProperties.getContentEncoding(), actual.getProperties().getContentEncoding()); - assertEquals(expectedMessageProperties.getReplyToGroupId(), actual.getProperties().getReplyToGroupId()); - assertEquals(expectedMessageProperties.getReplyTo(), actual.getProperties().getReplyTo()); - assertEquals(expectedMessageProperties.getCorrelationId(), actual.getProperties().getCorrelationId()); - assertEquals(expectedMessageProperties.getSubject(), actual.getProperties().getSubject()); - assertEquals(expectedMessageProperties.getMessageId(), actual.getProperties().getMessageId()); - - assertEquals(expectedMessageHeader.getTimeToLive(), actual.getHeader().getTimeToLive()); - assertEquals(expectedMessageHeader.getPriority(), actual.getHeader().getPriority()); - - assertMessageBody(expectedBinaryDataSize, CONTENTS_BYTES, actual); - } - /** * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. */ @Test public void constructorValidValues() { // Arrange - final List expectedBinaryData = Collections.singletonList(CONTENTS_BYTES); - final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); - - // Act - final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(amqpDataBody); - - // Assert - assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual); - } - - /** - * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. - */ - @Test - public void constructorAmqpValidValues() { - // Arrange - final List expectedBinaryData = Collections.singletonList(CONTENTS_BYTES); - final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); - final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody); + final AmqpMessageBody amqpMessageBody = AmqpMessageBody.fromData(CONTENTS_BYTES); // Act - final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected); + final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(amqpMessageBody); // Assert - assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual); + assertMessageCreation(AmqpMessageBodyType.DATA, actual); } /** @@ -145,13 +42,13 @@ public void constructorAmqpValidValues() { @Test public void constructorNullValidValues() { // Arrange - final AmqpDataBody body = null; + final AmqpMessageBody body = null; // Act & Assert Assertions.assertThrows(NullPointerException.class, () -> new AmqpAnnotatedMessage(body)); } - private void assertMessageCreation(AmqpBodyType expectedType, int expectedMessageSize, AmqpAnnotatedMessage actual) { + private void assertMessageCreation(AmqpMessageBodyType expectedType, AmqpAnnotatedMessage actual) { assertEquals(expectedType, actual.getBody().getBodyType()); assertNotNull(actual.getProperties()); assertNotNull(actual.getHeader()); @@ -163,16 +60,15 @@ private void assertMessageCreation(AmqpBodyType expectedType, int expectedMessag // Validate Message Body assertNotNull(actual.getBody()); - assertMessageBody(expectedMessageSize, CONTENTS_BYTES, actual); + assertMessageBody(CONTENTS_BYTES, actual); } - private void assertMessageBody(int expectedMessageSize, byte[] expectedbody, AmqpAnnotatedMessage actual) { - final AmqpBodyType actualType = actual.getBody().getBodyType(); + private void assertMessageBody(byte[] expectedbody, AmqpAnnotatedMessage actual) { + final AmqpMessageBodyType actualType = actual.getBody().getBodyType(); switch (actualType) { case DATA: - List actualData = ((AmqpDataBody) actual.getBody()).getData().stream().collect(Collectors.toList()); - assertEquals(expectedMessageSize, actualData.size()); - assertArrayEquals(expectedbody, actualData.get(0)); + byte[] actualData = actual.getBody().getData().stream().findFirst().get(); + assertArrayEquals(expectedbody, actualData); break; case VALUE: case SEQUENCE: diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java deleted file mode 100644 index e5d196c1c1d2e..0000000000000 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp.models; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; - -/** - * Test for {@link AmqpDataBody}. - */ -public class AmqpDataBodyTest { - - /** - * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. - */ - @Test - public void constructorValidValues() { - // Arrange - final List expectedDataList = new ArrayList<>(); - expectedDataList.add("some data 1".getBytes()); - expectedDataList.add("some data 2".getBytes()); - - // Act - final AmqpDataBody actual = new AmqpDataBody(expectedDataList); - - // Assert - assertEquals(AmqpBodyType.DATA, actual.getBodyType()); - - // Validate Message Body - final List dataList = actual.getData().stream().collect(Collectors.toList()); - assertEquals(expectedDataList.size(), dataList.size()); - assertArrayEquals(expectedDataList.toArray(), dataList.toArray()); - } - - /** - * Verifies {@link AmqpDataBody} constructor for null values. - */ - @Test - public void constructorNullValidValues() { - // Arrange - final List listBinaryData = null; - - // Act & Assert - Assertions.assertThrows(NullPointerException.class, () -> new AmqpDataBody(listBinaryData)); - } -} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageBodyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageBodyTest.java new file mode 100644 index 0000000000000..2f8a3d303c7b2 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageBodyTest.java @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test for {@link AmqpMessageBody}. + */ +public class AmqpMessageBodyTest { + + /** + * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. + */ + @Test + public void constructorValidValues() { + // Arrange + final byte[] expectedData = "some data 1".getBytes(); + + // Act + final AmqpMessageBody actual = AmqpMessageBody.fromData(expectedData); + + // Assert + assertEquals(AmqpMessageBodyType.DATA, actual.getBodyType()); + + // Validate Message Body + assertArrayEquals(expectedData, actual.getFirstData()); + assertArrayEquals(expectedData, actual.getData().stream().findFirst().get()); + } + + /** + * Verifies {@link AmqpMessageBody} constructor for null values. + */ + @Test + public void constructorNullValidValues() { + // Arrange + final byte[] binaryData = null; + + // Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> AmqpMessageBody.fromData(binaryData)); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageIdTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageIdTest.java new file mode 100644 index 0000000000000..bb4c7bbdf6d11 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpMessageIdTest.java @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test for {@link AmqpMessageId}. + */ +public class AmqpMessageIdTest { + /** + * Verifies {@link AmqpMessageId} constructor for null values. + */ + @Test + public void constructorNullValue() { + // Arrange, Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> new AmqpMessageId(null)); + } + + /** + * Verifies {@link AmqpMessageId} with same value makes instances equal. + */ + @Test + public void equalityTest() { + // Arrange + final AmqpMessageId id1 = new AmqpMessageId("a"); + final AmqpMessageId id2 = new AmqpMessageId("a"); + + // Act & Assert + Assertions.assertEquals(id1, id2); + } + + /** + * Verifies {@link AmqpMessageId} , two different values are not equal. + */ + @Test + public void nonEqualityTest() { + // Arrange + final AmqpMessageId id1 = new AmqpMessageId("a"); + final AmqpMessageId id2 = new AmqpMessageId("a1"); + + // Act & Assert + Assertions.assertNotEquals(id1, id2); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 0ac4ce29f8c4f..9937871495fec 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -52,7 +52,7 @@ com.azure azure-core-amqp - 1.7.0-beta.2 + 1.7.0-beta.3 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java index c0b25cacd6058..febcc79775a53 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java @@ -4,9 +4,13 @@ package com.azure.messaging.servicebus; import com.azure.core.amqp.AmqpMessageConstant; +import com.azure.core.amqp.models.AmqpAddress; import com.azure.core.amqp.models.AmqpAnnotatedMessage; -import com.azure.core.amqp.models.AmqpBodyType; -import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.AmqpMessageBody; +import com.azure.core.amqp.models.AmqpMessageBodyType; +import com.azure.core.amqp.models.AmqpMessageHeader; +import com.azure.core.amqp.models.AmqpMessageId; +import com.azure.core.amqp.models.AmqpMessageProperties; import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; import com.azure.core.experimental.util.BinaryData; @@ -14,10 +18,8 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.Optional; import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME; import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME; @@ -97,8 +99,7 @@ public ServiceBusMessage(String body) { public ServiceBusMessage(BinaryData body) { Objects.requireNonNull(body, "'body' cannot be null."); this.context = Context.NONE; - this.amqpAnnotatedMessage = new AmqpAnnotatedMessage( - new AmqpDataBody(Collections.singletonList(body.toBytes()))); + this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes())); } /** @@ -108,23 +109,106 @@ public ServiceBusMessage(BinaryData body) { * @param receivedMessage The received message to create new message from. * * @throws NullPointerException if {@code receivedMessage} is {@code null}. + * @throws UnsupportedOperationException if {@link AmqpMessageBodyType} is {@link AmqpMessageBodyType#SEQUENCE} or + * {@link AmqpMessageBodyType#VALUE}. + * @throws IllegalStateException for invalid {@link AmqpMessageBodyType}. */ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) { Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null."); - this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(receivedMessage.getAmqpAnnotatedMessage()); - this.context = Context.NONE; + final AmqpMessageBodyType bodyType = receivedMessage.getAmqpAnnotatedMessage().getBody().getBodyType(); + AmqpMessageBody amqpMessageBody; + switch (bodyType) { + case DATA: + amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getAmqpAnnotatedMessage().getBody() + .getFirstData()); + break; + case SEQUENCE: + case VALUE: + // This should not happen because we will not create `ServiceBusReceivedMessage` with these types. + throw logger.logExceptionAsError(new UnsupportedOperationException( + "This constructor only supports the AMQP Data body type at present. Track this issue, " + + "https://github.com/Azure/azure-sdk-for-java/issues/17614 for other body type support in " + + "future.")); + default: + throw logger.logExceptionAsError(new IllegalStateException("Body type not valid " + + bodyType.toString())); + } + this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(amqpMessageBody); + + // set properties + final AmqpMessageProperties receivedProperties = receivedMessage.getAmqpAnnotatedMessage().getProperties(); + final AmqpMessageProperties newProperties = amqpAnnotatedMessage.getProperties(); + newProperties.setMessageId(receivedProperties.getMessageId()); + newProperties.setUserId(receivedProperties.getUserId()); + newProperties.setTo(receivedProperties.getTo()); + newProperties.setSubject(receivedProperties.getSubject()); + newProperties.setReplyTo(receivedProperties.getReplyTo()); + newProperties.setCorrelationId(receivedProperties.getCorrelationId()); + newProperties.setContentType(receivedProperties.getContentType()); + newProperties.setContentEncoding(receivedProperties.getContentEncoding()); + newProperties.setAbsoluteExpiryTime(receivedProperties.getAbsoluteExpiryTime()); + newProperties.setCreationTime(receivedProperties.getCreationTime()); + newProperties.setGroupId(receivedProperties.getGroupId()); + newProperties.setGroupSequence(receivedProperties.getGroupSequence()); + newProperties.setReplyToGroupId(receivedProperties.getReplyToGroupId()); + + // copy header except for delivery count which should be set to null + final AmqpMessageHeader receivedHeader = receivedMessage.getAmqpAnnotatedMessage().getHeader(); + final AmqpMessageHeader newHeader = this.amqpAnnotatedMessage.getHeader(); + newHeader.setPriority(receivedHeader.getPriority()); + newHeader.setTimeToLive(receivedHeader.getTimeToLive()); + newHeader.setDurable(receivedHeader.isDurable()); + newHeader.setFirstAcquirer(receivedHeader.isFirstAcquirer()); + + // copy message annotations except for broker set ones + final Map receivedAnnotations = receivedMessage.getAmqpAnnotatedMessage() + .getMessageAnnotations(); + final Map newAnnotations = this.amqpAnnotatedMessage.getMessageAnnotations(); + + for (Map.Entry entry: receivedAnnotations.entrySet()) { + if (AmqpMessageConstant.fromString(entry.getKey()) == LOCKED_UNTIL_KEY_ANNOTATION_NAME + || AmqpMessageConstant.fromString(entry.getKey()) == SEQUENCE_NUMBER_ANNOTATION_NAME + || AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME + || AmqpMessageConstant.fromString(entry.getKey()) == ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME + || AmqpMessageConstant.fromString(entry.getKey()) == ENQUEUED_TIME_UTC_ANNOTATION_NAME) { + + continue; + } + newAnnotations.put(entry.getKey(), entry.getValue()); + } - // clean up data which user is not allowed to set. - amqpAnnotatedMessage.getHeader().setDeliveryCount(null); + // copy delivery annotations + final Map receivedDelivery = receivedMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations(); + final Map newDelivery = this.amqpAnnotatedMessage.getDeliveryAnnotations(); - removeValues(amqpAnnotatedMessage.getMessageAnnotations(), LOCKED_UNTIL_KEY_ANNOTATION_NAME, - SEQUENCE_NUMBER_ANNOTATION_NAME, DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME, - ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME, ENQUEUED_TIME_UTC_ANNOTATION_NAME); + for (Map.Entry entry: receivedDelivery.entrySet()) { + newDelivery.put(entry.getKey(), entry.getValue()); + } + + // copy Footer + final Map receivedFooter = receivedMessage.getAmqpAnnotatedMessage().getFooter(); + final Map newFooter = this.amqpAnnotatedMessage.getFooter(); + + for (Map.Entry entry: receivedFooter.entrySet()) { + newFooter.put(entry.getKey(), entry.getValue()); + } - removeValues(amqpAnnotatedMessage.getApplicationProperties(), DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME, - DEAD_LETTER_REASON_ANNOTATION_NAME); + // copy application properties except for broker set ones + final Map receivedApplicationProperties = receivedMessage.getAmqpAnnotatedMessage() + .getApplicationProperties(); + final Map newApplicationProperties = this.amqpAnnotatedMessage.getApplicationProperties(); + + for (Map.Entry entry: receivedApplicationProperties.entrySet()) { + if (AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME + || AmqpMessageConstant.fromString(entry.getKey()) == DEAD_LETTER_REASON_ANNOTATION_NAME) { + + continue; + } + newApplicationProperties.put(entry.getKey(), entry.getValue()); + } + this.context = Context.NONE; } /** @@ -162,20 +246,10 @@ public Map getApplicationProperties() { * @return A byte array representing the data. */ public BinaryData getBody() { - final AmqpBodyType type = amqpAnnotatedMessage.getBody().getBodyType(); + final AmqpMessageBodyType type = amqpAnnotatedMessage.getBody().getBodyType(); switch (type) { case DATA: - Optional byteArrayData = ((AmqpDataBody) amqpAnnotatedMessage.getBody()).getData().stream() - .findFirst(); - final byte[] bytes; - - if (byteArrayData.isPresent()) { - bytes = byteArrayData.get(); - } else { - logger.warning("Data not present."); - bytes = new byte[0]; - } - return BinaryData.fromBytes(bytes); + return BinaryData.fromBytes(amqpAnnotatedMessage.getBody().getFirstData()); case SEQUENCE: case VALUE: throw logger.logExceptionAsError(new UnsupportedOperationException("Not supported AmqpBodyType: " @@ -219,7 +293,12 @@ public ServiceBusMessage setContentType(String contentType) { * Routing and Correlation */ public String getCorrelationId() { - return amqpAnnotatedMessage.getProperties().getCorrelationId(); + String correlationId = null; + AmqpMessageId amqpCorrelationId = amqpAnnotatedMessage.getProperties().getCorrelationId(); + if (amqpCorrelationId != null) { + correlationId = amqpCorrelationId.toString(); + } + return correlationId; } /** @@ -231,7 +310,11 @@ public String getCorrelationId() { * @see #getCorrelationId() */ public ServiceBusMessage setCorrelationId(String correlationId) { - amqpAnnotatedMessage.getProperties().setCorrelationId(correlationId); + AmqpMessageId id = null; + if (correlationId != null) { + id = new AmqpMessageId(correlationId); + } + amqpAnnotatedMessage.getProperties().setCorrelationId(id); return this; } @@ -260,7 +343,12 @@ public ServiceBusMessage setSubject(String subject) { * @return Id of the {@link ServiceBusMessage}. */ public String getMessageId() { - return amqpAnnotatedMessage.getProperties().getMessageId(); + String messageId = null; + AmqpMessageId amqpMessageId = amqpAnnotatedMessage.getProperties().getMessageId(); + if (amqpMessageId != null) { + messageId = amqpMessageId.toString(); + } + return messageId; } /** @@ -273,7 +361,11 @@ public String getMessageId() { */ public ServiceBusMessage setMessageId(String messageId) { checkIdLength("messageId", messageId, MAX_MESSAGE_ID_LENGTH); - amqpAnnotatedMessage.getProperties().setMessageId(messageId); + AmqpMessageId id = null; + if (messageId != null) { + id = new AmqpMessageId(messageId); + } + amqpAnnotatedMessage.getProperties().setMessageId(id); return this; } @@ -324,7 +416,12 @@ public ServiceBusMessage setPartitionKey(String partitionKey) { * Routing and Correlation */ public String getReplyTo() { - return amqpAnnotatedMessage.getProperties().getReplyTo(); + String replyTo = null; + AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getReplyTo(); + if (amqpAddress != null) { + replyTo = amqpAddress.toString(); + } + return replyTo; } /** @@ -336,7 +433,11 @@ public String getReplyTo() { * @see #getReplyTo() */ public ServiceBusMessage setReplyTo(String replyTo) { - amqpAnnotatedMessage.getProperties().setReplyTo(replyTo); + AmqpAddress replyToAddress = null; + if (replyTo != null) { + replyToAddress = new AmqpAddress(replyTo); + } + amqpAnnotatedMessage.getProperties().setReplyTo(replyToAddress); return this; } @@ -346,7 +447,12 @@ public ServiceBusMessage setReplyTo(String replyTo) { * @return "To" property value of this message */ public String getTo() { - return amqpAnnotatedMessage.getProperties().getTo(); + String to = null; + AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getTo(); + if (amqpAddress != null) { + to = amqpAddress.toString(); + } + return to; } /** @@ -362,7 +468,11 @@ public String getTo() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setTo(String to) { - amqpAnnotatedMessage.getProperties().setTo(to); + AmqpAddress toAddress = null; + if (to != null) { + toAddress = new AmqpAddress(to); + } + amqpAnnotatedMessage.getProperties().setTo(toAddress); return this; } @@ -540,15 +650,6 @@ public ServiceBusMessage addContext(String key, Object value) { return this; } - /* - * Gets value from given map. - */ - private void removeValues(Map dataMap, AmqpMessageConstant... keys) { - for (AmqpMessageConstant key : keys) { - dataMap.remove(key.getValue()); - } - } - /** * Checks the length of ID fields. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java index 5edc881140cae..a49dc1dc00d36 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java @@ -9,8 +9,10 @@ import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseUtils; +import com.azure.core.amqp.models.AmqpAddress; import com.azure.core.amqp.models.AmqpAnnotatedMessage; import com.azure.core.amqp.models.AmqpMessageHeader; +import com.azure.core.amqp.models.AmqpMessageId; import com.azure.core.amqp.models.AmqpMessageProperties; import com.azure.core.experimental.util.BinaryData; import com.azure.core.util.logging.ClientLogger; @@ -387,21 +389,27 @@ private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) { // Properties final AmqpMessageProperties brokeredProperties = brokeredAmqpAnnotatedMessage.getProperties(); brokeredProperties.setReplyToGroupId(amqpMessage.getReplyToGroupId()); - brokeredProperties.setReplyTo(amqpMessage.getReplyTo()); + final String replyTo = amqpMessage.getReplyTo(); + if (replyTo != null) { + brokeredProperties.setReplyTo(new AmqpAddress(amqpMessage.getReplyTo())); + } final Object messageId = amqpMessage.getMessageId(); if (messageId != null) { - brokeredProperties.setMessageId(messageId.toString()); + brokeredProperties.setMessageId(new AmqpMessageId(messageId.toString())); } brokeredProperties.setContentType(amqpMessage.getContentType()); final Object correlationId = amqpMessage.getCorrelationId(); if (correlationId != null) { - brokeredProperties.setCorrelationId(correlationId.toString()); + brokeredProperties.setCorrelationId(new AmqpMessageId(correlationId.toString())); } final Properties amqpProperties = amqpMessage.getProperties(); if (amqpProperties != null) { - brokeredProperties.setTo(amqpProperties.getTo()); + final String to = amqpProperties.getTo(); + if (to != null) { + brokeredProperties.setTo(new AmqpAddress(amqpProperties.getTo())); + } if (amqpProperties.getAbsoluteExpiryTime() != null) { brokeredProperties.setAbsoluteExpiryTime(amqpProperties.getAbsoluteExpiryTime().toInstant() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index 74c6df584c826..3ce23ca4ef2f8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -4,9 +4,11 @@ package com.azure.messaging.servicebus; import com.azure.core.amqp.AmqpMessageConstant; +import com.azure.core.amqp.models.AmqpAddress; import com.azure.core.amqp.models.AmqpAnnotatedMessage; -import com.azure.core.amqp.models.AmqpBodyType; -import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.AmqpMessageBody; +import com.azure.core.amqp.models.AmqpMessageBodyType; +import com.azure.core.amqp.models.AmqpMessageId; import com.azure.core.experimental.util.BinaryData; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ReceiveMode; @@ -14,7 +16,6 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Collections; import java.util.Date; import java.util.Map; import java.util.Objects; @@ -43,7 +44,7 @@ public final class ServiceBusReceivedMessage { ServiceBusReceivedMessage(BinaryData body) { Objects.requireNonNull(body, "'body' cannot be null."); - amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList(body.toBytes()))); + amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes())); } /** @@ -72,10 +73,10 @@ public AmqpAnnotatedMessage getAmqpAnnotatedMessage() { * @return A byte array representing the data. */ public BinaryData getBody() { - final AmqpBodyType bodyType = amqpAnnotatedMessage.getBody().getBodyType(); + final AmqpMessageBodyType bodyType = amqpAnnotatedMessage.getBody().getBodyType(); switch (bodyType) { case DATA: - return BinaryData.fromBytes(((AmqpDataBody) amqpAnnotatedMessage.getBody()) + return BinaryData.fromBytes(amqpAnnotatedMessage.getBody() .getData().stream().findFirst().get()); case SEQUENCE: case VALUE: @@ -118,7 +119,12 @@ public String getContentType() { * Routing and Correlation */ public String getCorrelationId() { - return amqpAnnotatedMessage.getProperties().getCorrelationId(); + String correlationId = null; + AmqpMessageId amqpCorrelationId = amqpAnnotatedMessage.getProperties().getCorrelationId(); + if (amqpCorrelationId != null) { + correlationId = amqpCorrelationId.toString(); + } + return correlationId; } /** @@ -280,7 +286,12 @@ public OffsetDateTime getLockedUntil() { * @return Id of the {@link ServiceBusReceivedMessage}. */ public String getMessageId() { - return amqpAnnotatedMessage.getProperties().getMessageId(); + String messageId = null; + AmqpMessageId amqpMessageId = amqpAnnotatedMessage.getProperties().getMessageId(); + if (amqpMessageId != null) { + messageId = amqpMessageId.toString(); + } + return messageId; } /** @@ -327,7 +338,12 @@ public Map getApplicationProperties() { * Routing and Correlation */ public String getReplyTo() { - return amqpAnnotatedMessage.getProperties().getReplyTo(); + String replyTo = null; + AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getReplyTo(); + if (amqpAddress != null) { + replyTo = amqpAddress.toString(); + } + return replyTo; } /** @@ -414,7 +430,12 @@ public Duration getTimeToLive() { * @return "To" property value of this message */ public String getTo() { - return amqpAnnotatedMessage.getProperties().getTo(); + String to = null; + AmqpAddress amqpAddress = amqpAnnotatedMessage.getProperties().getTo(); + if (amqpAddress != null) { + to = amqpAddress.toString(); + } + return to; } /** @@ -450,7 +471,11 @@ boolean isSettled() { * @see #getCorrelationId() */ void setCorrelationId(String correlationId) { - amqpAnnotatedMessage.getProperties().setCorrelationId(correlationId); + AmqpMessageId id = null; + if (correlationId != null) { + id = new AmqpMessageId(correlationId); + } + amqpAnnotatedMessage.getProperties().setCorrelationId(id); } /** @@ -554,7 +579,11 @@ void setLockedUntil(OffsetDateTime lockedUntil) { * @param messageId to be set. */ void setMessageId(String messageId) { - amqpAnnotatedMessage.getProperties().setMessageId(messageId); + AmqpMessageId id = null; + if (messageId != null) { + id = new AmqpMessageId(messageId); + } + amqpAnnotatedMessage.getProperties().setMessageId(id); } /** @@ -625,7 +654,12 @@ void setTimeToLive(Duration timeToLive) { * @see #getReplyTo() */ void setReplyTo(String replyTo) { - amqpAnnotatedMessage.getProperties().setReplyTo(replyTo); + AmqpAddress replyToAddress = null; + if (replyTo != null) { + replyToAddress = new AmqpAddress(replyTo); + } + amqpAnnotatedMessage.getProperties().setReplyTo(replyToAddress); + } /** @@ -648,7 +682,11 @@ void setReplyToSessionId(String replyToSessionId) { * @param to To property value of this message */ void setTo(String to) { - amqpAnnotatedMessage.getProperties().setTo(to); + AmqpAddress toAddress = null; + if (to != null) { + toAddress = new AmqpAddress(to); + } + amqpAnnotatedMessage.getProperties().setTo(toAddress); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java index f0a809d55be72..013888fd5889e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Map; /** * Test for {@link ServiceBusMessage}. @@ -51,6 +52,12 @@ public void copyConstructorTest() { final Duration expectedTimeToLive = Duration.ofSeconds(20); final String expectedPartitionKey = "old-p-key"; + final short expectedPriority = 10; + final String expectedFooterValue = "foo-value1"; + final String expectedDeliveryAnnotationsValue = "da-value1"; + final String expectedApplicationValue = "ap-value1"; + + final ServiceBusReceivedMessage expected = new ServiceBusReceivedMessage(PAYLOAD_BINARY); expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10"); expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc"); @@ -67,6 +74,17 @@ public void copyConstructorTest() { expected.setTimeToLive(expectedTimeToLive); expected.setPartitionKey(expectedPartitionKey); + expected.getAmqpAnnotatedMessage().getHeader().setPriority(expectedPriority); + + final Map expectedFooter = expected.getAmqpAnnotatedMessage().getFooter(); + expectedFooter.put("foo-1", expectedFooterValue); + + final Map expectedDeliveryAnnotations = expected.getAmqpAnnotatedMessage().getDeliveryAnnotations(); + expectedDeliveryAnnotations.put("da-1", expectedDeliveryAnnotationsValue); + + final Map expectedApplicationProperties = expected.getApplicationProperties(); + expectedApplicationProperties.put("ap-1", expectedApplicationValue); + final ServiceBusMessage actual = new ServiceBusMessage(expected); // Act @@ -79,6 +97,13 @@ public void copyConstructorTest() { expected.setTimeToLive(Duration.ofSeconds(40)); expected.setPartitionKey("new-p-key"); + // Change original values + expected.getAmqpAnnotatedMessage().getHeader().setPriority((short) (expectedPriority + 1)); + expectedFooter.put("foo-1", expectedFooterValue + "-changed"); + expected.getAmqpAnnotatedMessage().getDeliveryAnnotations().put("da-1", expectedDeliveryAnnotationsValue + "-changed"); + expected.getAmqpAnnotatedMessage().getApplicationProperties().put("ap-1", expectedApplicationValue + "-changed"); + + // Assert assertNotSame(expected.getAmqpAnnotatedMessage(), actual.getAmqpAnnotatedMessage()); @@ -101,8 +126,14 @@ public void copyConstructorTest() { assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue())); assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue())); assertNull(actual.getAmqpAnnotatedMessage().getHeader().getDeliveryCount()); - } + // Testing , updating original message did not change copied message values.. + assertEquals(expectedPriority, actual.getAmqpAnnotatedMessage().getHeader().getPriority()); + assertEquals(expectedFooterValue, actual.getAmqpAnnotatedMessage().getFooter().get("foo-1").toString()); + assertEquals(expectedDeliveryAnnotationsValue, actual.getAmqpAnnotatedMessage().getDeliveryAnnotations().get("da-1").toString()); + assertEquals(expectedApplicationValue, actual.getAmqpAnnotatedMessage().getApplicationProperties().get("ap-1").toString()); + + } /** * Verifies we correctly set values via copy constructor for {@link ServiceBusMessage}. @@ -146,10 +177,10 @@ public void copyConstructorModifyAfterCopyTest() { // Assert // Validate updated values assertEquals(expectedSubject, originalMessage.getAmqpAnnotatedMessage().getProperties().getSubject()); - assertEquals(expectedTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getTo()); - assertEquals(expectedReplyTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyTo()); + assertEquals(expectedTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getTo().toString()); + assertEquals(expectedReplyTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyTo().toString()); assertEquals(expectedReplyToSessionId, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyToGroupId()); - assertEquals(expectedCorrelationId, originalMessage.getAmqpAnnotatedMessage().getProperties().getCorrelationId()); + assertEquals(expectedCorrelationId, originalMessage.getAmqpAnnotatedMessage().getProperties().getCorrelationId().toString()); assertEquals(expectedTimeToLive, originalMessage.getAmqpAnnotatedMessage().getHeader().getTimeToLive()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index 75e2405380ff1..86bca3652b76c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -3,9 +3,11 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.models.AmqpAddress; import com.azure.core.amqp.models.AmqpAnnotatedMessage; -import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.AmqpMessageBody; import com.azure.core.amqp.models.AmqpMessageHeader; +import com.azure.core.amqp.models.AmqpMessageId; import com.azure.core.amqp.models.AmqpMessageProperties; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -1013,13 +1015,13 @@ void receiveAndValidateProperties(MessagingEntityType entityType) { final String messageId = UUID.randomUUID().toString(); final AmqpAnnotatedMessage expectedAmqpProperties = new AmqpAnnotatedMessage( - new AmqpDataBody(Collections.singletonList(CONTENTS_BYTES))); + AmqpMessageBody.fromData(CONTENTS_BYTES)); expectedAmqpProperties.getProperties().setSubject(subject); expectedAmqpProperties.getProperties().setReplyToGroupId("r-gid"); - expectedAmqpProperties.getProperties().setReplyTo("reply-to"); + expectedAmqpProperties.getProperties().setReplyTo(new AmqpAddress("reply-to")); expectedAmqpProperties.getProperties().setContentType("content-type"); - expectedAmqpProperties.getProperties().setCorrelationId("correlation-id"); - expectedAmqpProperties.getProperties().setTo("to"); + expectedAmqpProperties.getProperties().setCorrelationId(new AmqpMessageId("correlation-id")); + expectedAmqpProperties.getProperties().setTo(new AmqpAddress("to")); expectedAmqpProperties.getProperties().setAbsoluteExpiryTime(OffsetDateTime.now().plusSeconds(60)); expectedAmqpProperties.getProperties().setUserId("user-id-1".getBytes()); expectedAmqpProperties.getProperties().setContentEncoding("string");