Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an Interop issue: Amqp body section can be AmqpValue or AmqpSequence #66

Merged
merged 7 commits into from
Mar 8, 2017
36 changes: 29 additions & 7 deletions azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;

Expand All @@ -35,11 +38,14 @@ public class EventData implements Serializable

transient private Binary bodyData;

private final Class bodyType;

private Map<String, Object> properties;
private SystemProperties systemProperties;

private EventData()
{
this.bodyType = Data.class;
}

/**
Expand Down Expand Up @@ -71,19 +77,31 @@ private EventData()
if (amqpMessage.getCorrelationId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CORRELATION_ID, amqpMessage.getCorrelationId());
if (amqpMessage.getContentType() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE, amqpMessage.getContentType());
if (amqpMessage.getContentEncoding() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_ENCODING, amqpMessage.getContentEncoding());
if (amqpMessage.getProperties().getAbsoluteExpiryTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time, amqpMessage.getExpiryTime());
if (amqpMessage.getProperties().getAbsoluteExpiryTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME, amqpMessage.getExpiryTime());
if (amqpMessage.getProperties().getCreationTime() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CREATION_TIME, amqpMessage.getCreationTime());
if (amqpMessage.getGroupId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_GROUP_ID, amqpMessage.getGroupId());
if (amqpMessage.getProperties().getGroupSequence() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_GROUP_SEQUENCE, amqpMessage.getGroupSequence());
if (amqpMessage.getReplyToGroupId() != null) receiveProperties.put(AmqpConstants.AMQP_PROPERTY_REPLY_TO_GROUP_ID, amqpMessage.getReplyToGroupId());
}

this.systemProperties = new SystemProperties(receiveProperties);
this.properties = amqpMessage.getApplicationProperties() == null ? null
: ((Map<String, Object>)(amqpMessage.getApplicationProperties().getValue()));

this.bodyData = amqpMessage.getBody() == null ? null : ((Data) amqpMessage.getBody()).getValue();


Section bodySection = amqpMessage.getBody();
if (bodySection != null) {
this.bodyType = bodySection.getClass();
if (bodySection instanceof Data)
this.bodyData = ((Data) bodySection).getValue();
else if (bodySection instanceof AmqpValue)
receiveProperties.put(AmqpConstants.AMQP_VALUE, ((AmqpValue) bodySection).getValue());
else if (bodySection instanceof AmqpSequence)
receiveProperties.put(AmqpConstants.AMQP_SEQUENCE, ((AmqpSequence) bodySection).getValue());
}
else {
this.bodyType = Data.class;
Copy link
Member

@xinchen10 xinchen10 Mar 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this? It is set to default in the ctor. #Resolved

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ctor doesn't invoke the default ctor


In reply to: 104077202 [](ancestors = 104077202)

}

this.systemProperties = new SystemProperties(receiveProperties);
amqpMessage.clear();
}

Expand Down Expand Up @@ -180,7 +198,11 @@ public EventData(ByteBuffer buffer)
*/
public byte[] getBody()
{
return this.bodyData == null ? null : this.bodyData.getArray();
if (this.bodyType != Data.class) {
throw new IllegalEventDataBodyException(this.bodyType);
}

return this.bodyData == null ? null : this.bodyData.getArray();
}

