Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename getAmqpAnnotatedMessage to getRawAmqpMessage #17712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ public ServiceBusMessage(BinaryData body) {
public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null.");

final AmqpMessageBodyType bodyType = receivedMessage.getAmqpAnnotatedMessage().getBody().getBodyType();
final AmqpMessageBodyType bodyType = receivedMessage.getRawAmqpMessage().getBody().getBodyType();
AmqpMessageBody amqpMessageBody;
switch (bodyType) {
case DATA:
amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getAmqpAnnotatedMessage().getBody()
amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getRawAmqpMessage().getBody()
.getFirstData());
break;
case SEQUENCE:
Expand All @@ -136,7 +136,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(amqpMessageBody);

// set properties
final AmqpMessageProperties receivedProperties = receivedMessage.getAmqpAnnotatedMessage().getProperties();
final AmqpMessageProperties receivedProperties = receivedMessage.getRawAmqpMessage().getProperties();
final AmqpMessageProperties newProperties = amqpAnnotatedMessage.getProperties();
newProperties.setMessageId(receivedProperties.getMessageId());
newProperties.setUserId(receivedProperties.getUserId());
Expand All @@ -153,15 +153,15 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
newProperties.setReplyToGroupId(receivedProperties.getReplyToGroupId());

// copy header except for delivery count which should be set to null
final AmqpMessageHeader receivedHeader = receivedMessage.getAmqpAnnotatedMessage().getHeader();
final AmqpMessageHeader receivedHeader = receivedMessage.getRawAmqpMessage().getHeader();
final AmqpMessageHeader newHeader = this.amqpAnnotatedMessage.getHeader();
newHeader.setPriority(receivedHeader.getPriority());
newHeader.setTimeToLive(receivedHeader.getTimeToLive());
newHeader.setDurable(receivedHeader.isDurable());
newHeader.setFirstAcquirer(receivedHeader.isFirstAcquirer());

// copy message annotations except for broker set ones
final Map<String, Object> receivedAnnotations = receivedMessage.getAmqpAnnotatedMessage()
final Map<String, Object> receivedAnnotations = receivedMessage.getRawAmqpMessage()
.getMessageAnnotations();
final Map<String, Object> newAnnotations = this.amqpAnnotatedMessage.getMessageAnnotations();

Expand All @@ -178,23 +178,23 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
}

// copy delivery annotations
final Map<String, Object> receivedDelivery = receivedMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations();
final Map<String, Object> receivedDelivery = receivedMessage.getRawAmqpMessage().getDeliveryAnnotations();
final Map<String, Object> newDelivery = this.amqpAnnotatedMessage.getDeliveryAnnotations();

for (Map.Entry<String, Object> entry: receivedDelivery.entrySet()) {
newDelivery.put(entry.getKey(), entry.getValue());
}

// copy Footer
final Map<String, Object> receivedFooter = receivedMessage.getAmqpAnnotatedMessage().getFooter();
final Map<String, Object> receivedFooter = receivedMessage.getRawAmqpMessage().getFooter();
final Map<String, Object> newFooter = this.amqpAnnotatedMessage.getFooter();

for (Map.Entry<String, Object> entry: receivedFooter.entrySet()) {
newFooter.put(entry.getKey(), entry.getValue());
}

// copy application properties except for broker set ones
final Map<String, Object> receivedApplicationProperties = receivedMessage.getAmqpAnnotatedMessage()
final Map<String, Object> receivedApplicationProperties = receivedMessage.getRawAmqpMessage()
.getApplicationProperties();
final Map<String, Object> newApplicationProperties = this.amqpAnnotatedMessage.getApplicationProperties();

