-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix an Interop issue: Amqp body section can be AmqpValue or AmqpSequence #66
Conversation
…nce (#3) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues
@SreeramGarlapati, |
receiveProperties.put(AmqpConstants.AMQP_SEQUENCE, ((AmqpSequence) bodySection).getValue()); | ||
} | ||
else { | ||
this.bodyType = Data.class; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need this? It is set to default in the ctor. #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
import com.microsoft.azure.servicebus.amqp.AmqpConstants; | ||
|
||
public class IllegalEventDataBodyException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnexpectedEventDataBodyException?
? String.format("AmqpMessage Body Section will be available in %s.getBody() only if it is of type: %s. If AmqpMessage is sent with any other Body type - it will be added to %s.getSystemProperties(). Use thisException.getSystemPropertyName() method to find this value in %s.getSystemProperties()", | ||
EventData.class, Data.class, EventData.class, EventData.class) | ||
: "AmqpMessage Body Section cannot be mapped to any EventData section."); | ||
this.bodySection = actualBodySection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are no other body types. If it is not Data, it must be either Value or Sequence. This should be ensured by Proton-J.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While as per the SPEC this is correct - this happened a while back: -we saw ApplicationProperties as part of Data section (I am still investigating how this was possible):
17/02/02 20:34:20 ERROR ReceiverTracker: Deregistered receiver for stream 19: Restarting receiver with delay 2000ms: Error handling message, restarting receiver for partition 19 - java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.ApplicationProperties cannot be cast to org.apache.qpid.proton.amqp.messaging.Data
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.spark.streaming.eventhubs.EventHubsClientWrapper.receive(EventHubsClientWrapper.scala:158)
at org.apache.spark.streaming.eventhubs.EventHubsReceiver$EventHubsMessageHandler.run(EventHubsReceiver.scala:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.qpid.proton.amqp.messaging.ApplicationProperties cannot be cast to org.apache.qpid.proton.amqp.messaging.Data
at com.microsoft.azure.eventhubs.EventData.(EventData.java:85)
at com.microsoft.azure.eventhubs.EventDataUtil.toEventDataCollection(EventDataUtil.java:46)
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:285)
at com.microsoft.azure.eventhubs.PartitionReceiver$3.apply(PartitionReceiver.java:281)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at com.microsoft.azure.servicebus.MessageReceiver$4.onEvent(MessageReceiver.java:305)
at com.microsoft.azure.servicebus.amqp.DispatchHandler.onTimerTask(DispatchHandler.java:10)
at com.microsoft.azure.servicebus.amqp.ReactorDispatcher$ScheduleHandler.run(ReactorDispatcher.java:126)
at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:381)
at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:277)
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
... 1 more
In reply to: 104077500 [](ancestors = 104077500)
…Azure#66) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues * fix flaky CIT issues * Change exception name from Illegal to Unexpected * implement reSend path for other Body section types
…Azure#66) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues * fix flaky CIT issues * Change exception name from Illegal to Unexpected * implement reSend path for other Body section types
* Fix Interop issue: Amqp body section can be AmqpValue or AmqpSequence (#66) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues * fix flaky CIT issues * Change exception name from Illegal to Unexpected * implement reSend path for other Body section types * update proton-j to latest (0.18.0) * handle interop amqpmessage body with value and sequence sections (#81) * partial changes * interop body method * remove getbody recommendation in javadoc * Remove array copy solution in getBytes() and fix javadoc * bump up the version of javaClient * fix javadoc
…#66) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues * fix flaky CIT issues * Change exception name from Illegal to Unexpected * implement reSend path for other Body section types
* Fix Interop issue: Amqp body section can be AmqpValue or AmqpSequence (#66) * support interop with 2 other body types - amqp-value & amqp-sequence * receivedEvent.getBody() should throw and communicate - in case of interop issues * fix flaky CIT issues * Change exception name from Illegal to Unexpected * implement reSend path for other Body section types * update proton-j to latest (0.18.0) * handle interop amqpmessage body with value and sequence sections (#81) * partial changes * interop body method * remove getbody recommendation in javadoc * Remove array copy solution in getBytes() and fix javadoc * bump up the version of javaClient * fix javadoc
fix for issue: #59