/**
Expand Down Expand Up @@ -278,7 +300,7 @@ Message toAmqpMessage()
case AmqpConstants.AMQP_PROPERTY_CORRELATION_ID: amqpMessage.setCorrelationId(systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE: amqpMessage.setContentType((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CONTENT_ENCODING: amqpMessage.setContentEncoding((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time: amqpMessage.setExpiryTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME: amqpMessage.setExpiryTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_CREATION_TIME: amqpMessage.setCreationTime((long) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_GROUP_ID: amqpMessage.setGroupId((String) systemProperty.getValue()); break;
case AmqpConstants.AMQP_PROPERTY_GROUP_SEQUENCE: amqpMessage.setGroupSequence((long) systemProperty.getValue()); break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs;

import java.util.HashMap;
import java.util.Map;

import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class IllegalEventDataBodyException extends RuntimeException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnexpectedEventDataBodyException?


private static final Map<Class, String> KNOWN_SECTIONS = new HashMap<Class, String>() {{
put(AmqpValue.class, AmqpConstants.AMQP_VALUE);
put(AmqpSequence.class, AmqpConstants.AMQP_SEQUENCE);
}};

private final Class bodySection;

public IllegalEventDataBodyException(final Class actualBodySection) {
super(KNOWN_SECTIONS.containsKey(actualBodySection)
? String.format("AmqpMessage Body Section will be available in %s.getBody() only if it is of type: %s. If AmqpMessage is sent with any other Body type - it will be added to %s.getSystemProperties(). Use thisException.getSystemPropertyName() method to find this value in %s.getSystemProperties()",
EventData.class, Data.class, EventData.class, EventData.class)
: "AmqpMessage Body Section cannot be mapped to any EventData section.");
this.bodySection = actualBodySection;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no other body types. If it is not Data, it must be either Value or Sequence. This should be ensured by Proton-J.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While as per the SPEC this is correct - this happened a while back: -we saw ApplicationProperties as part of Data section (I am still investigating how this was possible):

17/02/02 20:34:20 ERROR ReceiverTracker: Deregistered receiver for stream 19: Restarting receiver with delay 2000ms: Error handling message, restarting receiver for partition 19 - java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.ApplicationProperties cannot be cast to org.apache.qpid.proton.amqp.messaging.Data
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.spark.streaming.eventhubs.EventHubsClientWrapper.receive(EventHubsClientWrapper.scala:158)
at org.apache.spark.streaming.eventhubs.EventHubsReceiver$EventHubsMessageHandler.run(EventHubsReceiver.scala:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.ApplicationProperties cannot be cast to org.apache.qpid.proton.amqp.messaging.Data
at com.microsoft.azure.eventhubs.EventData.(EventData.java:85)
at com.microsoft.azure.eventhubs.EventDataUtil.toEventDataCollection(EventDataUtil.java:46)
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:285)
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:281)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at com.microsoft.azure.servicebus.MessageReceiver$4.onEvent(MessageReceiver.java:305)
at com.microsoft.azure.servicebus.amqp.DispatchHandler.onTimerTask(DispatchHandler.java:10)
at com.microsoft.azure.servicebus.amqp.ReactorDispatcher$ScheduleHandler.run(ReactorDispatcher.java:126)
at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:381)
at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:277)
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
... 1 more


In reply to: 104077500 [](ancestors = 104077500)

}

public String getSystemPropertyName() {
return KNOWN_SECTIONS.get(this.bodySection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private AmqpConstants() { }
add(AMQP_PROPERTY_CORRELATION_ID);
add(AMQP_PROPERTY_CONTENT_TYPE);
add(AMQP_PROPERTY_CONTENT_ENCODING);
add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_time);
add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME);
add(AMQP_PROPERTY_CREATION_TIME);
add(AMQP_PROPERTY_GROUP_ID);
add(AMQP_PROPERTY_GROUP_SEQUENCE);
Expand Down Expand Up @@ -65,11 +65,14 @@ private AmqpConstants() { }
public static final String AMQP_PROPERTY_CORRELATION_ID = "correlation-id";
public static final String AMQP_PROPERTY_CONTENT_TYPE = "content-type";
public static final String AMQP_PROPERTY_CONTENT_ENCODING = "content-encoding";
public static final String AMQP_PROPERTY_ABSOLUTE_EXPRITY_time = "absolute-expiry-time";
public static final String AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME = "absolute-expiry-time";
public static final String AMQP_PROPERTY_CREATION_TIME = "creation-time";
public static final String AMQP_PROPERTY_GROUP_ID = "group-id";
public static final String AMQP_PROPERTY_GROUP_SEQUENCE = "group-sequence";
public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id";

public static final String AMQP_VALUE = "amqp:amqp-value:*";
public static final String AMQP_SEQUENCE = "amqp:amqp-sequence:list";

public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -36,7 +37,7 @@
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class InteropTest extends ApiTestBase
public class InteropAmqpPropertiesTest extends ApiTestBase
{
static EventHubClient ehClient;
static MessagingFactory msgFactory;
Expand Down Expand Up @@ -83,8 +84,8 @@ public void accept(EventData eData)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_REPLY_TO_GROUP_ID).equals(originalMessage.getReplyToGroupId()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_REPLY_TO)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_REPLY_TO).equals(originalMessage.getReplyTo()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_time).equals(originalMessage.getExpiryTime()));
Assert.assertTrue(eData.getSystemProperties().containsKey(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME)
&& eData.getSystemProperties().get(AmqpConstants.AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME).equals(originalMessage.getExpiryTime()));

Assert.assertTrue(eData.getSystemProperties().containsKey(msgAnnotation)
&& eData.getSystemProperties().get(msgAnnotation).equals(originalMessage.getMessageAnnotations().getValue().get(Symbol.getSymbol(msgAnnotation))));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.eventdata;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.LinkedList;
import java.util.List;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.message.Message;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.IllegalEventDataBodyException;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.lib.ApiTestBase;
import com.microsoft.azure.eventhubs.lib.TestContext;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.MessageSender;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;

public class InteropEventBodyTest extends ApiTestBase {

static EventHubClient ehClient;
static MessagingFactory msgFactory;
static PartitionReceiver receiver;
static MessageSender partitionMsgSender;

static final String partitionId = "0";
static EventData receivedEvent;
static EventData reSentAndReceivedEvent;
static Message reSendAndReceivedMessage;

@BeforeClass
public static void initialize() throws ServiceBusException, IOException, InterruptedException, ExecutionException
{
final ConnectionStringBuilder connStrBuilder = TestContext.getConnectionString();
final String connectionString = connStrBuilder.toString();

ehClient = EventHubClient.createFromConnectionStringSync(connectionString);
msgFactory = MessagingFactory.createFromConnectionString(connectionString).get();
receiver = ehClient.createReceiverSync(TestContext.getConsumerGroupName(), partitionId, Instant.now());
partitionMsgSender = MessageSender.create(msgFactory, "link1", connStrBuilder.getEntityPath() + "/partitions/" + partitionId).get();

// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service
receiver.setReceiveTimeout(Duration.ofSeconds(5));
Iterable<EventData> clockSkewEvents;
do {
clockSkewEvents = receiver.receiveSync(100);
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext());
}

@Test
public void interopWithProtonAmqpMessageBodyAsAmqpValue() throws ServiceBusException, InterruptedException, ExecutionException
{
Message originalMessage = Proton.message();
String payload = "testmsg";
originalMessage.setBody(new AmqpValue(payload));
partitionMsgSender.send(originalMessage).get();
receivedEvent = receiver.receiveSync(10).iterator().next();

Assert.assertTrue(receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_VALUE).equals(payload));

try {
receivedEvent.getBody();
Assert.assertTrue(false);
} catch (IllegalEventDataBodyException exception) {
Assert.assertTrue(exception.getSystemPropertyName().equals(AmqpConstants.AMQP_VALUE));
}
}

@Test
public void interopWithProtonAmqpMessageBodyAsAmqpSequence() throws ServiceBusException, InterruptedException, ExecutionException
{
Message originalMessage = Proton.message();
String payload = "testmsg";
LinkedList<Data> datas = new LinkedList<>();
datas.add(new Data(new Binary(payload.getBytes())));
originalMessage.setBody(new AmqpSequence(datas));

partitionMsgSender.send(originalMessage).get();
receivedEvent = receiver.receiveSync(10).iterator().next();

Assert.assertTrue(new String(((List<Data>)(receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_SEQUENCE))).get(0).getValue().getArray()).equals(payload));

try {
receivedEvent.getBody();
Assert.assertTrue(false);
} catch (IllegalEventDataBodyException exception) {
Assert.assertTrue(exception.getSystemPropertyName().equals(AmqpConstants.AMQP_SEQUENCE));
}
}

@AfterClass
public static void cleanup() throws ServiceBusException
{
if (partitionMsgSender != null)
partitionMsgSender.closeSync();

if (receiver != null)
receiver.closeSync();

if (ehClient != null)
ehClient.closeSync();

if (msgFactory != null)
msgFactory.closeSync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,17 @@ public void sendBatchRetainsOrderWithinBatch() throws ServiceBusException, Inter
this.receivers.add(receiver);
receiver.setReceiveTimeout(Duration.ofSeconds(1));
receiver.setReceiveHandler(new OrderValidator(validator, batchSize));

// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service
Iterable<EventData> clockSkewEvents;
do {
clockSkewEvents = receiver.receiveSync(100);
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext());

sender = ehClient.createPartitionSenderSync(partitionId);
sender.sendSync(batchEvents);

validator.get(10, TimeUnit.SECONDS);
validator.get(25, TimeUnit.SECONDS);
}

@Test
Expand All @@ -82,7 +89,15 @@ public void sendResultsInSysPropertiesWithPartitionKey() throws ServiceBusExcept
{
final PartitionReceiver receiver = ehClient.createReceiverSync(cgName, Integer.toString(receiversCount), Instant.now());
receivers.add(receiver);
receiver.setReceiveHandler(validator);

// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service
receiver.setReceiveTimeout(Duration.ofSeconds(5));
Iterable<EventData> clockSkewEvents;
do {
clockSkewEvents = receiver.receiveSync(100);
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext());

receiver.setReceiveHandler(validator);
}

ehClient.sendSync(new EventData("TestMessage".getBytes()), partitionKey);
Expand All @@ -101,7 +116,15 @@ public void sendBatchResultsInSysPropertiesWithPartitionKey() throws ServiceBusE
{
final PartitionReceiver receiver = ehClient.createReceiverSync(cgName, Integer.toString(receiversCount), Instant.now());
receivers.add(receiver);
receiver.setReceiveHandler(validator);

// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service
receiver.setReceiveTimeout(Duration.ofSeconds(5));
Iterable<EventData> clockSkewEvents;
do {
clockSkewEvents = receiver.receiveSync(100);
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext());

receiver.setReceiveHandler(validator);
}

List<EventData> events = new LinkedList<>();
Expand Down Expand Up @@ -158,7 +181,7 @@ public void onReceive(Iterable<EventData> events)
{
for(EventData event : events)
{
if (!event.getSystemProperties().getPartitionKey().equals(partitionKey))
if (!partitionKey.equals(event.getSystemProperties().getPartitionKey()))
this.validateSignal.completeExceptionally(
new AssertionFailedError(String.format("received partitionKey: %s, expected partitionKey: %s", event.getSystemProperties().getPartitionKey(), partitionKey)));

Expand Down