Expand All @@ -215,7 +215,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
*
* @return the amqp message.
*/
public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
public AmqpAnnotatedMessage getRawAmqpMessage() {
return amqpAnnotatedMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public <T> Message serialize(T object) {
amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId());
amqpMessage.setGroupId(brokeredMessage.getSessionId());

final AmqpMessageProperties brokeredProperties = brokeredMessage.getAmqpAnnotatedMessage().getProperties();
final AmqpMessageProperties brokeredProperties = brokeredMessage.getRawAmqpMessage().getProperties();

amqpMessage.setContentEncoding(brokeredProperties.getContentEncoding());
if (brokeredProperties.getGroupSequence() != null) {
Expand All @@ -166,10 +166,10 @@ public <T> Message serialize(T object) {
}

//set footer
amqpMessage.setFooter(new Footer(brokeredMessage.getAmqpAnnotatedMessage().getFooter()));
amqpMessage.setFooter(new Footer(brokeredMessage.getRawAmqpMessage().getFooter()));

//set header
AmqpMessageHeader header = brokeredMessage.getAmqpAnnotatedMessage().getHeader();
AmqpMessageHeader header = brokeredMessage.getRawAmqpMessage().getHeader();
if (header.getDeliveryCount() != null) {
amqpMessage.setDeliveryCount(header.getDeliveryCount());
}
Expand Down Expand Up @@ -203,7 +203,7 @@ public <T> Message serialize(T object) {
// Set Delivery Annotations.
final Map<Symbol, Object> deliveryAnnotationsMap = new HashMap<>();

final Map<String, Object> deliveryAnnotations = brokeredMessage.getAmqpAnnotatedMessage()
final Map<String, Object> deliveryAnnotations = brokeredMessage.getRawAmqpMessage()
.getDeliveryAnnotations();
for (Map.Entry<String, Object> deliveryEntry : deliveryAnnotations.entrySet()) {
deliveryAnnotationsMap.put(Symbol.valueOf(deliveryEntry.getKey()), deliveryEntry.getValue());
Expand Down Expand Up @@ -355,7 +355,7 @@ private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) {
bytes = EMPTY_BYTE_ARRAY;
}
final ServiceBusReceivedMessage brokeredMessage = new ServiceBusReceivedMessage(BinaryData.fromBytes(bytes));
AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getAmqpAnnotatedMessage();
AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getRawAmqpMessage();

// Application properties
ApplicationProperties applicationProperties = amqpMessage.getApplicationProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class ServiceBusReceivedMessage {
*
* @return the {@link AmqpAnnotatedMessage} representing amqp message.
*/
public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
public AmqpAnnotatedMessage getRawAmqpMessage() {
return amqpAnnotatedMessage;
}

Expand Down Expand Up @@ -407,7 +407,7 @@ public long getSequenceNumber() {
* @return Session Id of the {@link ServiceBusReceivedMessage}.
*/
public String getSessionId() {
return getAmqpAnnotatedMessage().getProperties().getGroupId();
return getRawAmqpMessage().getProperties().getGroupId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ void deserializeMessage() {
assertEquals(message.getContentType(), actualMessage.getContentType());
assertEquals(message.getCorrelationId(), actualMessage.getCorrelationId());

assertValues(expectedMessageAnnotations, actualMessage.getAmqpAnnotatedMessage().getMessageAnnotations());
assertValues(expectedDeliveryAnnotations, actualMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations());
assertValues(expectedFooterValues, actualMessage.getAmqpAnnotatedMessage().getFooter());
assertValues(expectedMessageAnnotations, actualMessage.getRawAmqpMessage().getMessageAnnotations());
assertValues(expectedDeliveryAnnotations, actualMessage.getRawAmqpMessage().getDeliveryAnnotations());
assertValues(expectedFooterValues, actualMessage.getRawAmqpMessage().getFooter());

// Verifying our application properties are the same.
assertEquals(APPLICATION_PROPERTIES.size(), actualMessage.getApplicationProperties().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public void copyConstructorTest() {


final ServiceBusReceivedMessage expected = new ServiceBusReceivedMessage(PAYLOAD_BINARY);
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "11");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), "11");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "abc");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10");
expected.getRawAmqpMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "11");
expected.getRawAmqpMessage().getMessageAnnotations().put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), "11");
expected.getRawAmqpMessage().getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "abc");
expected.setSubject(expectedSubject);
expected.setTo(expectedTo);
expected.setReplyTo(expectedReplyTo);
Expand All @@ -74,12 +74,12 @@ public void copyConstructorTest() {
expected.setTimeToLive(expectedTimeToLive);
expected.setPartitionKey(expectedPartitionKey);

expected.getAmqpAnnotatedMessage().getHeader().setPriority(expectedPriority);
expected.getRawAmqpMessage().getHeader().setPriority(expectedPriority);

final Map<String, Object> expectedFooter = expected.getAmqpAnnotatedMessage().getFooter();
final Map<String, Object> expectedFooter = expected.getRawAmqpMessage().getFooter();
expectedFooter.put("foo-1", expectedFooterValue);

final Map<String, Object> expectedDeliveryAnnotations = expected.getAmqpAnnotatedMessage().getDeliveryAnnotations();
final Map<String, Object> expectedDeliveryAnnotations = expected.getRawAmqpMessage().getDeliveryAnnotations();
expectedDeliveryAnnotations.put("da-1", expectedDeliveryAnnotationsValue);

final Map<String, Object> expectedApplicationProperties = expected.getApplicationProperties();
Expand All @@ -98,14 +98,14 @@ public void copyConstructorTest() {
expected.setPartitionKey("new-p-key");

// Change original values
expected.getAmqpAnnotatedMessage().getHeader().setPriority((short) (expectedPriority + 1));
expected.getRawAmqpMessage().getHeader().setPriority((short) (expectedPriority + 1));
expectedFooter.put("foo-1", expectedFooterValue + "-changed");
expected.getAmqpAnnotatedMessage().getDeliveryAnnotations().put("da-1", expectedDeliveryAnnotationsValue + "-changed");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put("ap-1", expectedApplicationValue + "-changed");
expected.getRawAmqpMessage().getDeliveryAnnotations().put("da-1", expectedDeliveryAnnotationsValue + "-changed");
expected.getRawAmqpMessage().getApplicationProperties().put("ap-1", expectedApplicationValue + "-changed");


// Assert
assertNotSame(expected.getAmqpAnnotatedMessage(), actual.getAmqpAnnotatedMessage());
assertNotSame(expected.getRawAmqpMessage(), actual.getRawAmqpMessage());

// Validate updated values
assertEquals(expectedSubject, actual.getSubject());
Expand All @@ -117,21 +117,21 @@ public void copyConstructorTest() {
assertEquals(expectedPartitionKey, actual.getPartitionKey());

// Following values should be reset.
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()));

assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getHeader().getDeliveryCount());
assertNull(actual.getRawAmqpMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getHeader().getDeliveryCount());

// Testing , updating original message did not change copied message values..
assertEquals(expectedPriority, actual.getAmqpAnnotatedMessage().getHeader().getPriority());
assertEquals(expectedFooterValue, actual.getAmqpAnnotatedMessage().getFooter().get("foo-1").toString());
assertEquals(expectedDeliveryAnnotationsValue, actual.getAmqpAnnotatedMessage().getDeliveryAnnotations().get("da-1").toString());
assertEquals(expectedApplicationValue, actual.getAmqpAnnotatedMessage().getApplicationProperties().get("ap-1").toString());
assertEquals(expectedPriority, actual.getRawAmqpMessage().getHeader().getPriority());
assertEquals(expectedFooterValue, actual.getRawAmqpMessage().getFooter().get("foo-1").toString());
assertEquals(expectedDeliveryAnnotationsValue, actual.getRawAmqpMessage().getDeliveryAnnotations().get("da-1").toString());
assertEquals(expectedApplicationValue, actual.getRawAmqpMessage().getApplicationProperties().get("ap-1").toString());

}

