Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SB Track2: Expose AMQP details ServiceBus Messages for sending and receiving. #14848

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* The representation of message as defined by AMQP protocol.
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">
* Amqp Message Format.</a>
*/
public final class AmqpAnnotatedMessage {
private final AmqpMessageBody amqpMessageBody;
private final Map<String, Object> applicationProperties;
private final Map<String, Object> deliveryAnnotations;
private final Map<String, Object> messageAnnotations;
private final Map<String, Object> footer;
private final AmqpMessageHeader header;
private final AmqpMessageProperties properties;

/**
* Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpMessageBody}.
*
* @param body to be set on amqp message.
*
* @throws NullPointerException if {@code body} is null.
*/
public AmqpAnnotatedMessage(AmqpMessageBody body) {
amqpMessageBody = Objects.requireNonNull(body, "'body' cannot be null.");

applicationProperties = new HashMap<>();
deliveryAnnotations = new HashMap<>();
messageAnnotations = new HashMap<>();
footer = new HashMap<>();
header = new AmqpMessageHeader();
properties = new AmqpMessageProperties();
}

/**
* Creates instance of {@link AmqpAnnotatedMessage} with given {@link AmqpAnnotatedMessage} instance.
*
* @param message used to create another instance of {@link AmqpAnnotatedMessage}.
*
* @throws NullPointerException if {@code message} or {@link AmqpAnnotatedMessage#getBody() body} is null.
*/
public AmqpAnnotatedMessage(AmqpAnnotatedMessage message) {
Objects.requireNonNull(message, "'message' cannot be null.");
amqpMessageBody = Objects.requireNonNull(message.getBody(), "'message.body' cannot be null.");

applicationProperties = message.getApplicationProperties();
deliveryAnnotations = message.getDeliveryAnnotations();
messageAnnotations = message.getMessageAnnotations();
header = message.getHeader();
properties = message.getProperties();
footer = message.getFooter();
}

/**
* Gets the {@link Map} of application properties.
* @return The application properties.
*/
public Map<String, Object> getApplicationProperties() {
return applicationProperties;
}

/**
* Gets the {@link AmqpMessageBody} of an amqp message.
*
* @return the {@link AmqpMessageBody} object.
*/
public AmqpMessageBody getBody() {
return amqpMessageBody;
}

/**
* Gets the {@link Map} representation of delivery annotations defined on an amqp message.
*
* @return the {@link Map} representation of delivery annotations.
*/
public Map<String, Object> getDeliveryAnnotations() {
return deliveryAnnotations;
}

/**
* Gets the {@link Map} representation of footer defined on an amqp message.
*
* @return the {@link Map} representation of footer.
*/
public Map<String, Object> getFooter() {
return footer;
}

/**
* Gets the {@link AmqpMessageHeader} defined on an amqp message.
*
* @return the {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader getHeader() {
return header;
}

/**
* Gets the {@link Map} representation of message annotations defined on an amqp message.
*
* @return the {@link Map} representation of message annotations.
*/
public Map<String, Object> getMessageAnnotations() {
return messageAnnotations;
}

/**
* Gets the {@link AmqpMessageProperties} defined on an amqp message.
*
* @return the {@link AmqpMessageProperties} object.
*/
public AmqpMessageProperties getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

/**
* All AmqpBodyType available for AMQP Message.
*/
public enum AmqpBodyType {
/**
* Represent Amqp Data type
*/
DATA,
/**
* Represent Amqp Value type
*/
VALUE,
/**
* Represent Amqp Sequence type
*/
SEQUENCE;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import com.azure.core.util.IterableStream;

import java.util.Objects;

/**
* This is amqp message body which represents {@link AmqpBodyType#DATA} type.
*/
public final class AmqpDataBody implements AmqpMessageBody {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
private final IterableStream<BinaryData> data;

/**
* Creates instance of {@link AmqpDataBody} with given {@link Iterable} of {@link BinaryData}.
*
* @param data to be set on amqp body.
*
* @throws NullPointerException if {@code data} is null.
*/
public AmqpDataBody(Iterable<BinaryData> data) {
Objects.requireNonNull(data, "'data' cannot be null.");
this.data = new IterableStream<>(data);
}

@Override
public AmqpBodyType getBodyType() {
return AmqpBodyType.DATA;
}

/**
* Gets {@link BinaryData} set on this {@link AmqpDataBody}.
* @return data set on {@link AmqpDataBody}.
*/
public IterableStream<BinaryData> getData() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

/**
* Interface representing Amqp Message Body.
*/
public interface AmqpMessageBody {
/**
* Type representing various supported amqp body types.
*
* @return The {@link AmqpBodyType}.
*/
AmqpBodyType getBodyType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.models;

import com.azure.core.annotation.Fluent;

import java.time.Duration;

/**
* The representation of message header as defined by AMQP protocol.
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">
* Amqp Message Format.</a>
*/
@Fluent
public class AmqpMessageHeader {

private Long deliveryCount;
private Boolean durable;
private Boolean firstAcquirer;
private Short priority;
private Duration timeToLive;

AmqpMessageHeader() {
// This class does not have any public constructors, and is not able to be instantiated using 'new'.
}

/**
* Gets the delivery count from amqp message header.
*
* @return the delivery count value.
*/
public Long getDeliveryCount() {
return deliveryCount;
}

/**
* Sets the given {@code deliveryCount} value on {@link AmqpMessageHeader} object.
*
* @param deliveryCount to be set.
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setDeliveryCount(Long deliveryCount) {
this.deliveryCount = deliveryCount;
return this;
}

/**
* Gets durable boolean flag from amqp message header.
*
* @return the durable flag.
*/
public Boolean isDurable() {
return durable;
}

/**
* Sets the given {@code durable} value on {@link AmqpMessageHeader} object.
*
* @param durable to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setDurable(Boolean durable) {
this.durable = durable;
return this;
}

/**
* Gets boolean flag for {@code firstAcquirer} from amqp message header.
*
* @return the {@code firstAcquirer} value.
*/
public Boolean isFirstAcquirer() {
return this.firstAcquirer;
}

/**
* Sets the given {@code firstAcquirer} value on {@link AmqpMessageHeader} object.
*
* @param firstAcquirer to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setFirstAcquirer(Boolean firstAcquirer) {
this.firstAcquirer = firstAcquirer;
return this;
}

/**
* Gets the priority on {@code amqpMessage} from amqp message header.
*
* @return the {@code priority} value.
*/
public Short getPriority() {
return priority;
}

/**
* Sets the given {@code priority} value on {@link AmqpMessageHeader} object.
*
* @param priority to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setPriority(short priority) {
hemanttanwar marked this conversation as resolved.
Show resolved Hide resolved
this.priority = priority;
return this;
}

/**
* Gets {@code timeToLive} from amqp message header.
*
* @return the {@code timeToLive} value.
*/
public Duration getTimeToLive() {
return timeToLive;
}

/**
* Sets the given {@code timeToLive} value on {@link AmqpMessageHeader} object.
*
* @param timeToLive to set on {@link AmqpMessageHeader}.
*
* @return updated {@link AmqpMessageHeader} object.
*/
public AmqpMessageHeader setTimeToLive(Duration timeToLive) {
this.timeToLive = timeToLive;
return this;
}
}
Loading