-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix an Interop issue: Amqp body section can be AmqpValue or AmqpSequence #66
Changes from 2 commits
98c638e
d364a8e
6517f4c
ab8c196
0e9e945
ff730b9
d219f47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Copyright (c) Microsoft. All rights reserved. | ||
* Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
*/ | ||
package com.microsoft.azure.eventhubs; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence; | ||
import org.apache.qpid.proton.amqp.messaging.AmqpValue; | ||
import org.apache.qpid.proton.amqp.messaging.Data; | ||
|
||
import com.microsoft.azure.servicebus.amqp.AmqpConstants; | ||
|
||
public class IllegalEventDataBodyException extends RuntimeException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. UnexpectedEventDataBodyException? |
||
|
||
private static final Map<Class, String> KNOWN_SECTIONS = new HashMap<Class, String>() {{ | ||
put(AmqpValue.class, AmqpConstants.AMQP_VALUE); | ||
put(AmqpSequence.class, AmqpConstants.AMQP_SEQUENCE); | ||
}}; | ||
|
||
private final Class bodySection; | ||
|
||
public IllegalEventDataBodyException(final Class actualBodySection) { | ||
super(KNOWN_SECTIONS.containsKey(actualBodySection) | ||
? String.format("AmqpMessage Body Section will be available in %s.getBody() only if it is of type: %s. If AmqpMessage is sent with any other Body type - it will be added to %s.getSystemProperties(). Use thisException.getSystemPropertyName() method to find this value in %s.getSystemProperties()", | ||
EventData.class, Data.class, EventData.class, EventData.class) | ||
: "AmqpMessage Body Section cannot be mapped to any EventData section."); | ||
this.bodySection = actualBodySection; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are no other body types. If it is not Data, it must be either Value or Sequence. This should be ensured by Proton-J. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While as per the SPEC this is correct - this happened a while back: -we saw ApplicationProperties as part of Data section (I am still investigating how this was possible): 17/02/02 20:34:20 ERROR ReceiverTracker: Deregistered receiver for stream 19: Restarting receiver with delay 2000ms: Error handling message, restarting receiver for partition 19 - java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.ApplicationProperties cannot be cast to org.apache.qpid.proton.amqp.messaging.Data In reply to: 104077500 [](ancestors = 104077500) |
||
} | ||
|
||
public String getSystemPropertyName() { | ||
return KNOWN_SECTIONS.get(this.bodySection); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
* Copyright (c) Microsoft. All rights reserved. | ||
* Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
*/ | ||
package com.microsoft.azure.eventhubs.eventdata; | ||
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
|
||
import org.apache.qpid.proton.amqp.Binary; | ||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence; | ||
import org.apache.qpid.proton.amqp.messaging.AmqpValue; | ||
import org.apache.qpid.proton.amqp.messaging.Data; | ||
import org.apache.qpid.proton.Proton; | ||
import org.apache.qpid.proton.message.Message; | ||
|
||
import org.junit.AfterClass; | ||
import org.junit.Assert; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
|
||
import com.microsoft.azure.eventhubs.EventData; | ||
import com.microsoft.azure.eventhubs.EventHubClient; | ||
import com.microsoft.azure.eventhubs.IllegalEventDataBodyException; | ||
import com.microsoft.azure.eventhubs.PartitionReceiver; | ||
import com.microsoft.azure.eventhubs.lib.ApiTestBase; | ||
import com.microsoft.azure.eventhubs.lib.TestContext; | ||
import com.microsoft.azure.servicebus.ConnectionStringBuilder; | ||
import com.microsoft.azure.servicebus.MessageSender; | ||
import com.microsoft.azure.servicebus.MessagingFactory; | ||
import com.microsoft.azure.servicebus.ServiceBusException; | ||
import com.microsoft.azure.servicebus.amqp.AmqpConstants; | ||
|
||
public class InteropEventBodyTest extends ApiTestBase { | ||
|
||
static EventHubClient ehClient; | ||
static MessagingFactory msgFactory; | ||
static PartitionReceiver receiver; | ||
static MessageSender partitionMsgSender; | ||
|
||
static final String partitionId = "0"; | ||
static EventData receivedEvent; | ||
static EventData reSentAndReceivedEvent; | ||
static Message reSendAndReceivedMessage; | ||
|
||
@BeforeClass | ||
public static void initialize() throws ServiceBusException, IOException, InterruptedException, ExecutionException | ||
{ | ||
final ConnectionStringBuilder connStrBuilder = TestContext.getConnectionString(); | ||
final String connectionString = connStrBuilder.toString(); | ||
|
||
ehClient = EventHubClient.createFromConnectionStringSync(connectionString); | ||
msgFactory = MessagingFactory.createFromConnectionString(connectionString).get(); | ||
receiver = ehClient.createReceiverSync(TestContext.getConsumerGroupName(), partitionId, Instant.now()); | ||
partitionMsgSender = MessageSender.create(msgFactory, "link1", connStrBuilder.getEntityPath() + "/partitions/" + partitionId).get(); | ||
|
||
// run out of messages in that specific partition - to account for clock-skew with Instant.now() on test machine vs eventhubs service | ||
receiver.setReceiveTimeout(Duration.ofSeconds(5)); | ||
Iterable<EventData> clockSkewEvents; | ||
do { | ||
clockSkewEvents = receiver.receiveSync(100); | ||
} while (clockSkewEvents != null && clockSkewEvents.iterator().hasNext()); | ||
} | ||
|
||
@Test | ||
public void interopWithProtonAmqpMessageBodyAsAmqpValue() throws ServiceBusException, InterruptedException, ExecutionException | ||
{ | ||
Message originalMessage = Proton.message(); | ||
String payload = "testmsg"; | ||
originalMessage.setBody(new AmqpValue(payload)); | ||
partitionMsgSender.send(originalMessage).get(); | ||
receivedEvent = receiver.receiveSync(10).iterator().next(); | ||
|
||
Assert.assertTrue(receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_VALUE).equals(payload)); | ||
|
||
try { | ||
receivedEvent.getBody(); | ||
Assert.assertTrue(false); | ||
} catch (IllegalEventDataBodyException exception) { | ||
Assert.assertTrue(exception.getSystemPropertyName().equals(AmqpConstants.AMQP_VALUE)); | ||
} | ||
} | ||
|
||
@Test | ||
public void interopWithProtonAmqpMessageBodyAsAmqpSequence() throws ServiceBusException, InterruptedException, ExecutionException | ||
{ | ||
Message originalMessage = Proton.message(); | ||
String payload = "testmsg"; | ||
LinkedList<Data> datas = new LinkedList<>(); | ||
datas.add(new Data(new Binary(payload.getBytes()))); | ||
originalMessage.setBody(new AmqpSequence(datas)); | ||
|
||
partitionMsgSender.send(originalMessage).get(); | ||
receivedEvent = receiver.receiveSync(10).iterator().next(); | ||
|
||
Assert.assertTrue(new String(((List<Data>)(receivedEvent.getSystemProperties().get(AmqpConstants.AMQP_SEQUENCE))).get(0).getValue().getArray()).equals(payload)); | ||
|
||
try { | ||
receivedEvent.getBody(); | ||
Assert.assertTrue(false); | ||
} catch (IllegalEventDataBodyException exception) { | ||
Assert.assertTrue(exception.getSystemPropertyName().equals(AmqpConstants.AMQP_SEQUENCE)); | ||
} | ||
} | ||
|
||
@AfterClass | ||
public static void cleanup() throws ServiceBusException | ||
{ | ||
if (partitionMsgSender != null) | ||
partitionMsgSender.closeSync(); | ||
|
||
if (receiver != null) | ||
receiver.closeSync(); | ||
|
||
if (ehClient != null) | ||
ehClient.closeSync(); | ||
|
||
if (msgFactory != null) | ||
msgFactory.closeSync(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need this? It is set to default in the ctor. #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ctor doesn't invoke the default ctor
In reply to: 104077202 [](ancestors = 104077202)