Expand Down Expand Up @@ -176,15 +176,15 @@ public void copyConstructorModifyAfterCopyTest() {

// Assert
// Validate updated values
assertEquals(expectedSubject, originalMessage.getAmqpAnnotatedMessage().getProperties().getSubject());
assertEquals(expectedTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getTo().toString());
assertEquals(expectedReplyTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyTo().toString());
assertEquals(expectedReplyToSessionId, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyToGroupId());
assertEquals(expectedCorrelationId, originalMessage.getAmqpAnnotatedMessage().getProperties().getCorrelationId().toString());
assertEquals(expectedSubject, originalMessage.getRawAmqpMessage().getProperties().getSubject());
assertEquals(expectedTo, originalMessage.getRawAmqpMessage().getProperties().getTo().toString());
assertEquals(expectedReplyTo, originalMessage.getRawAmqpMessage().getProperties().getReplyTo().toString());
assertEquals(expectedReplyToSessionId, originalMessage.getRawAmqpMessage().getProperties().getReplyToGroupId());
assertEquals(expectedCorrelationId, originalMessage.getRawAmqpMessage().getProperties().getCorrelationId().toString());

assertEquals(expectedTimeToLive, originalMessage.getAmqpAnnotatedMessage().getHeader().getTimeToLive());
assertEquals(expectedTimeToLive, originalMessage.getRawAmqpMessage().getHeader().getTimeToLive());

assertEquals(expectedPartitionKey, originalMessage.getAmqpAnnotatedMessage().getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue()));
assertEquals(expectedPartitionKey, originalMessage.getRawAmqpMessage().getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue()));
}

/**
Expand Down
Loading