diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 37453e692edf1..5b99af016d9df 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,6 +1,7 @@ # Release History ## 1.5.0-beta.1 (Unreleased) +- Added Amqp Message envelope which can be accessed using `AmqpAnnotatedMessage`. ## 1.4.0 (2020-08-11) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java index 4f250ca9719f1..3cc986e779ef8 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java @@ -93,7 +93,36 @@ public enum AmqpMessageConstant { /** * The name of the entity that published a message. */ - PUBLISHER_ANNOTATION_NAME("x-opt-publisher"); + PUBLISHER_ANNOTATION_NAME("x-opt-publisher"), + /** + * The name representing scheduled enqueue time. + */ + SCHEDULED_ENQUEUE_UTC_TIME_NAME("x-opt-scheduled-enqueue-time"), + /** + * The identifier associated with a given via-partition. + */ + VIA_PARTITION_KEY_ANNOTATION_NAME("x-opt-via-partition-key"), + /** + * The identifier for locked until. + */ + LOCKED_UNTIL_KEY_ANNOTATION_NAME("x-opt-locked-until"), + /** + * The identifier for deadletter source. + */ + DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME("x-opt-deadletter-source"), + /** + * The name representing enqueue sequence number. + * This one appears to always be 0, but is always returned with each message. + */ + ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-enqueue-sequence-number"), + /** + * The identifier for deadletter description. + */ + DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME("DeadLetterErrorDescription"), + /** + * The identifier for deadletter reason. + */ + DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason"); private static final Map RESERVED_CONSTANTS_MAP = new HashMap<>(); private final String constant; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java new file mode 100644 index 0000000000000..a07e930123180 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpAnnotatedMessage.java @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * The representation of message as defined by AMQP protocol. + * + * @see + * Amqp Message Format. + */ +public final class AmqpAnnotatedMessage { + private final AmqpMessageBody amqpMessageBody; + private final Map applicationProperties; + private final Map deliveryAnnotations; + private final Map messageAnnotations; + private final Map footer; + private final AmqpMessageHeader header; + private final AmqpMessageProperties properties; + + /** + * Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpMessageBody}. + * + * @param body to be set on amqp message. + * + * @throws NullPointerException if {@code body} is null. + */ + public AmqpAnnotatedMessage(AmqpMessageBody body) { + amqpMessageBody = Objects.requireNonNull(body, "'body' cannot be null."); + + applicationProperties = new HashMap<>(); + deliveryAnnotations = new HashMap<>(); + messageAnnotations = new HashMap<>(); + footer = new HashMap<>(); + header = new AmqpMessageHeader(); + properties = new AmqpMessageProperties(); + } + + /** + * Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpAnnotatedMessage} instance. + * + * @param message used to create another instance of {@link AmqpAnnotatedMessage}. + * + * @throws NullPointerException if {@code message} or {@link AmqpAnnotatedMessage#getBody() body} is null. + */ + public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) { + Objects.requireNonNull(message, "'message' cannot be null."); + amqpMessageBody = Objects.requireNonNull(message.getBody(), "'message.body' cannot be null."); + applicationProperties = new HashMap<>(message.getApplicationProperties()); + deliveryAnnotations = new HashMap<>(message.getDeliveryAnnotations()); + messageAnnotations = new HashMap<>(message.getMessageAnnotations()); + footer = new HashMap<>(message.getFooter()); + header = new AmqpMessageHeader(message.getHeader()); + properties = new AmqpMessageProperties(message.getProperties()); + } + + /** + * Gets the {@link Map} of application properties. + * + * @return The application properties. + */ + public Map getApplicationProperties() { + return applicationProperties; + } + + /** + * Gets the {@link AmqpMessageBody} of an amqp message. + * + * @return the {@link AmqpMessageBody} object. + */ + public AmqpMessageBody getBody() { + return amqpMessageBody; + } + + /** + * Gets the {@link Map} representation of delivery annotations defined on an amqp message. + * + * @return the {@link Map} representation of delivery annotations. + */ + public Map getDeliveryAnnotations() { + return deliveryAnnotations; + } + + /** + * Gets the {@link Map} representation of footer defined on an amqp message. + * + * @return the {@link Map} representation of footer. + */ + public Map getFooter() { + return footer; + } + + /** + * Gets the {@link AmqpMessageHeader} defined on an amqp message. + * + * @return the {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader getHeader() { + return header; + } + + /** + * Gets the {@link Map} representation of message annotations defined on an amqp message. + * + * @return the {@link Map} representation of message annotations. + */ + public Map getMessageAnnotations() { + return messageAnnotations; + } + + /** + * Gets the {@link AmqpMessageProperties} defined on an amqp message. + * + * @return the {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties getProperties() { + return properties; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java new file mode 100644 index 0000000000000..80c66af356087 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpBodyType.java @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +/** + * All AmqpBodyType available for AMQP Message. + */ +public enum AmqpBodyType { + /** + * Represent Amqp Data type + */ + DATA, + /** + * Represent Amqp Value type + */ + VALUE, + /** + * Represent Amqp Sequence type + */ + SEQUENCE; + +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java new file mode 100644 index 0000000000000..a71b65f45d618 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpDataBody.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import com.azure.core.util.IterableStream; + +import java.util.Objects; + +/** + * This is amqp message body which represents {@link AmqpBodyType#DATA} type. + */ +public final class AmqpDataBody implements AmqpMessageBody { + private final IterableStream data; + + /** + * Creates instance of {@link AmqpDataBody} with given {@link Iterable} of {@link BinaryData}. + * + * @param data to be set on amqp body. + * + * @throws NullPointerException if {@code data} is null. + */ + public AmqpDataBody(Iterable data) { + Objects.requireNonNull(data, "'data' cannot be null."); + this.data = new IterableStream<>(data); + } + + @Override + public AmqpBodyType getBodyType() { + return AmqpBodyType.DATA; + } + + /** + * Gets {@link BinaryData} set on this {@link AmqpDataBody}. + * + * @return data set on {@link AmqpDataBody}. + */ + public IterableStream getData() { + return data; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java new file mode 100644 index 0000000000000..a984fb566720d --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageBody.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +/** + * Interface representing Amqp Message Body. + */ +public interface AmqpMessageBody { + /** + * Type representing various supported amqp body types. + * + * @return The {@link AmqpBodyType}. + */ + AmqpBodyType getBodyType(); +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java new file mode 100644 index 0000000000000..e15a482558f3a --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageHeader.java @@ -0,0 +1,138 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import com.azure.core.annotation.Fluent; + +import java.time.Duration; +import java.util.Objects; + +/** + * The representation of message header as defined by AMQP protocol. + * @see + * Amqp Message Format. + */ +@Fluent +public class AmqpMessageHeader { + + private Long deliveryCount; + private Boolean durable; + private Boolean firstAcquirer; + private Short priority; + private Duration timeToLive; + + AmqpMessageHeader() { + // This class does not have any public constructors, and is not able to be instantiated using 'new'. + } + + /** + * The constructor is used to clone the values. + */ + AmqpMessageHeader(AmqpMessageHeader header) { + super(); + Objects.requireNonNull(header, "'header' cannot be null."); + deliveryCount = header.getDeliveryCount(); + durable = header.isDurable(); + firstAcquirer = header.isFirstAcquirer(); + timeToLive = header.getTimeToLive(); + priority = header.getPriority(); + } + + /** + * Gets the delivery count from amqp message header. + * + * @return the delivery count value. + */ + public Long getDeliveryCount() { + return deliveryCount; + } + + /** + * Sets the given {@code deliveryCount} value on {@link AmqpMessageHeader} object. + * @param deliveryCount to be set. + * + * @return updated {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader setDeliveryCount(Long deliveryCount) { + this.deliveryCount = deliveryCount; + return this; + } + + /** + * Gets durable boolean flag from amqp message header. + * @return the durable flag. + */ + public Boolean isDurable() { + return durable; + } + + /** + * Sets the given {@code durable} value on {@link AmqpMessageHeader} object. + * @param durable to set on {@link AmqpMessageHeader}. + * + * @return updated {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader setDurable(Boolean durable) { + this.durable = durable; + return this; + } + + /** + * Gets boolean flag for {@code firstAcquirer} from amqp message header. + * @return the {@code firstAcquirer} value. + */ + public Boolean isFirstAcquirer() { + return this.firstAcquirer; + } + + /** + * Sets the given {@code firstAcquirer} value on {@link AmqpMessageHeader} object. + * @param firstAcquirer to set on {@link AmqpMessageHeader}. + * + * @return updated {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader setFirstAcquirer(Boolean firstAcquirer) { + this.firstAcquirer = firstAcquirer; + return this; + } + + /** + * Gets the priority on {@code amqpMessage} from amqp message header. + * @return the {@code priority} value. + */ + public Short getPriority() { + return priority; + } + + /** + * Sets the given {@code priority} value on {@link AmqpMessageHeader} object. + * @param priority to set on {@link AmqpMessageHeader}. + * + * @return updated {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader setPriority(Short priority) { + this.priority = priority; + return this; + } + + /** + * Gets {@code timeToLive} from amqp message header. + * + * @return the {@code timeToLive} value. + */ + public Duration getTimeToLive() { + return timeToLive; + } + + /** + * Sets the given {@code timeToLive} value on {@link AmqpMessageHeader} object. + * @param timeToLive to set on {@link AmqpMessageHeader}. + * + * @return updated {@link AmqpMessageHeader} object. + */ + public AmqpMessageHeader setTimeToLive(Duration timeToLive) { + this.timeToLive = timeToLive; + return this; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java new file mode 100644 index 0000000000000..29e7595e5b417 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/AmqpMessageProperties.java @@ -0,0 +1,331 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import com.azure.core.annotation.Fluent; + +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.Objects; + +/** + * The representation of message properties as defined by AMQP protocol. + * + * @see + * Amqp Message Format. + */ +@Fluent +public class AmqpMessageProperties { + + private OffsetDateTime absoluteExpiryTime; + private String contentEncoding; + private String contentType; + private String correlationId; + private OffsetDateTime creationTime; + private String groupId; + private Long groupSequence; + private String messageId; + private String replyToGroupId; + private String replyTo; + private String to; + private String subject; + private byte[] userId; + + AmqpMessageProperties() { + // This class does not have any public constructors, and is not able to be instantiated using 'new'. + } + + /** + * The constructor is used to clone the values. + */ + AmqpMessageProperties(AmqpMessageProperties properties) { + super(); + Objects.requireNonNull(properties, "'properties' cannot be null."); + absoluteExpiryTime = properties.getAbsoluteExpiryTime(); + contentEncoding = properties.getContentEncoding(); + contentType = properties.getContentType(); + correlationId = properties.getCorrelationId(); + creationTime = properties.getCreationTime(); + groupId = properties.getGroupId(); + groupSequence = properties.getGroupSequence(); + messageId = properties.getMessageId(); + replyToGroupId = properties.getReplyToGroupId(); + replyTo = properties.getReplyTo(); + to = properties.getTo(); + subject = properties.getSubject(); + userId = properties.getUserId(); + } + + /** + * Gets {@code absoluteExpiryTime} from amqp message properties. + * + * @return the {@code absoluteExpiryTime} value. + */ + public OffsetDateTime getAbsoluteExpiryTime() { + return absoluteExpiryTime; + } + + /** + * Sets the given {@code absoluteExpiryTime} value on {@link AmqpMessageProperties} object. + * + * @param absoluteExpiryTime to be set. + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setAbsoluteExpiryTime(OffsetDateTime absoluteExpiryTime) { + this.absoluteExpiryTime = absoluteExpiryTime; + return this; + } + + /** + * Gets AbsoluteExpiryTime from amqp message properties. + * + * @return the {@code absoluteExpiryTime} value. + */ + public String getContentEncoding() { + return contentEncoding; + } + + /** + * Sets the given {@code contentEncoding} value on {@link AmqpMessageProperties} object. + * + * @param contentEncoding to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setContentEncoding(String contentEncoding) { + this.contentEncoding = contentEncoding; + return this; + } + + /** + * Gets {@code contentType} from amqp message properties. + * + * @return the {@code contentType} value. + */ + public String getContentType() { + return contentType; + } + + /** + * Sets the given {@code contentType} value on {@link AmqpMessageProperties} object. + * + * @param contentType to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setContentType(String contentType) { + this.contentType = contentType; + return this; + } + + /** + * Gets {@code correlationId} from amqp message properties. + * + * @return the {@code correlationId} value. + */ + public String getCorrelationId() { + return correlationId; + } + + /** + * Sets the given {@code correlationId} value on {@link AmqpMessageProperties} object. + * + * @param correlationId to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setCorrelationId(String correlationId) { + this.correlationId = correlationId; + return this; + } + + /** + * Gets {@code creationTime} from amqp message properties. + * + * @return the {@code creationTime} value. + */ + public OffsetDateTime getCreationTime() { + return creationTime; + } + + /** + * Sets the given {@code creationTime} value on {@link AmqpMessageProperties} object. + * + * @param creationTime to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setCreationTime(OffsetDateTime creationTime) { + this.creationTime = creationTime; + return this; + } + + /** + * Gets {@code groupId} from amqp message properties. + * + * @return the {@code groupId} value. + */ + public String getGroupId() { + return groupId; + } + + /** + * Sets the given {@code groupId} value on {@link AmqpMessageProperties} object. + * + * @param groupId to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setGroupId(String groupId) { + this.groupId = groupId; + return this; + } + + /** + * Gets {@code groupSequence} from amqp message properties. + * + * @return the {@code groupSequence} value. + */ + public Long getGroupSequence() { + return groupSequence; + } + + /** + * Sets the given {@code groupSequence} value on {@link AmqpMessageProperties} object. + * + * @param groupSequence to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setGroupSequence(Long groupSequence) { + this.groupSequence = groupSequence; + return this; + } + + /** + * Gets {@code messageId} from amqp message properties. + * + * @return the {@code messageId} value. + */ + public String getMessageId() { + return messageId; + } + + /** + * Sets the given {@code messageId} value on {@link AmqpMessageProperties} object. + * + * @param messageId to be set . + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setMessageId(String messageId) { + this.messageId = messageId; + return this; + } + + /** + * Gets {@code replyTo} from amqp message properties. + * + * @return The {@code replyTo} value. + */ + public String getReplyTo() { + return replyTo; + } + + /** + * Sets the given {@code replyTo} value on {@link AmqpMessageProperties} object. + * + * @param replyTo to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setReplyTo(String replyTo) { + this.replyTo = replyTo; + return this; + } + + /** + * Gets {@code replyToGroupId} from amqp message properties. + * + * @return The {@code replyToGroupId} value. + */ + public String getReplyToGroupId() { + return replyToGroupId; + } + + /** + * Sets the given {@code replyToGroupId} value on {@link AmqpMessageProperties} object. + * + * @param replyToGroupId to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setReplyToGroupId(String replyToGroupId) { + this.replyToGroupId = replyToGroupId; + return this; + } + + /** + * Gets {@code subject} from amqp message properties. + * + * @return the {@code subject} value. + */ + public String getSubject() { + return subject; + } + + /** + * Sets the given {@code subject} value on {@link AmqpMessageProperties} object. + * + * @param subject to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setSubject(String subject) { + this.subject = subject; + return this; + } + + /** + * Gets {@code to} from amqp message properties. + * + * @return the {@code to} value. + */ + public String getTo() { + return to; + } + + /** + * Sets the given {@code to} value on {@link AmqpMessageProperties} object. + * + * @param to to be set. + * + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setTo(String to) { + this.to = to; + return this; + } + + /** + * Gets {@code userId} from amqp message properties. + * + * @return the {@code userId} value. + */ + public byte[] getUserId() { + return userId != null ? Arrays.copyOf(userId, userId.length) : new byte[0]; + } + + /** + * Sets the given {@code userId} value on {@link AmqpMessageProperties} object. + * + * @param userId to be set . + * @return updated {@link AmqpMessageProperties} object. + */ + public AmqpMessageProperties setUserId(byte[] userId) { + this.userId = userId != null ? Arrays.copyOf(userId, userId.length) : new byte[0]; + return this; + } + +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java new file mode 100644 index 0000000000000..218e1eed10a36 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/BinaryData.java @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import java.util.Arrays; +import java.util.Objects; + +/** + * Binary representation of amqp message body. + */ +public final class BinaryData { + private final byte[] data; + + /** + * Create {@link BinaryData} instance with given byte array data. + * + * @param data to use. + */ + public BinaryData(byte[] data) { + Objects.requireNonNull(data, "'data' cannot be null."); + this.data = Arrays.copyOf(data, data.length); + } + + /** + * Gets the data. + * + * @return byte array representing {@link BinaryData}. + */ + + public byte[] getData() { + return Arrays.copyOf(data, data.length); + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java new file mode 100644 index 0000000000000..49c5ed1490ff0 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/models/package-info.java @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Package containing classes related to AMQP models classes. + */ +package com.azure.core.amqp.models; diff --git a/sdk/core/azure-core-amqp/src/main/java/module-info.java b/sdk/core/azure-core-amqp/src/main/java/module-info.java index 77497a6a08af5..3edfcafb02114 100644 --- a/sdk/core/azure-core-amqp/src/main/java/module-info.java +++ b/sdk/core/azure-core-amqp/src/main/java/module-info.java @@ -9,6 +9,7 @@ requires transitive org.apache.qpid.proton.j; exports com.azure.core.amqp; + exports com.azure.core.amqp.models; exports com.azure.core.amqp.exception; // FIXME this should not be a long-term solution diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java new file mode 100644 index 0000000000000..aea81dc2a7fa2 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpAnnotatedMessageTest.java @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import com.azure.core.util.logging.ClientLogger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; + +/** + * Test class for {@link AmqpAnnotatedMessage} + */ +public class AmqpAnnotatedMessageTest { + + private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8); + private static final BinaryData DATA_BYTES = new BinaryData(CONTENTS_BYTES); + private final ClientLogger logger = new ClientLogger(AmqpAnnotatedMessageTest.class); + + /** + * Verifies we correctly set values via copy constructor for {@link AmqpAnnotatedMessage} and create new + * instances of the properties. + */ + @Test + public void copyConstructorTest() { + // Arrange + final int expectedBinaryDataSize = 1; + List expectedBinaryData = new ArrayList<>(); + expectedBinaryData.add(DATA_BYTES); + + final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); + final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody); + final Map expectedMessageAnnotations = expected.getMessageAnnotations(); + expectedMessageAnnotations.put("ma-1", "ma-value1"); + + final Map expectedDeliveryAnnotations = expected.getDeliveryAnnotations(); + expectedDeliveryAnnotations.put("da-1", "da-value1"); + + final Map expectedApplicationProperties = expected.getApplicationProperties(); + expectedApplicationProperties.put("ap-1", "ap-value1"); + + final Map expectedFooter = expected.getFooter(); + expectedFooter.put("foo-1", "foo-value1"); + + final AmqpMessageProperties expectedMessageProperties = expected.getProperties(); + expectedMessageProperties.setGroupSequence(2L); + expectedMessageProperties.setContentEncoding("content-enc"); + expectedMessageProperties.setReplyToGroupId("a"); + expectedMessageProperties.setReplyTo("b"); + expectedMessageProperties.setCorrelationId("c"); + expectedMessageProperties.setSubject("d"); + expectedMessageProperties.setMessageId("id"); + + final AmqpMessageHeader expectedMessageHeader = expected.getHeader(); + expectedMessageHeader.setDeliveryCount(5L); + expectedMessageHeader.setTimeToLive(Duration.ofSeconds(20)); + expectedMessageHeader.setPriority(Short.valueOf("4")); + + final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected); + + // Act + // Now update the values after we have created AmqpAnnotatedMessage using copy constructor. + expectedDeliveryAnnotations.remove("da-1"); + expectedApplicationProperties.put("ap-2", "ap-value2"); + expectedFooter.remove("foo-1"); + expected.getHeader().setDeliveryCount(Long.valueOf(100)); + expectedBinaryData = new ArrayList<>(); + + // Assert + // Ensure the memory references are not same. + assertNotSame(expected.getProperties(), actual.getProperties()); + assertNotSame(expected.getApplicationProperties(), actual.getApplicationProperties()); + assertNotSame(expected.getDeliveryAnnotations(), actual.getDeliveryAnnotations()); + assertNotSame(expected.getFooter(), actual.getFooter()); + assertNotSame(expected.getHeader(), actual.getHeader()); + assertNotSame(expected.getMessageAnnotations(), actual.getMessageAnnotations()); + assertNotSame(expected.getProperties().getUserId(), actual.getProperties().getUserId()); + assertNotSame(expected.getHeader().getDeliveryCount(), actual.getHeader().getDeliveryCount()); + + assertEquals(1, actual.getDeliveryAnnotations().size()); + assertEquals(1, actual.getApplicationProperties().size()); + assertEquals(1, actual.getFooter().size()); + + assertEquals(expectedMessageProperties.getGroupSequence(), actual.getProperties().getGroupSequence()); + assertEquals(expectedMessageProperties.getContentEncoding(), actual.getProperties().getContentEncoding()); + assertEquals(expectedMessageProperties.getReplyToGroupId(), actual.getProperties().getReplyToGroupId()); + assertEquals(expectedMessageProperties.getReplyTo(), actual.getProperties().getReplyTo()); + assertEquals(expectedMessageProperties.getCorrelationId(), actual.getProperties().getCorrelationId()); + assertEquals(expectedMessageProperties.getSubject(), actual.getProperties().getSubject()); + assertEquals(expectedMessageProperties.getMessageId(), actual.getProperties().getMessageId()); + + assertEquals(expectedMessageHeader.getTimeToLive(), actual.getHeader().getTimeToLive()); + assertEquals(expectedMessageHeader.getPriority(), actual.getHeader().getPriority()); + + assertMessageBody(expectedBinaryDataSize, CONTENTS_BYTES, actual); + } + + /** + * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. + */ + @Test + public void constructorValidValues() { + // Arrange + final List expectedBinaryData = Collections.singletonList(DATA_BYTES); + final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); + + // Act + final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(amqpDataBody); + + // Assert + assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual); + } + + /** + * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. + */ + @Test + public void constructorAmqpValidValues() { + // Arrange + final List expectedBinaryData = Collections.singletonList(DATA_BYTES); + final AmqpDataBody amqpDataBody = new AmqpDataBody(expectedBinaryData); + final AmqpAnnotatedMessage expected = new AmqpAnnotatedMessage(amqpDataBody); + + // Act + final AmqpAnnotatedMessage actual = new AmqpAnnotatedMessage(expected); + + // Assert + assertMessageCreation(AmqpBodyType.DATA, expectedBinaryData.size(), actual); + } + + /** + * Verifies {@link AmqpAnnotatedMessage} constructor for null values. + */ + @Test + public void constructorNullValidValues() { + // Arrange + final AmqpDataBody body = null; + + // Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> new AmqpAnnotatedMessage(body)); + } + + private void assertMessageCreation(AmqpBodyType expectedType, int expectedMessageSize, AmqpAnnotatedMessage actual) { + assertEquals(expectedType, actual.getBody().getBodyType()); + assertNotNull(actual.getProperties()); + assertNotNull(actual.getHeader()); + assertNotNull(actual.getFooter()); + assertNotNull(actual.getApplicationProperties()); + assertNotNull(actual.getDeliveryAnnotations()); + assertNotNull(actual.getMessageAnnotations()); + assertNotNull(actual.getApplicationProperties()); + + // Validate Message Body + assertNotNull(actual.getBody()); + assertMessageBody(expectedMessageSize, CONTENTS_BYTES, actual); + } + + private void assertMessageBody(int expectedMessageSize, byte[] expectedbody, AmqpAnnotatedMessage actual) { + final AmqpBodyType actualType = actual.getBody().getBodyType(); + switch (actualType) { + case DATA: + List actualData = ((AmqpDataBody) actual.getBody()).getData().stream().collect(Collectors.toList()); + assertEquals(expectedMessageSize, actualData.size()); + assertArrayEquals(expectedbody, actualData.get(0).getData()); + break; + case VALUE: + case SEQUENCE: + throw logger.logExceptionAsError(new UnsupportedOperationException("type not supported yet :" + actualType)); + default: + throw logger.logExceptionAsError(new IllegalStateException("Invalid type :" + actualType)); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java new file mode 100644 index 0000000000000..1b9b6ecd752f4 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/AmqpDataBodyTest.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test for {@link AmqpDataBody}. + */ +public class AmqpDataBodyTest { + + /** + * Verifies we correctly set values via constructor for {@link AmqpAnnotatedMessage}. + */ + @Test + public void constructorValidValues() { + // Arrange + final List expectedDataList = new ArrayList<>(); + expectedDataList.add(new BinaryData("some data 1".getBytes())); + expectedDataList.add(new BinaryData("some data 2".getBytes())); + + // Act + final AmqpDataBody actual = new AmqpDataBody(expectedDataList); + + // Assert + assertEquals(AmqpBodyType.DATA, actual.getBodyType()); + + // Validate Message Body + final List dataList = actual.getData().stream().collect(Collectors.toList()); + assertEquals(expectedDataList.size(), dataList.size()); + assertArrayEquals(expectedDataList.toArray(), dataList.toArray()); + } + + /** + * Verifies {@link BinaryData} constructor for null values. + */ + @Test + public void constructorNullValidValues() { + // Arrange + final List listBinaryData = null; + + // Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> new AmqpDataBody(listBinaryData)); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java new file mode 100644 index 0000000000000..ba52eb9c996c6 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/models/BinaryDataTest.java @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.models; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/** + * Test for {@link BinaryData}. + */ +public class BinaryDataTest { + + private static final byte[] CONTENTS_BYTES = "Some-contents".getBytes(StandardCharsets.UTF_8); + + /** + * Verifies we correctly set values via constructor for {@link BinaryData}. + */ + @Test + public void constructorValidValues() { + // Arrange & Act + final BinaryData actual = new BinaryData(CONTENTS_BYTES); + + // Assert + assertArrayEquals(CONTENTS_BYTES, actual.getData()); + } + + /** + * Verifies {@link BinaryData} constructor for null valeus. + */ + @Test + public void constructorNullValidValues() { + // Arrange, Act & Assert + Assertions.assertThrows(NullPointerException.class, () -> new BinaryData(null)); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index c6e7114bad31a..609ebe8b9d750 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -1,11 +1,13 @@ # Release History ## 7.0.0-beta.6 (Unreleased) +- Exposing Amqp Message envelope in form of `AmqpAnnotatedMessage` as a property of `ServiceBusReceivedMessage` and `ServiceBusMessage`. - Removed `ServiceBusReceiverClientBuilder.maxAutoLockRenewalDuration`. Use method `getAutoRenewMessageLock` of classes `ServiceBusReceiverClient` and `ServiceBusReceiverAsyncClient` to lock messages and sessions. - Updated datetime related APIs to use `java.time.OffsetDateTime` instead of `java.time.Instant`. - Removed `scheduledMessageCount` from `SubscriptionRuntimeInfo` and added it to `TopicRuntimeInfo`. - Changed `QueueRuntimeInfo`, `TopicRuntimeInfo` and `SubscriptionRuntimeInfo` to `QueueRuntimeProperties`, `TopicRuntimeProperties` and `SubscriptionRuntimeProperties` respectively. + ## 7.0.0-beta.5 (2020-08-11) - Remove public constructor for QueueDescription, TopicDescription, SubscriptionDescription. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java index aa54261b7dbd4..1b64b77a28978 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java @@ -3,16 +3,35 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.AmqpMessageConstant; +import com.azure.core.amqp.models.AmqpAnnotatedMessage; +import com.azure.core.amqp.models.AmqpBodyType; +import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.BinaryData; import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; +import java.util.Date; import java.util.Map; import java.util.Objects; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME; + /** * The data structure encapsulating the message being sent-to Service Bus. * @@ -22,7 +41,7 @@ * AMQP 1.0 specification * *
    - *
  1. {@link #getProperties()} - AMQPMessage.ApplicationProperties section
  2. + *
  3. {@link #getApplicationProperties()} - AMQPMessage.ApplicationProperties section
  4. *
  5. {@link #getBody()} - if AMQPMessage.Body has Data section
  6. *
* @@ -34,21 +53,11 @@ * @see ServiceBusMessageBatch */ public class ServiceBusMessage { - private final Map properties = new HashMap<>(); - private final byte[] body; + private final AmqpAnnotatedMessage amqpAnnotatedMessage; + private final ClientLogger logger = new ClientLogger(ServiceBusMessage.class); + + private final byte[] binaryData; private Context context; - private String contentType; - private String correlationId; - private String label; - private String messageId; - private String partitionKey; - private String replyTo; - private String replyToSessionId; - private OffsetDateTime scheduledEnqueueTime; - private String sessionId; - private Duration timeToLive; - private String to; - private String viaPartitionKey; /** * Creates a {@link ServiceBusMessage} with a {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} encoded body. @@ -69,8 +78,10 @@ public ServiceBusMessage(String body) { * @throws NullPointerException if {@code body} is {@code null}. */ public ServiceBusMessage(byte[] body) { - this.body = Objects.requireNonNull(body, "'body' cannot be null."); + this.binaryData = Objects.requireNonNull(body, "'body' cannot be null."); this.context = Context.NONE; + this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList( + new BinaryData(binaryData)))); } /** @@ -82,32 +93,43 @@ public ServiceBusMessage(byte[] body) { * @throws NullPointerException if {@code receivedMessage} is {@code null}. */ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) { - this.body = receivedMessage.getBody(); + Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null."); + + this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(receivedMessage.getAmqpAnnotatedMessage()); this.context = Context.NONE; - setMessageId(receivedMessage.getMessageId()); - setScheduledEnqueueTime(receivedMessage.getScheduledEnqueueTime()); - setContentType(receivedMessage.getContentType()); - setCorrelationId(receivedMessage.getCorrelationId()); - setLabel(receivedMessage.getLabel()); - setPartitionKey(receivedMessage.getPartitionKey()); - setReplyTo(receivedMessage.getReplyTo()); - setReplyToSessionId(receivedMessage.getReplyToSessionId()); - setTimeToLive(receivedMessage.getTimeToLive()); - setTo(receivedMessage.getTo()); - setSessionId(receivedMessage.getSessionId()); - setViaPartitionKey(receivedMessage.getViaPartitionKey()); + this.binaryData = receivedMessage.getBody(); + + // clean up data which user is not allowed to set. + amqpAnnotatedMessage.getHeader().setDeliveryCount(null); + + removeValues(amqpAnnotatedMessage.getMessageAnnotations(), LOCKED_UNTIL_KEY_ANNOTATION_NAME, + SEQUENCE_NUMBER_ANNOTATION_NAME, DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME, + ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME, ENQUEUED_TIME_UTC_ANNOTATION_NAME); + + removeValues(amqpAnnotatedMessage.getApplicationProperties(), DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME, + DEAD_LETTER_REASON_ANNOTATION_NAME); + + } + + /** + * Gets the {@link AmqpAnnotatedMessage}. + * + * @return the amqp message. + */ + public AmqpAnnotatedMessage getAmqpAnnotatedMessage() { + return amqpAnnotatedMessage; } /** * Gets the set of free-form {@link ServiceBusMessage} properties which may be used for passing metadata associated - * with the {@link ServiceBusMessage} during Service Bus operations. A common use-case for {@code properties()} is - * to associate serialization hints for the {@link #getBody()} as an aid to consumers who wish to deserialize the - * binary data. + * with the {@link ServiceBusMessage} during Service Bus operations. A common use-case for + * {@code getApplicationProperties()} is to associate serialization hints for the {@link #getBody()} as an aid to + * consumers who wish to deserialize the binary data. * * @return Application properties associated with this {@link ServiceBusMessage}. */ - public Map getProperties() { - return properties; + public Map getApplicationProperties() { + return amqpAnnotatedMessage.getApplicationProperties(); } /** @@ -115,14 +137,25 @@ public Map getProperties() { * *

* If the means for deserializing the raw data is not apparent to consumers, a common technique is to make use of - * {@link #getProperties()} when creating the event, to associate serialization hints as an aid to consumers who - * wish to deserialize the binary data. + * {@link #getApplicationProperties()} when creating the event, to associate serialization hints as an aid to + * consumers who wish to deserialize the binary data. *

* * @return A byte array representing the data. */ public byte[] getBody() { - return Arrays.copyOf(body, body.length); + final AmqpBodyType type = amqpAnnotatedMessage.getBody().getBodyType(); + switch (type) { + case DATA: + return Arrays.copyOf(binaryData, binaryData.length); + case SEQUENCE: + case VALUE: + throw logger.logExceptionAsError(new UnsupportedOperationException("Not supported AmqpBodyType: " + + type.toString())); + default: + throw logger.logExceptionAsError(new IllegalArgumentException("Unknown AmqpBodyType: " + + type.toString())); + } } /** @@ -131,7 +164,7 @@ public byte[] getBody() { * @return the contentType of the {@link ServiceBusMessage}. */ public String getContentType() { - return contentType; + return amqpAnnotatedMessage.getProperties().getContentType(); } /** @@ -142,7 +175,7 @@ public String getContentType() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setContentType(String contentType) { - this.contentType = contentType; + amqpAnnotatedMessage.getProperties().setContentType(contentType); return this; } @@ -158,7 +191,7 @@ public ServiceBusMessage setContentType(String contentType) { * Routing and Correlation */ public String getCorrelationId() { - return correlationId; + return amqpAnnotatedMessage.getProperties().getCorrelationId(); } /** @@ -170,28 +203,28 @@ public String getCorrelationId() { * @see #getCorrelationId() */ public ServiceBusMessage setCorrelationId(String correlationId) { - this.correlationId = correlationId; + amqpAnnotatedMessage.getProperties().setCorrelationId(correlationId); return this; } /** - * Gets the label for the message. + * Gets the subject for the message. * - * @return The label for the message. + * @return The subject for the message. */ - public String getLabel() { - return label; + public String getSubject() { + return amqpAnnotatedMessage.getProperties().getSubject(); } /** - * Sets the label for the message. + * Sets the subject for the message. * - * @param label The label to set. + * @param subject The subject to set. * * @return The updated {@link ServiceBusMessage} object. */ - public ServiceBusMessage setLabel(String label) { - this.label = label; + public ServiceBusMessage setSubject(String subject) { + amqpAnnotatedMessage.getProperties().setSubject(subject); return this; } @@ -199,7 +232,7 @@ public ServiceBusMessage setLabel(String label) { * @return Id of the {@link ServiceBusMessage}. */ public String getMessageId() { - return messageId; + return amqpAnnotatedMessage.getProperties().getMessageId(); } /** @@ -210,7 +243,7 @@ public String getMessageId() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setMessageId(String messageId) { - this.messageId = messageId; + amqpAnnotatedMessage.getProperties().setMessageId(messageId); return this; } @@ -228,7 +261,7 @@ public ServiceBusMessage setMessageId(String messageId) { * entities */ public String getPartitionKey() { - return partitionKey; + return (String) amqpAnnotatedMessage.getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue()); } /** @@ -240,7 +273,7 @@ public String getPartitionKey() { * @see #getPartitionKey() */ public ServiceBusMessage setPartitionKey(String partitionKey) { - this.partitionKey = partitionKey; + amqpAnnotatedMessage.getMessageAnnotations().put(PARTITION_KEY_ANNOTATION_NAME.getValue(), partitionKey); return this; } @@ -256,7 +289,7 @@ public ServiceBusMessage setPartitionKey(String partitionKey) { * Routing and Correlation */ public String getReplyTo() { - return replyTo; + return amqpAnnotatedMessage.getProperties().getReplyTo(); } /** @@ -268,7 +301,7 @@ public String getReplyTo() { * @see #getReplyTo() */ public ServiceBusMessage setReplyTo(String replyTo) { - this.replyTo = replyTo; + amqpAnnotatedMessage.getProperties().setReplyTo(replyTo); return this; } @@ -278,7 +311,7 @@ public ServiceBusMessage setReplyTo(String replyTo) { * @return "To" property value of this message */ public String getTo() { - return to; + return amqpAnnotatedMessage.getProperties().getTo(); } /** @@ -294,7 +327,7 @@ public String getTo() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setTo(String to) { - this.to = to; + amqpAnnotatedMessage.getProperties().setTo(to); return this; } @@ -311,7 +344,7 @@ public ServiceBusMessage setTo(String to) { * @see Message Expiration */ public Duration getTimeToLive() { - return timeToLive; + return amqpAnnotatedMessage.getHeader().getTimeToLive(); } /** @@ -323,7 +356,7 @@ public Duration getTimeToLive() { * @see #getTimeToLive() */ public ServiceBusMessage setTimeToLive(Duration timeToLive) { - this.timeToLive = timeToLive; + amqpAnnotatedMessage.getHeader().setTimeToLive(timeToLive); return this; } @@ -341,11 +374,16 @@ public ServiceBusMessage setTimeToLive(Duration timeToLive) { * Timestamps */ public OffsetDateTime getScheduledEnqueueTime() { - return scheduledEnqueueTime; + Object value = amqpAnnotatedMessage.getMessageAnnotations().get(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue()); + return value != null + ? ((Date) value).toInstant().atOffset(ZoneOffset.UTC) + : null; } /** - * Sets the scheduled enqueue time of this message. + * Sets the scheduled enqueue time of this message. A {@code null} will not be set. If this value needs to be unset + * it could be done by value removing from {@link AmqpAnnotatedMessage#getMessageAnnotations()} using key + * {@link AmqpMessageConstant#SCHEDULED_ENQUEUE_UTC_TIME_NAME}. * * @param scheduledEnqueueTime the datetime at which this message should be enqueued in Azure Service Bus. * @@ -353,7 +391,10 @@ public OffsetDateTime getScheduledEnqueueTime() { * @see #getScheduledEnqueueTime() */ public ServiceBusMessage setScheduledEnqueueTime(OffsetDateTime scheduledEnqueueTime) { - this.scheduledEnqueueTime = scheduledEnqueueTime; + if (scheduledEnqueueTime != null) { + amqpAnnotatedMessage.getMessageAnnotations().put(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue(), + scheduledEnqueueTime); + } return this; } @@ -368,7 +409,7 @@ public ServiceBusMessage setScheduledEnqueueTime(OffsetDateTime scheduledEnqueue * Routing and Correlation */ public String getReplyToSessionId() { - return replyToSessionId; + return amqpAnnotatedMessage.getProperties().getReplyToGroupId(); } /** @@ -379,7 +420,7 @@ public String getReplyToSessionId() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setReplyToSessionId(String replyToSessionId) { - this.replyToSessionId = replyToSessionId; + amqpAnnotatedMessage.getProperties().setReplyToGroupId(replyToSessionId); return this; } @@ -395,7 +436,7 @@ public ServiceBusMessage setReplyToSessionId(String replyToSessionId) { * and Send Via */ public String getViaPartitionKey() { - return viaPartitionKey; + return (String) amqpAnnotatedMessage.getMessageAnnotations().get(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue()); } /** @@ -407,7 +448,7 @@ public String getViaPartitionKey() { * @see #getViaPartitionKey() */ public ServiceBusMessage setViaPartitionKey(String viaPartitionKey) { - this.viaPartitionKey = viaPartitionKey; + amqpAnnotatedMessage.getMessageAnnotations().put(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue(), viaPartitionKey); return this; } @@ -417,7 +458,7 @@ public ServiceBusMessage setViaPartitionKey(String viaPartitionKey) { * @return Session Id of the {@link ServiceBusMessage}. */ public String getSessionId() { - return sessionId; + return amqpAnnotatedMessage.getProperties().getGroupId(); } /** @@ -428,7 +469,7 @@ public String getSessionId() { * @return The updated {@link ServiceBusMessage}. */ public ServiceBusMessage setSessionId(String sessionId) { - this.sessionId = sessionId; + amqpAnnotatedMessage.getProperties().setGroupId(sessionId); return this; } @@ -458,4 +499,13 @@ public ServiceBusMessage addContext(String key, Object value) { return this; } + + /* + * Gets value from given map. + */ + private void removeValues(Map dataMap, AmqpMessageConstant... keys) { + for (AmqpMessageConstant key : keys) { + dataMap.remove(key.getValue()); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java index d4c94b1dcb775..57d5f363fc8da 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java @@ -144,7 +144,8 @@ private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) Context eventSpanContext = tracerProvider.startSpan(serviceBusMessage.getContext(), ProcessKind.MESSAGE); Optional eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY); if (eventDiagnosticIdOptional.isPresent()) { - serviceBusMessage.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString()); + serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get() + .toString()); tracerProvider.endSpan(eventSpanContext, Signal.complete()); serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java index 9e06ac82c6748..ad2aab025c63c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java @@ -3,9 +3,15 @@ package com.azure.messaging.servicebus; +import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseUtils; +import com.azure.core.amqp.models.AmqpAnnotatedMessage; +import com.azure.core.amqp.models.AmqpMessageHeader; +import com.azure.core.amqp.models.AmqpMessageProperties; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.ManagementConstants; import com.azure.messaging.servicebus.implementation.MessageWithLockToken; @@ -24,6 +30,8 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; @@ -53,18 +61,6 @@ */ class ServiceBusMessageSerializer implements MessageSerializer { private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - private static final String ENQUEUED_TIME_UTC_NAME = "x-opt-enqueued-time"; - private static final String SCHEDULED_ENQUEUE_TIME_NAME = "x-opt-scheduled-enqueue-time"; - private static final String SEQUENCE_NUMBER_NAME = "x-opt-sequence-number"; - private static final String LOCKED_UNTIL_NAME = "x-opt-locked-until"; - private static final String PARTITION_KEY_NAME = "x-opt-partition-key"; - private static final String VIA_PARTITION_KEY_NAME = "x-opt-via-partition-key"; - private static final String DEAD_LETTER_SOURCE_NAME = "x-opt-deadletter-source"; - private static final String DEAD_LETTER_DESCRIPTION = "DeadLetterErrorDescription"; - private static final String DEAD_LETTER_REASON = "DeadLetterReason"; - - // This one appears to always be 0, but is always returned with each message. - private static final String ENQUEUED_SEQUENCE_NUMBER = "x-opt-enqueue-sequence-number"; private final ClientLogger logger = new ClientLogger(ServiceBusMessageSerializer.class); @@ -132,8 +128,8 @@ public Message serialize(T object) { //TODO (conniey): support AMQP sequence and AMQP value. amqpMessage.setBody(new Data(new Binary(body))); - if (brokeredMessage.getProperties() != null) { - amqpMessage.setApplicationProperties(new ApplicationProperties(brokeredMessage.getProperties())); + if (brokeredMessage.getApplicationProperties() != null) { + amqpMessage.setApplicationProperties(new ApplicationProperties(brokeredMessage.getApplicationProperties())); } if (brokeredMessage.getTimeToLive() != null) { @@ -143,34 +139,82 @@ public Message serialize(T object) { if (amqpMessage.getProperties() == null) { amqpMessage.setProperties(new Properties()); } - amqpMessage.setMessageId(brokeredMessage.getMessageId()); amqpMessage.setContentType(brokeredMessage.getContentType()); amqpMessage.setCorrelationId(brokeredMessage.getCorrelationId()); - amqpMessage.setSubject(brokeredMessage.getLabel()); - amqpMessage.getProperties().setTo(brokeredMessage.getTo()); + amqpMessage.setSubject(brokeredMessage.getSubject()); amqpMessage.setReplyTo(brokeredMessage.getReplyTo()); amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId()); amqpMessage.setGroupId(brokeredMessage.getSessionId()); + final AmqpMessageProperties brokeredProperties = brokeredMessage.getAmqpAnnotatedMessage().getProperties(); + + amqpMessage.setContentEncoding(brokeredProperties.getContentEncoding()); + if (brokeredProperties.getGroupSequence() != null) { + amqpMessage.setGroupSequence(brokeredProperties.getGroupSequence()); + } + amqpMessage.getProperties().setTo(brokeredMessage.getTo()); + amqpMessage.getProperties().setUserId(new Binary(brokeredProperties.getUserId())); + + if (brokeredProperties.getAbsoluteExpiryTime() != null) { + amqpMessage.getProperties().setAbsoluteExpiryTime(Date.from(brokeredProperties.getAbsoluteExpiryTime() + .toInstant())); + } + if (brokeredProperties.getCreationTime() != null) { + amqpMessage.getProperties().setCreationTime(Date.from(brokeredProperties.getCreationTime().toInstant())); + } + + //set footer + amqpMessage.setFooter(new Footer(brokeredMessage.getAmqpAnnotatedMessage().getFooter())); + + //set header + AmqpMessageHeader header = brokeredMessage.getAmqpAnnotatedMessage().getHeader(); + if (header.getDeliveryCount() != null) { + amqpMessage.setDeliveryCount(header.getDeliveryCount()); + } + if (header.getPriority() != null) { + amqpMessage.setPriority(header.getPriority()); + } + if (header.isDurable() != null) { + amqpMessage.setDurable(header.isDurable()); + } + if (header.isFirstAcquirer() != null) { + amqpMessage.setFirstAcquirer(header.isFirstAcquirer()); + } + if (header.getTimeToLive() != null) { + amqpMessage.setTtl(header.getTimeToLive().toMillis()); + } + final Map messageAnnotationsMap = new HashMap<>(); if (brokeredMessage.getScheduledEnqueueTime() != null) { - messageAnnotationsMap.put(Symbol.valueOf(SCHEDULED_ENQUEUE_TIME_NAME), + messageAnnotationsMap.put(Symbol.valueOf(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue()), Date.from(brokeredMessage.getScheduledEnqueueTime().toInstant())); } final String partitionKey = brokeredMessage.getPartitionKey(); if (partitionKey != null && !partitionKey.isEmpty()) { - messageAnnotationsMap.put(Symbol.valueOf(PARTITION_KEY_NAME), brokeredMessage.getPartitionKey()); + messageAnnotationsMap.put(Symbol.valueOf(PARTITION_KEY_ANNOTATION_NAME.getValue()), + brokeredMessage.getPartitionKey()); } final String viaPartitionKey = brokeredMessage.getViaPartitionKey(); if (viaPartitionKey != null && !viaPartitionKey.isEmpty()) { - messageAnnotationsMap.put(Symbol.valueOf(VIA_PARTITION_KEY_NAME), viaPartitionKey); + messageAnnotationsMap.put(Symbol.valueOf(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue()), viaPartitionKey); } amqpMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); + // Set Delivery Annotations. + final Map deliveryAnnotationsMap = new HashMap<>(); + + final Map deliveryAnnotations = brokeredMessage.getAmqpAnnotatedMessage() + .getDeliveryAnnotations(); + for (Map.Entry deliveryEntry : deliveryAnnotations.entrySet()) { + deliveryAnnotationsMap.put(Symbol.valueOf(deliveryEntry.getKey()), deliveryEntry.getValue()); + } + + amqpMessage.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); + return amqpMessage; } @@ -317,106 +361,92 @@ private List deserializeListOfMessages(Message amqpMe } private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) { - final ServiceBusReceivedMessage brokeredMessage; + final byte[] bytes; final Section body = amqpMessage.getBody(); if (body != null) { //TODO (conniey): Support other AMQP types like AmqpValue and AmqpSequence. if (body instanceof Data) { final Binary messageData = ((Data) body).getValue(); - final byte[] bytes = messageData.getArray(); - brokeredMessage = new ServiceBusReceivedMessage(bytes); + bytes = messageData.getArray(); } else { logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, body.getType())); - brokeredMessage = new ServiceBusReceivedMessage(EMPTY_BYTE_ARRAY); + bytes = EMPTY_BYTE_ARRAY; } } else { logger.warning(String.format(Messages.MESSAGE_NOT_OF_TYPE, "null")); - brokeredMessage = new ServiceBusReceivedMessage(EMPTY_BYTE_ARRAY); + bytes = EMPTY_BYTE_ARRAY; } + final ServiceBusReceivedMessage brokeredMessage = new ServiceBusReceivedMessage(bytes); + AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getAmqpAnnotatedMessage(); // Application properties ApplicationProperties applicationProperties = amqpMessage.getApplicationProperties(); if (applicationProperties != null) { final Map propertiesValue = applicationProperties.getValue(); - brokeredMessage.getProperties().putAll(propertiesValue); - - if (propertiesValue.containsKey(DEAD_LETTER_REASON)) { - brokeredMessage.setDeadLetterReason(String.valueOf(propertiesValue.get(DEAD_LETTER_REASON))); - } - if (propertiesValue.containsKey(DEAD_LETTER_DESCRIPTION)) { - brokeredMessage.setDeadLetterErrorDescription(String.valueOf( - propertiesValue.get(DEAD_LETTER_DESCRIPTION))); - } + brokeredAmqpAnnotatedMessage.getApplicationProperties().putAll(propertiesValue); } // Header - brokeredMessage.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl())); - brokeredMessage.setDeliveryCount(amqpMessage.getDeliveryCount()); + final AmqpMessageHeader brokeredHeader = brokeredAmqpAnnotatedMessage.getHeader(); + brokeredHeader.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl())); + brokeredHeader.setDeliveryCount(amqpMessage.getDeliveryCount()); + brokeredHeader.setDurable(amqpMessage.getHeader().getDurable()); + brokeredHeader.setFirstAcquirer(amqpMessage.getHeader().getFirstAcquirer()); + brokeredHeader.setPriority(amqpMessage.getPriority()); + + // Footer + final Footer footer = amqpMessage.getFooter(); + if (footer != null && footer.getValue() != null) { + @SuppressWarnings("unchecked") final Map footerValue = footer.getValue(); + setValues(footerValue, brokeredAmqpAnnotatedMessage.getFooter()); + + } // Properties + final AmqpMessageProperties brokeredProperties = brokeredAmqpAnnotatedMessage.getProperties(); + brokeredProperties.setReplyToGroupId(amqpMessage.getReplyToGroupId()); + brokeredProperties.setReplyTo(amqpMessage.getReplyTo()); final Object messageId = amqpMessage.getMessageId(); if (messageId != null) { - brokeredMessage.setMessageId(messageId.toString()); + brokeredProperties.setMessageId(messageId.toString()); } - brokeredMessage.setContentType(amqpMessage.getContentType()); + brokeredProperties.setContentType(amqpMessage.getContentType()); final Object correlationId = amqpMessage.getCorrelationId(); if (correlationId != null) { - brokeredMessage.setCorrelationId(correlationId.toString()); + brokeredProperties.setCorrelationId(correlationId.toString()); } - final Properties properties = amqpMessage.getProperties(); - if (properties != null) { - brokeredMessage.setTo(properties.getTo()); + final Properties amqpProperties = amqpMessage.getProperties(); + if (amqpProperties != null) { + brokeredProperties.setTo(amqpProperties.getTo()); + + if (amqpProperties.getAbsoluteExpiryTime() != null) { + brokeredProperties.setAbsoluteExpiryTime(amqpProperties.getAbsoluteExpiryTime().toInstant() + .atOffset(ZoneOffset.UTC)); + } + if (amqpProperties.getCreationTime() != null) { + brokeredProperties.setCreationTime(amqpProperties.getCreationTime().toInstant() + .atOffset(ZoneOffset.UTC)); + } } - brokeredMessage.setLabel(amqpMessage.getSubject()); - brokeredMessage.setReplyTo(amqpMessage.getReplyTo()); - brokeredMessage.setReplyToSessionId(amqpMessage.getReplyToGroupId()); - brokeredMessage.setSessionId(amqpMessage.getGroupId()); + brokeredProperties.setSubject(amqpMessage.getSubject()); + brokeredProperties.setGroupId(amqpMessage.getGroupId()); + brokeredProperties.setContentEncoding(amqpMessage.getContentEncoding()); + brokeredProperties.setGroupSequence(amqpMessage.getGroupSequence()); + brokeredProperties.setUserId(amqpMessage.getUserId()); + + // DeliveryAnnotations + final DeliveryAnnotations deliveryAnnotations = amqpMessage.getDeliveryAnnotations(); + if (deliveryAnnotations != null) { + setValues(deliveryAnnotations.getValue(), brokeredAmqpAnnotatedMessage.getDeliveryAnnotations()); + } // Message Annotations final MessageAnnotations messageAnnotations = amqpMessage.getMessageAnnotations(); if (messageAnnotations != null) { - Map messageAnnotationsMap = messageAnnotations.getValue(); - if (messageAnnotationsMap != null) { - for (Map.Entry entry : messageAnnotationsMap.entrySet()) { - final String key = entry.getKey().toString(); - final Object value = entry.getValue(); - - switch (key) { - case ENQUEUED_TIME_UTC_NAME: - brokeredMessage.setEnqueuedTime(((Date) value).toInstant().atOffset(ZoneOffset.UTC)); - - break; - case SCHEDULED_ENQUEUE_TIME_NAME: - brokeredMessage.setScheduledEnqueueTime(((Date) value).toInstant() - .atOffset(ZoneOffset.UTC)); - break; - case SEQUENCE_NUMBER_NAME: - brokeredMessage.setSequenceNumber((long) value); - break; - case LOCKED_UNTIL_NAME: - brokeredMessage.setLockedUntil(((Date) value).toInstant().atOffset(ZoneOffset.UTC)); - break; - case PARTITION_KEY_NAME: - brokeredMessage.setPartitionKey((String) value); - break; - case VIA_PARTITION_KEY_NAME: - brokeredMessage.setViaPartitionKey((String) value); - break; - case DEAD_LETTER_SOURCE_NAME: - brokeredMessage.setDeadLetterSource((String) value); - break; - case ENQUEUED_SEQUENCE_NUMBER: - brokeredMessage.setEnqueuedSequenceNumber((long) value); - break; - default: - logger.info("Unrecognised key: {}, value: {}", key, value); - break; - } - } - } + setValues(messageAnnotations.getValue(), brokeredAmqpAnnotatedMessage.getMessageAnnotations()); } if (amqpMessage instanceof MessageWithLockToken) { @@ -445,6 +475,14 @@ private static int getPayloadSize(Message msg) { } } + private void setValues(Map sourceMap, Map targetMap) { + if (sourceMap != null) { + for (Map.Entry entry : sourceMap.entrySet()) { + targetMap.put(entry.getKey().toString(), entry.getValue()); + } + } + } + @SuppressWarnings("rawtypes") private static int sizeof(Object obj) { if (obj == null) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index e709f04901e93..33373fe9d7596 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -3,12 +3,31 @@ package com.azure.messaging.servicebus; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME; + +import com.azure.core.amqp.AmqpMessageConstant; +import com.azure.core.amqp.models.AmqpAnnotatedMessage; +import com.azure.core.amqp.models.AmqpBodyType; +import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.BinaryData; +import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ReceiveMode; import java.time.Duration; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; +import java.util.Date; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -17,34 +36,28 @@ * This class represents a received message from Service Bus. */ public final class ServiceBusReceivedMessage { + private final ClientLogger logger = new ClientLogger(ServiceBusReceivedMessage.class); + + private final AmqpAnnotatedMessage amqpAnnotatedMessage; + private final byte[] binaryData; private UUID lockToken; - private long sequenceNumber; - private long enqueuedSequenceNumber; - private long deliveryCount; - private OffsetDateTime enqueuedTime; - private OffsetDateTime lockedUntil; - private String deadLetterSource; - - private final Map properties; - private final byte[] body; - private String contentType; - private String correlationId; - private String label; - private String messageId; - private String partitionKey; - private String replyTo; - private String replyToSessionId; - private OffsetDateTime scheduledEnqueueTime; - private String sessionId; - private Duration timeToLive; - private String to; - private String viaPartitionKey; - private String deadLetterReason; - private String deadLetterErrorDescription; + + /** + * The representation of message as defined by AMQP protocol. + * + * @see + * Amqp Message Format. + * + * @return the {@link AmqpAnnotatedMessage} representing amqp message. + */ + public AmqpAnnotatedMessage getAmqpAnnotatedMessage() { + return amqpAnnotatedMessage; + } ServiceBusReceivedMessage(byte[] body) { - this.body = Objects.requireNonNull(body, "'body' cannot be null."); - this.properties = new HashMap<>(); + binaryData = Objects.requireNonNull(body, "'body' cannot be null."); + amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList( + new BinaryData(binaryData)))); } /** @@ -52,14 +65,26 @@ public final class ServiceBusReceivedMessage { * *

* If the means for deserializing the raw data is not apparent to consumers, a common technique is to make use of - * {@link #getProperties()} when creating the event, to associate serialization hints as an aid to consumers who - * wish to deserialize the binary data. + * {@link #getApplicationProperties()} when creating the event, to associate serialization hints as an aid to + * consumers who wish to deserialize the binary data. *

* * @return A byte array representing the data. */ public byte[] getBody() { - return Arrays.copyOf(body, body.length); + final AmqpBodyType bodyType = amqpAnnotatedMessage.getBody().getBodyType(); + switch (bodyType) { + case DATA: + return Arrays.copyOf(binaryData, binaryData.length); + case SEQUENCE: + case VALUE: + throw logger.logExceptionAsError(new UnsupportedOperationException("Body type not supported yet " + + bodyType.toString())); + default: + logger.warning("Invalid body type {}.", bodyType); + throw logger.logExceptionAsError(new IllegalStateException("Body type not valid " + + bodyType.toString())); + } } /** @@ -68,7 +93,7 @@ public byte[] getBody() { * @return the contentType of the {@link ServiceBusReceivedMessage}. */ public String getContentType() { - return contentType; + return amqpAnnotatedMessage.getProperties().getContentType(); } /** @@ -84,7 +109,7 @@ public String getContentType() { * Routing and Correlation */ public String getCorrelationId() { - return correlationId; + return amqpAnnotatedMessage.getProperties().getCorrelationId(); } /** @@ -93,7 +118,8 @@ public String getCorrelationId() { * @return The description for a message that has been dead-lettered. */ public String getDeadLetterErrorDescription() { - return deadLetterErrorDescription; + return getStringValue(amqpAnnotatedMessage.getApplicationProperties(), + DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue()); } /** @@ -102,7 +128,8 @@ public String getDeadLetterErrorDescription() { * @return The reason for a message that has been dead-lettered. */ public String getDeadLetterReason() { - return deadLetterReason; + return getStringValue(amqpAnnotatedMessage.getApplicationProperties(), + DEAD_LETTER_REASON_ANNOTATION_NAME.getValue()); } /** @@ -119,7 +146,8 @@ public String getDeadLetterReason() { * queues */ public String getDeadLetterSource() { - return deadLetterSource; + return getStringValue(amqpAnnotatedMessage.getMessageAnnotations(), + DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue()); } /** @@ -134,7 +162,7 @@ public String getDeadLetterSource() { * transfers, locks, and settlement. */ public long getDeliveryCount() { - return deliveryCount; + return amqpAnnotatedMessage.getHeader().getDeliveryCount(); } /** @@ -149,7 +177,8 @@ public long getDeliveryCount() { * Timestamps */ public long getEnqueuedSequenceNumber() { - return this.enqueuedSequenceNumber; + return getLongValue(amqpAnnotatedMessage.getMessageAnnotations(), + ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()); } /** @@ -165,7 +194,8 @@ public long getEnqueuedSequenceNumber() { * Timestamps */ public OffsetDateTime getEnqueuedTime() { - return enqueuedTime; + return getOffsetDateTimeValue(amqpAnnotatedMessage.getMessageAnnotations(), + ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()); } /** @@ -182,6 +212,7 @@ public OffsetDateTime getEnqueuedTime() { */ public OffsetDateTime getExpiresAt() { final Duration timeToLive = getTimeToLive(); + final OffsetDateTime enqueuedTime = getEnqueuedTime(); return enqueuedTime != null && timeToLive != null ? enqueuedTime.plus(timeToLive) : null; @@ -193,7 +224,7 @@ public OffsetDateTime getExpiresAt() { * @return The label for the message. */ public String getLabel() { - return label; + return amqpAnnotatedMessage.getProperties().getSubject(); } /** @@ -232,14 +263,15 @@ public String getLockToken() { * transfers, locks, and settlement */ public OffsetDateTime getLockedUntil() { - return lockedUntil; + return getOffsetDateTimeValue(amqpAnnotatedMessage.getMessageAnnotations(), + LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue()); } /** * @return Id of the {@link ServiceBusReceivedMessage}. */ public String getMessageId() { - return messageId; + return amqpAnnotatedMessage.getProperties().getMessageId(); } /** @@ -257,7 +289,8 @@ public String getMessageId() { * entities */ public String getPartitionKey() { - return partitionKey; + return getStringValue(amqpAnnotatedMessage.getMessageAnnotations(), + PARTITION_KEY_ANNOTATION_NAME.getValue()); } /** @@ -268,8 +301,8 @@ public String getPartitionKey() { * * @return Application properties associated with this {@link ServiceBusReceivedMessage}. */ - public Map getProperties() { - return properties; + public Map getApplicationProperties() { + return amqpAnnotatedMessage.getApplicationProperties(); } /** @@ -285,7 +318,7 @@ public Map getProperties() { * Routing and Correlation */ public String getReplyTo() { - return replyTo; + return amqpAnnotatedMessage.getProperties().getReplyTo(); } /** @@ -300,7 +333,7 @@ public String getReplyTo() { * Routing and Correlation */ public String getReplyToSessionId() { - return replyToSessionId; + return amqpAnnotatedMessage.getProperties().getReplyToGroupId(); } /** @@ -318,7 +351,8 @@ public String getReplyToSessionId() { * Timestamps */ public OffsetDateTime getScheduledEnqueueTime() { - return scheduledEnqueueTime; + return getOffsetDateTimeValue(amqpAnnotatedMessage.getMessageAnnotations(), + SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue()); } /** @@ -335,7 +369,8 @@ public OffsetDateTime getScheduledEnqueueTime() { * Timestamps */ public long getSequenceNumber() { - return this.sequenceNumber; + return getLongValue(amqpAnnotatedMessage.getMessageAnnotations(), + SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()); } /** @@ -344,7 +379,7 @@ public long getSequenceNumber() { * @return Session Id of the {@link ServiceBusReceivedMessage}. */ public String getSessionId() { - return sessionId; + return amqpAnnotatedMessage.getProperties().getGroupId(); } /** @@ -361,7 +396,7 @@ public String getSessionId() { * @see Message Expiration */ public Duration getTimeToLive() { - return timeToLive; + return amqpAnnotatedMessage.getHeader().getTimeToLive(); } /** @@ -370,7 +405,7 @@ public Duration getTimeToLive() { * @return "To" property value of this message */ public String getTo() { - return to; + return amqpAnnotatedMessage.getProperties().getTo(); } /** @@ -385,7 +420,8 @@ public String getTo() { * @see Transfers and Send Via */ public String getViaPartitionKey() { - return viaPartitionKey; + return getStringValue(amqpAnnotatedMessage.getMessageAnnotations(), + VIA_PARTITION_KEY_ANNOTATION_NAME.getValue()); } /** @@ -396,7 +432,7 @@ public String getViaPartitionKey() { * @see #getCorrelationId() */ void setCorrelationId(String correlationId) { - this.correlationId = correlationId; + amqpAnnotatedMessage.getProperties().setCorrelationId(correlationId); } /** @@ -405,7 +441,7 @@ void setCorrelationId(String correlationId) { * @param contentType of the message. */ void setContentType(String contentType) { - this.contentType = contentType; + amqpAnnotatedMessage.getProperties().setContentType(contentType); } /** @@ -414,7 +450,8 @@ void setContentType(String contentType) { * @param deadLetterErrorDescription Dead letter description. */ void setDeadLetterErrorDescription(String deadLetterErrorDescription) { - this.deadLetterErrorDescription = deadLetterErrorDescription; + amqpAnnotatedMessage.getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), + deadLetterErrorDescription); } /** @@ -423,7 +460,8 @@ void setDeadLetterErrorDescription(String deadLetterErrorDescription) { * @param deadLetterReason Dead letter reason. */ void setDeadLetterReason(String deadLetterReason) { - this.deadLetterReason = deadLetterReason; + amqpAnnotatedMessage.getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), + deadLetterReason); } /** @@ -434,7 +472,8 @@ void setDeadLetterReason(String deadLetterReason) { * before it was deadlettered. */ void setDeadLetterSource(String deadLetterSource) { - this.deadLetterSource = deadLetterSource; + amqpAnnotatedMessage.getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), + deadLetterSource); } /** @@ -443,11 +482,12 @@ void setDeadLetterSource(String deadLetterSource) { * @param deliveryCount the number of the times this message was delivered to clients. */ void setDeliveryCount(long deliveryCount) { - this.deliveryCount = deliveryCount; + amqpAnnotatedMessage.getHeader().setDeliveryCount(deliveryCount); } void setEnqueuedSequenceNumber(long enqueuedSequenceNumber) { - this.enqueuedSequenceNumber = enqueuedSequenceNumber; + amqpAnnotatedMessage.getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), + enqueuedSequenceNumber); } /** @@ -456,16 +496,16 @@ void setEnqueuedSequenceNumber(long enqueuedSequenceNumber) { * @param enqueuedTime the datetime at which this message was enqueued in Azure Service Bus. */ void setEnqueuedTime(OffsetDateTime enqueuedTime) { - this.enqueuedTime = enqueuedTime; + setValue(amqpAnnotatedMessage.getMessageAnnotations(), ENQUEUED_TIME_UTC_ANNOTATION_NAME, enqueuedTime); } /** - * Sets the label for the message. + * Sets the subject for the message. * - * @param label The label to set. + * @param subject The subject to set. */ - void setLabel(String label) { - this.label = label; + void setSubject(String subject) { + amqpAnnotatedMessage.getProperties().setSubject(subject); } /** @@ -483,7 +523,7 @@ void setLockToken(UUID lockToken) { * @param lockedUntil the datetime at which the lock of this message expires. */ void setLockedUntil(OffsetDateTime lockedUntil) { - this.lockedUntil = lockedUntil; + setValue(amqpAnnotatedMessage.getMessageAnnotations(), LOCKED_UNTIL_KEY_ANNOTATION_NAME, lockedUntil); } /** @@ -492,7 +532,7 @@ void setLockedUntil(OffsetDateTime lockedUntil) { * @param messageId to be set. */ void setMessageId(String messageId) { - this.messageId = messageId; + amqpAnnotatedMessage.getProperties().setMessageId(messageId); } /** @@ -503,7 +543,7 @@ void setMessageId(String messageId) { * @see #getPartitionKey() */ void setPartitionKey(String partitionKey) { - this.partitionKey = partitionKey; + amqpAnnotatedMessage.getMessageAnnotations().put(PARTITION_KEY_ANNOTATION_NAME.getValue(), partitionKey); } /** @@ -514,7 +554,7 @@ void setPartitionKey(String partitionKey) { * @see #getScheduledEnqueueTime() */ void setScheduledEnqueueTime(OffsetDateTime scheduledEnqueueTime) { - this.scheduledEnqueueTime = scheduledEnqueueTime; + setValue(amqpAnnotatedMessage.getMessageAnnotations(), SCHEDULED_ENQUEUE_UTC_TIME_NAME, scheduledEnqueueTime); } /** @@ -523,7 +563,7 @@ void setScheduledEnqueueTime(OffsetDateTime scheduledEnqueueTime) { * @param sequenceNumber the unique number assigned to a message by Service Bus. */ void setSequenceNumber(long sequenceNumber) { - this.sequenceNumber = sequenceNumber; + amqpAnnotatedMessage.getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), sequenceNumber); } /** @@ -532,7 +572,7 @@ void setSequenceNumber(long sequenceNumber) { * @param sessionId to be set. */ void setSessionId(String sessionId) { - this.sessionId = sessionId; + amqpAnnotatedMessage.getProperties().setGroupId(sessionId); } /** @@ -543,7 +583,7 @@ void setSessionId(String sessionId) { * @see #getTimeToLive() */ void setTimeToLive(Duration timeToLive) { - this.timeToLive = timeToLive; + amqpAnnotatedMessage.getHeader().setTimeToLive(timeToLive); } /** @@ -554,7 +594,7 @@ void setTimeToLive(Duration timeToLive) { * @see #getReplyTo() */ void setReplyTo(String replyTo) { - this.replyTo = replyTo; + amqpAnnotatedMessage.getProperties().setReplyTo(replyTo); } /** @@ -563,7 +603,7 @@ void setReplyTo(String replyTo) { * @param replyToSessionId ReplyToSessionId property value of this message */ void setReplyToSessionId(String replyToSessionId) { - this.replyToSessionId = replyToSessionId; + amqpAnnotatedMessage.getProperties().setReplyToGroupId(replyToSessionId); } /** @@ -577,7 +617,7 @@ void setReplyToSessionId(String replyToSessionId) { * @param to To property value of this message */ void setTo(String to) { - this.to = to; + amqpAnnotatedMessage.getProperties().setTo(to); } /** @@ -588,6 +628,34 @@ void setTo(String to) { * @see #getViaPartitionKey() */ void setViaPartitionKey(String viaPartitionKey) { - this.viaPartitionKey = viaPartitionKey; + amqpAnnotatedMessage.getMessageAnnotations().put(VIA_PARTITION_KEY_ANNOTATION_NAME.getValue(), viaPartitionKey); + } + + /* + * Gets String value from given map and null if key does not exists. + */ + private String getStringValue(Map dataMap, String key) { + return (String) dataMap.get(key); + } + + /* + * Gets long value from given map and 0 if key does not exists. + */ + private long getLongValue(Map dataMap, String key) { + return dataMap.containsKey(key) ? (long) dataMap.get(key) : 0; + } + + /* + * Gets OffsetDateTime value from given map and null if key does not exists. + */ + private OffsetDateTime getOffsetDateTimeValue(Map dataMap, String key) { + return dataMap.containsKey(key) ? ((Date) dataMap.get(key)).toInstant().atOffset(ZoneOffset.UTC) : null; + } + + private void setValue(Map dataMap, AmqpMessageConstant key, OffsetDateTime value) { + if (value != null) { + amqpAnnotatedMessage.getMessageAnnotations().put(key.getValue(), + new Date(value.toInstant().toEpochMilli())); + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index c4fde770c3fc2..ffb3985056c0a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -427,7 +427,7 @@ private Mono sendIterable(Iterable messages, ServiceBus } private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, - ServiceBusTransactionContext transactionContext) { + ServiceBusTransactionContext transactionContext) { if (Objects.isNull(message)) { return monoError(logger, new NullPointerException("'message' cannot be null.")); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/models/CorrelationRuleFilter.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/models/CorrelationRuleFilter.java index da66d912e9add..3792823e6e11b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/models/CorrelationRuleFilter.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/administration/models/CorrelationRuleFilter.java @@ -16,7 +16,7 @@ * A CorrelationRuleFilter holds a set of conditions that are matched against one of more of an arriving message's user * and system properties. A common use is a match against the {@link ServiceBusMessage#getCorrelationId()} property, but * the application can also choose to match against {@link ServiceBusMessage#getContentType()}, {@link - * ServiceBusMessage#getLabel()}, {@link ServiceBusMessage#getMessageId()}, {@link ServiceBusMessage#getReplyTo()}, + * ServiceBusMessage#getSubject()}, {@link ServiceBusMessage#getMessageId()}, {@link ServiceBusMessage#getReplyTo()}, * {@link ServiceBusMessage#getReplyToSessionId()}, {@link ServiceBusMessage#getSessionId()}, {@link * ServiceBusMessage#getTo()}, and any user-defined properties. A match exists when an arriving message's value for a * property is equal to the value specified in the correlation filter. For string expressions, the comparison is diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java index 08dd588c842e8..a9dadc7bc03a1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientJavaDocCodeSamples.java @@ -94,9 +94,9 @@ public void batchSizeLimited() { .buildAsyncClient(); final ServiceBusMessage firstMessage = new ServiceBusMessage("92".getBytes(UTF_8)); - firstMessage.getProperties().put("telemetry", "latency"); + firstMessage.getApplicationProperties().put("telemetry", "latency"); final ServiceBusMessage secondMessage = new ServiceBusMessage("98".getBytes(UTF_8)); - secondMessage.getProperties().put("telemetry", "cpu-temperature"); + secondMessage.getApplicationProperties().put("telemetry", "cpu-temperature"); // BEGIN: com.azure.messaging.servicebus.servicebusasyncsenderclient.createBatch#CreateBatchOptionsLimitedSize final Flux telemetryMessages = Flux.just(firstMessage, secondMessage); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java index 3e1f5a8745268..b9506539e0829 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSenderClientJavaDocCodeSamples.java @@ -80,11 +80,11 @@ public void batchSizeLimited() { .buildClient(); final ServiceBusMessage firstMessage = new ServiceBusMessage("message-1".getBytes(UTF_8)); - firstMessage.getProperties().put("telemetry", "latency"); + firstMessage.getApplicationProperties().put("telemetry", "latency"); final ServiceBusMessage secondMessage = new ServiceBusMessage("message-2".getBytes(UTF_8)); - secondMessage.getProperties().put("telemetry", "cpu-temperature"); + secondMessage.getApplicationProperties().put("telemetry", "cpu-temperature"); final ServiceBusMessage thirdMessage = new ServiceBusMessage("message-3".getBytes(UTF_8)); - thirdMessage.getProperties().put("telemetry", "fps"); + thirdMessage.getApplicationProperties().put("telemetry", "fps"); // BEGIN: com.azure.messaging.servicebus.servicebussenderclient.createBatch#CreateBatchOptions-int diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageSerializerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageSerializerTest.java index e8c2745955738..c8ac8ec4ed956 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageSerializerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageSerializerTest.java @@ -5,8 +5,11 @@ import com.azure.core.amqp.exception.AmqpResponseCode; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -82,34 +85,58 @@ void deserializeMessage() { message.setReplyToGroupId("reply-to-session-id-property"); message.setGroupId("session-id-as-a-group-id"); + // Message Annotations + Map expectedMessageAnnotations = message.getMessageAnnotations().getValue(); + expectedMessageAnnotations.put(Symbol.valueOf("A"), "A value"); + + // Message Annotations + Map expectedDeliveryAnnotations = new HashMap<>(); + expectedDeliveryAnnotations.put(Symbol.valueOf("D"), "D value"); + message.setDeliveryAnnotations(new DeliveryAnnotations(expectedDeliveryAnnotations)); + + Map expectedFooterValues = new HashMap<>(); + expectedFooterValues.put(Symbol.valueOf("footer1"), "footer value"); + message.setFooter(new Footer(expectedFooterValues)); + // Act - final ServiceBusReceivedMessage serviceBusMessage = serializer.deserialize(message, ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage actualMessage = serializer.deserialize(message, ServiceBusReceivedMessage.class); // Assert // Verifying all our system properties were properly deserialized. - assertNotNull(serviceBusMessage.getEnqueuedTime()); - assertEquals(SEQUENCE_NUMBER, serviceBusMessage.getSequenceNumber()); + assertNotNull(actualMessage.getEnqueuedTime()); + assertEquals(SEQUENCE_NUMBER, actualMessage.getSequenceNumber()); // Verifying that all our properties are set. - assertEquals(message.getTtl(), serviceBusMessage.getTimeToLive().toMillis()); - assertEquals(message.getSubject(), serviceBusMessage.getLabel()); - assertEquals(message.getReplyTo(), serviceBusMessage.getReplyTo()); - assertEquals(message.getDeliveryCount(), serviceBusMessage.getDeliveryCount()); - assertEquals(message.getProperties().getTo(), serviceBusMessage.getTo()); - assertEquals(message.getReplyToGroupId(), serviceBusMessage.getReplyToSessionId()); - assertEquals(message.getGroupId(), serviceBusMessage.getSessionId()); - assertEquals(message.getContentType(), serviceBusMessage.getContentType()); - assertEquals(message.getCorrelationId(), serviceBusMessage.getCorrelationId()); + assertEquals(message.getTtl(), actualMessage.getTimeToLive().toMillis()); + assertEquals(message.getSubject(), actualMessage.getLabel()); + assertEquals(message.getReplyTo(), actualMessage.getReplyTo()); + assertEquals(message.getDeliveryCount(), actualMessage.getDeliveryCount()); + assertEquals(message.getProperties().getTo(), actualMessage.getTo()); + assertEquals(message.getReplyToGroupId(), actualMessage.getReplyToSessionId()); + assertEquals(message.getGroupId(), actualMessage.getSessionId()); + assertEquals(message.getContentType(), actualMessage.getContentType()); + assertEquals(message.getCorrelationId(), actualMessage.getCorrelationId()); + + assertValues(expectedMessageAnnotations, actualMessage.getAmqpAnnotatedMessage().getMessageAnnotations()); + assertValues(expectedDeliveryAnnotations, actualMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations()); + assertValues(expectedFooterValues, actualMessage.getAmqpAnnotatedMessage().getFooter()); // Verifying our application properties are the same. - assertEquals(APPLICATION_PROPERTIES.size(), serviceBusMessage.getProperties().size()); + assertEquals(APPLICATION_PROPERTIES.size(), actualMessage.getApplicationProperties().size()); APPLICATION_PROPERTIES.forEach((key, value) -> { - Assertions.assertTrue(serviceBusMessage.getProperties().containsKey(key)); - assertEquals(value, serviceBusMessage.getProperties().get(key)); + Assertions.assertTrue(actualMessage.getApplicationProperties().containsKey(key)); + assertEquals(value, actualMessage.getApplicationProperties().get(key)); }); // Verifying the contents of our message is the same. - assertEquals(payload, new String(serviceBusMessage.getBody(), UTF_8)); + assertEquals(payload, new String(actualMessage.getBody(), UTF_8)); + } + + private void assertValues(Map expected, Map actual) { + assertEquals(expected.size(), actual.size()); + for (Map.Entry expectedEntry : expected.entrySet()) { + assertEquals(expectedEntry.getValue(), actual.get(expectedEntry.getKey().toString())); + } } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java index b060a8d731561..aab956b1a4c55 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java @@ -3,18 +3,158 @@ package com.azure.messaging.servicebus; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +/** + * Test for {@link ServiceBusMessage}. + */ public class ServiceBusMessageTest { // Create a giant payload with 10000 characters that are "a". private static final String PAYLOAD = new String(new char[10000]).replace("\0", "a"); private static final byte[] PAYLOAD_BYTES = PAYLOAD.getBytes(UTF_8); + /** + * Verifies we correctly set values via copy constructor for {@link ServiceBusMessage}. + * 1. And ensure system properties are cleared. + * 2. Ensure modifying original `ServiceBusReceivedMessage` object does not change values of new ServiceBusMessage + * object created using original `ServiceBusReceivedMessage`. + */ + @Test + public void copyConstructorTest() { + // Arrange + final String expectedSubject = "old-subject"; + final String expectedTo = "old-to"; + final String expectedReplyTo = "old-reply-to"; + final String expectedReplyToSessionId = "old-reply-to-session-id"; + final String expectedCorrelationId = "old-d-id"; + final String expectedDeadLetterSource = "old-d-l-source"; + final Duration expectedTimeToLive = Duration.ofSeconds(20); + final String expectedPartitionKey = "old-p-key"; + + final ServiceBusReceivedMessage expected = new ServiceBusReceivedMessage(PAYLOAD_BYTES); + expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10"); + expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc"); + expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "11"); + expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), "11"); + expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "abc"); + expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "abc"); + expected.setSubject(expectedSubject); + expected.setTo(expectedTo); + expected.setReplyTo(expectedReplyTo); + expected.setReplyToSessionId(expectedReplyToSessionId); + expected.setCorrelationId(expectedCorrelationId); + expected.setDeadLetterSource(expectedDeadLetterSource); + expected.setTimeToLive(expectedTimeToLive); + expected.setPartitionKey(expectedPartitionKey); + + final ServiceBusMessage actual = new ServiceBusMessage(expected); + + // Act + // Modify the values after invoking copy constructor + expected.setSubject("new-subject"); + expected.setTo("new-to"); + expected.setReplyTo("new-reply-to"); + expected.setReplyToSessionId("new-session-id"); + expected.setCorrelationId("new-c-id"); + expected.setTimeToLive(Duration.ofSeconds(40)); + expected.setPartitionKey("new-p-key"); + + // Assert + assertNotSame(expected.getAmqpAnnotatedMessage(), actual.getAmqpAnnotatedMessage()); + + // Validate updated values + assertEquals(expectedSubject, actual.getSubject()); + assertEquals(expectedTo, actual.getTo()); + assertEquals(expectedReplyTo, actual.getReplyTo()); + assertEquals(expectedReplyToSessionId, actual.getReplyToSessionId()); + assertEquals(expectedCorrelationId, actual.getCorrelationId()); + assertEquals(expectedTimeToLive, actual.getTimeToLive()); + assertEquals(expectedPartitionKey, actual.getPartitionKey()); + + // Following values should be reset. + assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue())); + + assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue())); + assertNull(actual.getAmqpAnnotatedMessage().getHeader().getDeliveryCount()); + } + + + /** + * Verifies we correctly set values via copy constructor for {@link ServiceBusMessage}. + * 1. Ensure modifying original `ServiceBusReceivedMessage` object does not change values of new ServiceBusMessage + * object changes its values. + */ + @Test + public void copyConstructorModifyAfterCopyTest() { + // Arrange + final String expectedSubject = "old-subject"; + final String expectedTo = "old-to"; + final String expectedReplyTo = "old-reply-to"; + final String expectedReplyToSessionId = "old-reply-to-session-id"; + final String expectedCorrelationId = "old-d-id"; + final String expectedDeadLetterSource = "old-d-l-source"; + final Duration expectedTimeToLive = Duration.ofSeconds(20); + final String expectedPartitionKey = "old-p-key"; + + final ServiceBusReceivedMessage originalMessage = new ServiceBusReceivedMessage(PAYLOAD_BYTES); + originalMessage.setSubject(expectedSubject); + originalMessage.setTo(expectedTo); + originalMessage.setReplyTo(expectedReplyTo); + originalMessage.setReplyToSessionId(expectedReplyToSessionId); + originalMessage.setCorrelationId(expectedCorrelationId); + originalMessage.setDeadLetterSource(expectedDeadLetterSource); + originalMessage.setTimeToLive(expectedTimeToLive); + originalMessage.setPartitionKey(expectedPartitionKey); + + final ServiceBusMessage copiedMessage = new ServiceBusMessage(originalMessage); + + // Act + // Modify the values after invoking copy constructor + copiedMessage.setSubject("new-subject"); + copiedMessage.setTo("new-to"); + copiedMessage.setReplyTo("new-reply-to"); + copiedMessage.setReplyToSessionId("new-session-id"); + copiedMessage.setCorrelationId("new-c-id"); + copiedMessage.setTimeToLive(Duration.ofSeconds(40)); + copiedMessage.setPartitionKey("new-p-key"); + + // Assert + // Validate updated values + assertEquals(expectedSubject, originalMessage.getAmqpAnnotatedMessage().getProperties().getSubject()); + assertEquals(expectedTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getTo()); + assertEquals(expectedReplyTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyTo()); + assertEquals(expectedReplyToSessionId, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyToGroupId()); + assertEquals(expectedCorrelationId, originalMessage.getAmqpAnnotatedMessage().getProperties().getCorrelationId()); + + assertEquals(expectedTimeToLive, originalMessage.getAmqpAnnotatedMessage().getHeader().getTimeToLive()); + + assertEquals(expectedPartitionKey, originalMessage.getAmqpAnnotatedMessage().getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue())); + } + /** * Verify UTF_8 encoded body is created. */ @@ -48,7 +188,7 @@ void messagePropertiesShouldNotBeNull() { // Assert Assertions.assertNotNull(serviceBusMessageData.getBody()); Assertions.assertNotNull(serviceBusMessageData.getContext()); - Assertions.assertNotNull(serviceBusMessageData.getProperties()); + Assertions.assertNotNull(serviceBusMessageData.getApplicationProperties()); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceivedMessageTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceivedMessageTest.java index 093c6c189dd2a..6b89faa6e030d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceivedMessageTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceivedMessageTest.java @@ -3,24 +3,41 @@ package com.azure.messaging.servicebus; -import org.junit.jupiter.api.Assertions; +import com.azure.core.amqp.AmqpMessageConstant; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.Test; import java.time.Duration; - +import java.time.Instant; +import java.util.Date; +import java.util.Map; + +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME; +import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ServiceBusReceivedMessageTest { // Create a giant payload with 10000 characters that are "a". private static final String PAYLOAD = new String(new char[10000]).replace("\0", "a"); private static final byte[] PAYLOAD_BYTES = PAYLOAD.getBytes(UTF_8); - private static final String PAYLOAD_STRING = new String(PAYLOAD_BYTES); @Test public void byteArrayNotNull() { - assertThrows(NullPointerException.class, () -> new ServiceBusReceivedMessage((byte[]) null)); + assertThrows(NullPointerException.class, () -> new ServiceBusReceivedMessage(null)); } @Test @@ -29,8 +46,8 @@ public void messagePropertiesShouldNotBeNull() { final ServiceBusReceivedMessage receivedMessage = new ServiceBusReceivedMessage(PAYLOAD_BYTES); // Assert - Assertions.assertNotNull(receivedMessage.getBody()); - Assertions.assertNotNull(receivedMessage.getProperties()); + assertNotNull(receivedMessage.getBody()); + assertNotNull(receivedMessage.getApplicationProperties()); } @@ -47,8 +64,8 @@ public void canCreateWithEmptyArray() { // Assert final byte[] actual = serviceBusMessageData.getBody(); - Assertions.assertNotNull(actual); - Assertions.assertEquals(0, actual.length); + assertNotNull(actual); + assertEquals(0, actual.length); } /** @@ -60,13 +77,17 @@ public void canCreateWithBytePayload() { final ServiceBusReceivedMessage serviceBusMessageData = new ServiceBusReceivedMessage(PAYLOAD_BYTES); // Assert - Assertions.assertNotNull(serviceBusMessageData.getBody()); - Assertions.assertEquals(PAYLOAD, new String(serviceBusMessageData.getBody(), UTF_8)); + assertNotNull(serviceBusMessageData.getBody()); + assertEquals(PAYLOAD, new String(serviceBusMessageData.getBody(), UTF_8)); } @Test public void toServiceBusMessageTest() { //Arrange + Message amqpMessage = mock(Message.class); + Data data = new Data(new Binary(PAYLOAD_BYTES)); + when(amqpMessage.getBody()).thenReturn(data); + // final ServiceBusReceivedMessage originalMessage = new ServiceBusReceivedMessage(PAYLOAD_BYTES); originalMessage.setMessageId("mid"); originalMessage.setContentType("type"); @@ -75,24 +96,53 @@ public void toServiceBusMessageTest() { originalMessage.setViaPartitionKey("something"); originalMessage.setTimeToLive(Duration.ofSeconds(10)); originalMessage.setReplyToSessionId("rsessionid"); - originalMessage.setLabel("label"); + originalMessage.setSubject("subject"); originalMessage.setTo("to"); + final Map originalMessageAnnotations = originalMessage.getAmqpAnnotatedMessage().getMessageAnnotations(); + originalMessageAnnotations.put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "message annotations"); + originalMessageAnnotations.put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), Long.valueOf(3)); + originalMessageAnnotations.put(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue(), new Date(Instant.now().toEpochMilli())); + originalMessageAnnotations.put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), new Date(Instant.now().toEpochMilli())); + + originalMessageAnnotations.put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), Long.valueOf(3)); + + final Map originalApplicationProperties = originalMessage.getAmqpAnnotatedMessage().getApplicationProperties(); + originalApplicationProperties.put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "description"); + originalApplicationProperties.put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "description"); + + originalMessage.getAmqpAnnotatedMessage().getHeader().setDeliveryCount(Long.valueOf(5)); // Act - final ServiceBusMessage messageToSend = new ServiceBusMessage(originalMessage); + final ServiceBusMessage actual = new ServiceBusMessage(originalMessage); // Assert - Assertions.assertNotNull(messageToSend); - Assertions.assertNotNull(messageToSend.getBody()); - Assertions.assertEquals(PAYLOAD, new String(messageToSend.getBody(), UTF_8)); - Assertions.assertEquals(originalMessage.getMessageId(), messageToSend.getMessageId()); - Assertions.assertEquals(originalMessage.getContentType(), messageToSend.getContentType()); - Assertions.assertEquals(originalMessage.getCorrelationId(), messageToSend.getCorrelationId()); - Assertions.assertEquals(originalMessage.getReplyTo(), messageToSend.getReplyTo()); - Assertions.assertEquals(originalMessage.getViaPartitionKey(), messageToSend.getViaPartitionKey()); - Assertions.assertEquals(originalMessage.getTimeToLive().toMillis(), messageToSend.getTimeToLive().toMillis()); - Assertions.assertEquals(originalMessage.getLabel(), messageToSend.getLabel()); - Assertions.assertEquals(originalMessage.getReplyToSessionId(), messageToSend.getReplyToSessionId()); - Assertions.assertEquals(originalMessage.getTo(), messageToSend.getTo()); + assertNotNull(actual); + assertNotNull(actual.getBody()); + assertEquals(PAYLOAD, new String(actual.getBody(), UTF_8)); + assertEquals(originalMessage.getMessageId(), actual.getMessageId()); + assertEquals(originalMessage.getContentType(), actual.getContentType()); + assertEquals(originalMessage.getCorrelationId(), actual.getCorrelationId()); + assertEquals(originalMessage.getReplyTo(), actual.getReplyTo()); + assertEquals(originalMessage.getViaPartitionKey(), actual.getViaPartitionKey()); + assertEquals(originalMessage.getTimeToLive().toMillis(), actual.getTimeToLive().toMillis()); + assertEquals(originalMessage.getLabel(), actual.getSubject()); + assertEquals(originalMessage.getReplyToSessionId(), actual.getReplyToSessionId()); + assertEquals(originalMessage.getTo(), actual.getTo()); + + // Following values should be cleaned up. + assertNullValues(actual.getAmqpAnnotatedMessage().getMessageAnnotations(), DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME, + ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME, LOCKED_UNTIL_KEY_ANNOTATION_NAME, + SEQUENCE_NUMBER_ANNOTATION_NAME, ENQUEUED_TIME_UTC_ANNOTATION_NAME); + + assertNullValues(actual.getAmqpAnnotatedMessage().getApplicationProperties(), DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME, + DEAD_LETTER_REASON_ANNOTATION_NAME); + + assertNull(actual.getAmqpAnnotatedMessage().getHeader().getDeliveryCount()); + } + + public void assertNullValues(Map dataMap, AmqpMessageConstant... keys) { + for (AmqpMessageConstant key : keys) { + assertNull(dataMap.get(key.getValue())); + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index fddf42cc1e944..11cadd363de10 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -3,6 +3,11 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.models.AmqpAnnotatedMessage; +import com.azure.core.amqp.models.AmqpDataBody; +import com.azure.core.amqp.models.AmqpMessageHeader; +import com.azure.core.amqp.models.AmqpMessageProperties; +import com.azure.core.amqp.models.BinaryData; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.administration.models.DeadLetterOptions; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -23,6 +28,8 @@ import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -461,7 +468,7 @@ void peekBatchMessages(MessagingEntityType entityType, boolean isSessionEnabled) setSenderAndReceiver(entityType, TestUtils.USE_CASE_PEEK_BATCH_MESSAGES, isSessionEnabled); final BiConsumer checkCorrectMessage = (message, index) -> { - final Map properties = message.getProperties(); + final Map properties = message.getApplicationProperties(); final Object value = properties.get(MESSAGE_POSITION_ID); assertTrue(value instanceof Integer, "Did not contain correct position number: " + value); @@ -794,7 +801,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType) final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage messageToSend = getMessage(messageId, isSessionEnabled); - Map sentProperties = messageToSend.getProperties(); + Map sentProperties = messageToSend.getApplicationProperties(); sentProperties.put("NullProperty", null); sentProperties.put("BooleanProperty", true); sentProperties.put("ByteProperty", (byte) 1); @@ -815,7 +822,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType) messagesPending.decrementAndGet(); assertMessageEquals(receivedMessage, messageId, isSessionEnabled); - final Map received = receivedMessage.getMessage().getProperties(); + final Map received = receivedMessage.getMessage().getApplicationProperties(); assertEquals(sentProperties.size(), received.size()); @@ -953,6 +960,133 @@ void renewMessageLock(MessagingEntityType entityType) throws InterruptedExceptio .verify(Duration.ofMinutes(3)); } + /** + * Verifies that we can receive a message which have different section set (i.e header, footer, annotations, + * application properties etc). + */ + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + @ParameterizedTest + void receiveAndValidateProperties(MessagingEntityType entityType) { + // Arrange + final boolean isSessionEnabled = false; + final String subject = "subject"; + final Map footer = new HashMap<>(); + footer.put("footer-key-1", "footer-value-1"); + footer.put("footer-key-2", "footer-value-2"); + + final Map aplicaitonProperties = new HashMap<>(); + aplicaitonProperties.put("ap-key-1", "ap-value-1"); + aplicaitonProperties.put("ap-key-2", "ap-value-2"); + + final Map deliveryAnnotation = new HashMap<>(); + deliveryAnnotation.put("delivery-annotations-key-1", "delivery-annotations-value-1"); + deliveryAnnotation.put("delivery-annotations-key-2", "delivery-annotations-value-2"); + + setSenderAndReceiver(entityType, TestUtils.USE_CASE_VALIDATE_AMQP_PROPERTIES, isSessionEnabled); + + final String messageId = UUID.randomUUID().toString(); + final AmqpAnnotatedMessage expectedAmqpProperties = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList(new BinaryData(CONTENTS_BYTES)))); + expectedAmqpProperties.getProperties().setSubject(subject); + expectedAmqpProperties.getProperties().setReplyToGroupId("r-gid"); + expectedAmqpProperties.getProperties().setReplyTo("replyto"); + expectedAmqpProperties.getProperties().setContentType("content-type"); + expectedAmqpProperties.getProperties().setCorrelationId("corelation-id"); + expectedAmqpProperties.getProperties().setTo("to"); + expectedAmqpProperties.getProperties().setAbsoluteExpiryTime(OffsetDateTime.now().plusSeconds(60)); + expectedAmqpProperties.getProperties().setUserId("user-id-1".getBytes()); + expectedAmqpProperties.getProperties().setContentEncoding("string"); + expectedAmqpProperties.getProperties().setGroupSequence(Long.valueOf(2)); + expectedAmqpProperties.getProperties().setCreationTime(OffsetDateTime.now().plusSeconds(30)); + + expectedAmqpProperties.getHeader().setPriority(Short.valueOf((short) 2)); + expectedAmqpProperties.getHeader().setFirstAcquirer(true); + expectedAmqpProperties.getHeader().setDurable(true); + + expectedAmqpProperties.getFooter().putAll(footer); + expectedAmqpProperties.getDeliveryAnnotations().putAll(deliveryAnnotation); + expectedAmqpProperties.getApplicationProperties().putAll(aplicaitonProperties); + + final ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS_BYTES, messageId); + + final AmqpAnnotatedMessage amqpAnnotatedMessage = message.getAmqpAnnotatedMessage(); + amqpAnnotatedMessage.getMessageAnnotations().putAll(expectedAmqpProperties.getMessageAnnotations()); + amqpAnnotatedMessage.getApplicationProperties().putAll(expectedAmqpProperties.getApplicationProperties()); + amqpAnnotatedMessage.getDeliveryAnnotations().putAll(expectedAmqpProperties.getDeliveryAnnotations()); + amqpAnnotatedMessage.getFooter().putAll(expectedAmqpProperties.getFooter()); + + final AmqpMessageHeader header = amqpAnnotatedMessage.getHeader(); + header.setFirstAcquirer(expectedAmqpProperties.getHeader().isFirstAcquirer()); + header.setTimeToLive(expectedAmqpProperties.getHeader().getTimeToLive()); + header.setDurable(expectedAmqpProperties.getHeader().isDurable()); + header.setDeliveryCount(expectedAmqpProperties.getHeader().getDeliveryCount()); + header.setPriority(expectedAmqpProperties.getHeader().getPriority()); + + final AmqpMessageProperties amqpMessageProperties = amqpAnnotatedMessage.getProperties(); + amqpMessageProperties.setReplyTo((expectedAmqpProperties.getProperties().getReplyTo())); + amqpMessageProperties.setContentEncoding((expectedAmqpProperties.getProperties().getContentEncoding())); + amqpMessageProperties.setAbsoluteExpiryTime((expectedAmqpProperties.getProperties().getAbsoluteExpiryTime())); + amqpMessageProperties.setSubject((expectedAmqpProperties.getProperties().getSubject())); + amqpMessageProperties.setContentType(expectedAmqpProperties.getProperties().getContentType()); + amqpMessageProperties.setCorrelationId(expectedAmqpProperties.getProperties().getCorrelationId()); + amqpMessageProperties.setTo(expectedAmqpProperties.getProperties().getTo()); + amqpMessageProperties.setGroupSequence(expectedAmqpProperties.getProperties().getGroupSequence()); + amqpMessageProperties.setUserId(expectedAmqpProperties.getProperties().getUserId()); + amqpMessageProperties.setAbsoluteExpiryTime(expectedAmqpProperties.getProperties().getAbsoluteExpiryTime()); + amqpMessageProperties.setCreationTime(expectedAmqpProperties.getProperties().getCreationTime()); + amqpMessageProperties.setReplyToGroupId(expectedAmqpProperties.getProperties().getReplyToGroupId()); + + // Send the message + sendMessage(message).block(TIMEOUT); + + StepVerifier.create(receiver.receiveMessages().map(ServiceBusReceivedMessageContext::getMessage)) + .assertNext(received -> { + assertNotNull(received.getLockToken()); + AmqpAnnotatedMessage actual = received.getAmqpAnnotatedMessage(); + try { + assertArrayEquals(CONTENTS_BYTES, message.getBody()); + assertEquals(expectedAmqpProperties.getHeader().getPriority(), actual.getHeader().getPriority()); + assertEquals(expectedAmqpProperties.getHeader().isFirstAcquirer(), actual.getHeader().isFirstAcquirer()); + assertEquals(expectedAmqpProperties.getHeader().isDurable(), actual.getHeader().isDurable()); + + assertEquals(expectedAmqpProperties.getProperties().getSubject(), actual.getProperties().getSubject()); + assertEquals(expectedAmqpProperties.getProperties().getReplyToGroupId(), actual.getProperties().getReplyToGroupId()); + assertEquals(expectedAmqpProperties.getProperties().getReplyTo(), actual.getProperties().getReplyTo()); + assertEquals(expectedAmqpProperties.getProperties().getContentType(), actual.getProperties().getContentType()); + assertEquals(expectedAmqpProperties.getProperties().getCorrelationId(), actual.getProperties().getCorrelationId()); + assertEquals(expectedAmqpProperties.getProperties().getTo(), actual.getProperties().getTo()); + assertEquals(expectedAmqpProperties.getProperties().getAbsoluteExpiryTime().toEpochSecond(), actual.getProperties().getAbsoluteExpiryTime().toEpochSecond()); + assertEquals(expectedAmqpProperties.getProperties().getSubject(), actual.getProperties().getSubject()); + assertEquals(expectedAmqpProperties.getProperties().getContentEncoding(), actual.getProperties().getContentEncoding()); + assertEquals(expectedAmqpProperties.getProperties().getGroupSequence(), actual.getProperties().getGroupSequence()); + assertEquals(expectedAmqpProperties.getProperties().getCreationTime().toEpochSecond(), actual.getProperties().getCreationTime().toEpochSecond()); + assertArrayEquals(expectedAmqpProperties.getProperties().getUserId(), actual.getProperties().getUserId()); + + assertMapValues(expectedAmqpProperties.getDeliveryAnnotations(), actual.getDeliveryAnnotations()); + assertMapValues(expectedAmqpProperties.getMessageAnnotations(), actual.getMessageAnnotations()); + assertMapValues(expectedAmqpProperties.getApplicationProperties(), actual.getApplicationProperties()); + assertMapValues(expectedAmqpProperties.getFooter(), actual.getFooter()); + } finally { + logger.info("Completing message."); + receiver.complete(received).block(Duration.ofSeconds(15)); + messagesPending.decrementAndGet(); + } + }) + .thenCancel() + .verify(Duration.ofMinutes(2)); + } + + /** + * Asserts the length and values with in the map. + */ + private void assertMapValues(Map expectedMap, Map actualMap) { + assertTrue(actualMap.size() >= expectedMap.size()); + Iterator expectedKeys = expectedMap.keySet().iterator(); + while (expectedKeys.hasNext()) { + String key = expectedKeys.next(); + assertEquals(expectedMap.get(key), actualMap.get(key), "Value is not equal for Key " + key); + } + } + /** * Sets the sender and receiver. If session is enabled, then a single-named session receiver is created. */ @@ -1046,6 +1180,5 @@ private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getDeadLetterRec default: throw logger.logExceptionAsError(new IllegalArgumentException("Unknown entity type: " + entityType)); } - } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index 3875224e2af46..3ca0041e90ffc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.fail; /** - * Integration tests for {@link ServiceBusReceiverClient} from queues or subscriptions. + * Integration tests for {@link com.azure.messaging.servicebus.ServiceBusReceiverClient} from queues or subscriptions. */ @Tag("integration") class ServiceBusReceiverClientIntegrationTest extends IntegrationTestBase { @@ -739,7 +739,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType, final ServiceBusMessage messageToSend = getMessage(messageId, isSessionEnabled); final int maxMessages = 1; - Map sentProperties = messageToSend.getProperties(); + Map sentProperties = messageToSend.getApplicationProperties(); sentProperties.put("NullProperty", null); sentProperties.put("BooleanProperty", true); sentProperties.put("ByteProperty", (byte) 1); @@ -770,7 +770,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType, messagesPending.decrementAndGet(); assertMessageEquals(receivedMessage, messageId, isSessionEnabled); - final Map received = receivedMessage.getProperties(); + final Map received = receivedMessage.getApplicationProperties(); assertEquals(sentProperties.size(), received.size()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index c7dc1b1461266..3d5384d5376b3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -235,7 +235,7 @@ void createsMessageBatchWithSize() { int batchSize = 1024; // Overhead when serializing an event, to figure out what the maximum size we can use for an event payload. - int eventOverhead = 46; + int eventOverhead = 75; int maxEventPayload = batchSize - eventOverhead; final AmqpSendLink link = mock(AmqpSendLink.class); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java index 8c46f3675c181..10c5c1c031fb1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java @@ -58,6 +58,7 @@ public class TestUtils { static final int USE_CASE_SEND_VIA_QUEUE_2 = 14; static final int USE_CASE_SEND_VIA_TOPIC_1 = 15; static final int USE_CASE_SEND_VIA_TOPIC_2 = 16; + static final int USE_CASE_VALIDATE_AMQP_PROPERTIES = 17; // An application property key to identify where in the stream this message was created. static final String MESSAGE_POSITION_ID = "message-position"; @@ -201,7 +202,7 @@ public static List getServiceBusMessages(int numberOfEvents, return IntStream.range(0, numberOfEvents) .mapToObj(number -> { final ServiceBusMessage message = getServiceBusMessage(content, messageId); - message.getProperties().put(MESSAGE_POSITION_ID, number); + message.getApplicationProperties().put(MESSAGE_POSITION_ID, number); return message; }) @@ -221,7 +222,7 @@ public static List getServiceBusMessages(int numberOfEvents, return IntStream.range(0, numberOfEvents) .mapToObj(number -> { final ServiceBusMessage message = getServiceBusMessage("Event " + number, messageId); - message.getProperties().put(MESSAGE_POSITION_ID, number); + message.getApplicationProperties().put(MESSAGE_POSITION_ID, number); return message; }) diff --git a/sdk/servicebus/test-resources.json b/sdk/servicebus/test-resources.json index ee1c184187f9b..97672339014ff 100644 --- a/sdk/servicebus/test-resources.json +++ b/sdk/servicebus/test-resources.json @@ -56,7 +56,7 @@ "namespaceDomainNameSuffix": "servicebus.windows.net", "queueName": "queue", "queueSessionName": "queue-session", - "numberOfInstances": 17, + "numberOfInstances": 20, "subscriptionName": "subscription", "subscriptionSessionName": "subscription-session", "serviceBusDataOwnerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419')]",