Skip to content

Commit

Permalink
SB Track2: Expose AMQP details ServiceBus Messages for sending and re…
Browse files Browse the repository at this point in the history
…ceiving. (Azure#14848)

Expose AMQP details ServiceBus Messages for sending and receiving.
  • Loading branch information
hemanttanwar authored and conniey committed Sep 11, 2020
1 parent 1a4077f commit efdeaa7
Show file tree
Hide file tree
Showing 31 changed files with 1,824 additions and 291 deletions.
1 change: 1 addition & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 1.5.0-beta.1 (Unreleased)
- Added Amqp Message envelope which can be accessed using `AmqpAnnotatedMessage`.

## 1.4.0 (2020-08-11)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,36 @@ public enum AmqpMessageConstant {
/**
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher");
PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),
/**
* The name representing scheduled enqueue time.
*/
SCHEDULED_ENQUEUE_UTC_TIME_NAME("x-opt-scheduled-enqueue-time"),
/**
* The identifier associated with a given via-partition.
*/
VIA_PARTITION_KEY_ANNOTATION_NAME("x-opt-via-partition-key"),
/**
* The identifier for locked until.
*/
LOCKED_UNTIL_KEY_ANNOTATION_NAME("x-opt-locked-until"),
/**
* The identifier for deadletter source.
*/
DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME("x-opt-deadletter-source"),
/**
* The name representing enqueue sequence number.
* This one appears to always be 0, but is always returned with each message.
*/
ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME("x-opt-enqueue-sequence-number"),
/**
* The identifier for deadletter description.
*/
DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME("DeadLetterErrorDescription"),
/**
* The identifier for deadletter reason.
*/
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");

private static final Map<String, AmqpMessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* The representation of message as defined by AMQP protocol.
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">
* Amqp Message Format.</a>
*/
public final class AmqpAnnotatedMessage {
private final AmqpMessageBody amqpMessageBody;
private final Map<String, Object> applicationProperties;
private final Map<String, Object> deliveryAnnotations;
private final Map<String, Object> messageAnnotations;
private final Map<String, Object> footer;
private final AmqpMessageHeader header;
private final AmqpMessageProperties properties;

/**
* Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpMessageBody}.
*
* @param body to be set on amqp message.
*
* @throws NullPointerException if {@code body} is null.
*/
public AmqpAnnotatedMessage(AmqpMessageBody body) {
amqpMessageBody = Objects.requireNonNull(body, "'body' cannot be null.");

applicationProperties = new HashMap<>();
deliveryAnnotations = new HashMap<>();
messageAnnotations = new HashMap<>();
footer = new HashMap<>();
header = new AmqpMessageHeader();
properties = new AmqpMessageProperties();
}

/**
* Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpAnnotatedMessage} instance.
*
* @param message used to create another instance of {@link AmqpAnnotatedMessage}.
*
* @throws NullPointerException if {@code message} or {@link AmqpAnnotatedMessage#getBody() body} is null.
*/
public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) {
Objects.requireNonNull(message, "'message' cannot be null.");
amqpMessageBody = Objects.requireNonNull(message.getBody(), "'message.body' cannot be null.");
applicationProperties = new HashMap<>(message.getApplicationProperties());
deliveryAnnotations = new HashMap<>(message.getDeliveryAnnotations());
messageAnnotations = new HashMap<>(message.getMessageAnnotations());
footer = new HashMap<>(message.getFooter());
header = new AmqpMessageHeader(message.getHeader());
properties = new AmqpMessageProperties(message.getProperties());
}

/**
* Gets the {@link Map} of application properties.
*
* @return The application properties.
*/
public Map<String, Object> getApplicationProperties() {
return applicationProperties;
}

/**
* Gets the {@link AmqpMessageBody} of an amqp message.
*
* @return the {@link AmqpMessageBody} object.
*/
public AmqpMessageBody getBody() {
return amqpMessageBody;
}

/**
* Gets the {@link Map} representation of delivery annotations defined on an amqp message.
*
* @return the {@link Map} representation of delivery annotations.
*/
public Map<String, Object> getDeliveryAnnotations() {
return deliveryAnnotations;
}

/**
* Gets the {@link Map} representation of footer defined on an amqp message.
*
* @return the {@link Map} representation of footer.
*/
public Map<String, Object> getFooter() {
return footer;
}

/**
* Gets the {@link AmqpMessageHeader} defined on an amqp message.
*
* @return the {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader getHeader() {
return header;
}

/**
* Gets the {@link Map} representation of message annotations defined on an amqp message.
*
* @return the {@link Map} representation of message annotations.
*/
public Map<String, Object> getMessageAnnotations() {
return messageAnnotations;
}

/**
* Gets the {@link AmqpMessageProperties} defined on an amqp message.
*
* @return the {@link AmqpMessageProperties} object.
*/
public AmqpMessageProperties getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

/**
* All AmqpBodyType available for AMQP Message.
*/
public enum AmqpBodyType {
/**
* Represent Amqp Data type
*/
DATA,
/**
* Represent Amqp Value type
*/
VALUE,
/**
* Represent Amqp Sequence type
*/
SEQUENCE;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import com.azure.core.util.IterableStream;

import java.util.Objects;

/**
* This is amqp message body which represents {@link AmqpBodyType#DATA} type.
*/
public final class AmqpDataBody implements AmqpMessageBody {
private final IterableStream<BinaryData> data;

/**
* Creates instance of {@link AmqpDataBody} with given {@link Iterable} of {@link BinaryData}.
*
* @param data to be set on amqp body.
*
* @throws NullPointerException if {@code data} is null.
*/
public AmqpDataBody(Iterable<BinaryData> data) {
Objects.requireNonNull(data, "'data' cannot be null.");
this.data = new IterableStream<>(data);
}

@Override
public AmqpBodyType getBodyType() {
return AmqpBodyType.DATA;
}

/**
* Gets {@link BinaryData} set on this {@link AmqpDataBody}.
*
* @return data set on {@link AmqpDataBody}.
*/
public IterableStream<BinaryData> getData() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

/**
* Interface representing Amqp Message Body.
*/
public interface AmqpMessageBody {
/**
* Type representing various supported amqp body types.
*
* @return The {@link AmqpBodyType}.
*/
AmqpBodyType getBodyType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import com.azure.core.annotation.Fluent;

import java.time.Duration;
import java.util.Objects;

/**
* The representation of message header as defined by AMQP protocol.
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">
* Amqp Message Format.</a>
*/
@Fluent
public class AmqpMessageHeader {

private Long deliveryCount;
private Boolean durable;
private Boolean firstAcquirer;
private Short priority;
private Duration timeToLive;

AmqpMessageHeader() {
// This class does not have any public constructors, and is not able to be instantiated using 'new'.
}

/**
* The constructor is used to clone the values.
*/
AmqpMessageHeader(AmqpMessageHeader header) {
super();
Objects.requireNonNull(header, "'header' cannot be null.");
deliveryCount = header.getDeliveryCount();
durable = header.isDurable();
firstAcquirer = header.isFirstAcquirer();
timeToLive = header.getTimeToLive();
priority = header.getPriority();
}

/**
* Gets the delivery count from amqp message header.
*
* @return the delivery count value.
*/
public Long getDeliveryCount() {
return deliveryCount;
}

/**
* Sets the given {@code deliveryCount} value on {@link AmqpMessageHeader} object.
* @param deliveryCount to be set.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setDeliveryCount(Long deliveryCount) {
this.deliveryCount = deliveryCount;
return this;
}

/**
* Gets durable boolean flag from amqp message header.
* @return the durable flag.
*/
public Boolean isDurable() {
return durable;
}

/**
* Sets the given {@code durable} value on {@link AmqpMessageHeader} object.
* @param durable to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setDurable(Boolean durable) {
this.durable = durable;
return this;
}

/**
* Gets boolean flag for {@code firstAcquirer} from amqp message header.
* @return the {@code firstAcquirer} value.
*/
public Boolean isFirstAcquirer() {
return this.firstAcquirer;
}

/**
* Sets the given {@code firstAcquirer} value on {@link AmqpMessageHeader} object.
* @param firstAcquirer to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setFirstAcquirer(Boolean firstAcquirer) {
this.firstAcquirer = firstAcquirer;
return this;
}

/**
* Gets the priority on {@code amqpMessage} from amqp message header.
* @return the {@code priority} value.
*/
public Short getPriority() {
return priority;
}

/**
* Sets the given {@code priority} value on {@link AmqpMessageHeader} object.
* @param priority to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setPriority(Short priority) {
this.priority = priority;
return this;
}

/**
* Gets {@code timeToLive} from amqp message header.
*
* @return the {@code timeToLive} value.
*/
public Duration getTimeToLive() {
return timeToLive;
}

/**
* Sets the given {@code timeToLive} value on {@link AmqpMessageHeader} object.
* @param timeToLive to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setTimeToLive(Duration timeToLive) {
this.timeToLive = timeToLive;
return this;
}
}
Loading

0 comments on commit efdeaa7

Please sign in to comment.