diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
index 27add15da9541..f5a047e6bace7 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
@@ -2,6 +2,29 @@
## 5.5.0-beta.2 (Unreleased)
+### Acknowledgments
+
+Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:
+
+- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_
+
+### Changes
+
+#### Features Added
+
+- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
+
+- When the `EventProcessorClient` detects a partition being stolen outside of a load balancing cycle, it will immediately surrender ownership rather than waiting for a load balancing cycle to confirm the ownership change. This will help reduce event duplication from overlapping ownership of processors.
+
+- The `ConnectionOptions` available when creating a processor now support registering a callback delegate for participating in the validation of SSL certificates when connections are established. This delegate may override the built-in validation and allow or deny certificates based on application-specific logic.
+
+- The `ConnectionOptions` available when creating a processor now support setting a custom size for the send and receive buffers of the transport.
+
+#### Key Bugs Fixed
+
+- The `EventProcessorClient` will now properly respect another another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
+- The `EventProcessorClient` will now be less aggressive when considering whether or not to steal a partition, doing so only when it will correct an imbalance and preferring the status quo when the overall distribution would not change. This will help reduce event duplication due to partitions moving between owners.
## 5.5.0-beta.1 (2021-06-08)
@@ -13,7 +36,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- When stopping, the `EventProcessorClient` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
@@ -23,7 +46,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- The `ConnectionOptions` available when creating a processor now support setting a custom size for the send and receive buffers of the transport.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The `EventProcessorClient` will now properly respect another another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
@@ -33,13 +56,13 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- The processor will now perform validation of core configuration and permissions at startup, in order to attempt to detect unrecoverable problems more deterministically. Validation is non-blocking and will not delay claiming of partitions. One important note is that validation should be considered point-in-time and best effort; it is not meant to replace monitoring of error handler activity.
- Partition initialization has been moved to a background operation. This will allow partitions to be more efficiently managed and speed up ownership claims, especially when using the `LoadBalancingStrategy.Greedy` configuration or when the processor is recovering from some error conditions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Dependencies have been updated to resolve security warnings for CVE-2021-26701. _(The Event Hubs client library does not make use of the vulnerable components, directly or indirectly)_
@@ -55,13 +78,13 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- The `EventProcessorClient` now supports shared key and shared access signature authentication using the `AzureNamedKeyCredential` and `AzureSasCredential` types in addition to the connection string. Use of the credential allows the shared key or SAS to be updated without the need to create a new processor.
- Multiple enhancements were made to the AMQP transport paths for reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The AMQP library used for transport has been updated, fixing several issues including a potential unobserved `ObjectDisposedException` that could cause the host process to crash. _(see: [release notes](https://github.com/Azure/azure-amqp/releases/tag/v2.4.13))_
@@ -75,7 +98,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Fixed an issue where long-lived credentials (more than 49 days) were overflowing refresh timer limits and being rejected.
@@ -89,7 +112,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Additional options for tuning load balancing have been added to the `EventProcessorClientOptions`.
@@ -101,7 +124,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Documentation used for auto-completion via Intellisense and other tools has been enhanced in many areas, addressing gaps and commonly asked questions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Upgraded the `Microsoft.Azure.Amqp` library to resolve crashes occurring in .NET 5.
@@ -111,13 +134,13 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Additional options for tuning load balancing have been added to the `EventProcessorClientOptions`.
- Documentation used for auto-completion via Intellisense and other tools has been enhanced in many areas, addressing gaps and commonly asked questions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Upgraded the `Microsoft.Azure.Amqp` library to resolve crashes occurring in .NET 5.
@@ -127,7 +150,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- An issue with package publishing which blocked referencing and use has been fixed.
@@ -135,7 +158,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- The `EventData` representation has been extended with the ability to treat the `Body` as `BinaryData`. `BinaryData` supports a variety of data transformations and allows the ability to provide serialization logic when sending or receiving events. Any type that derives from `ObjectSerializer`, such as `JsonObjectSerializer` can be used, with Schema Registry support available via the `SchemaRegistryAvroObjectSerializer`.
@@ -147,7 +170,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.
@@ -161,7 +184,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The approach used for creation of checkpoints has been updated to interact with Azure Blob storage more efficiently. This will yield major performance improvements when soft delete was enabled and minor improvements otherwise.
@@ -169,7 +192,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Fixed an issue where failure to create an AMQP link would lead to an AMQP session not being explicitly closed, causing connections to the Event Hubs service to remain open until a garbage collection pass was performed.
-#### New Features
+#### Features Added
- Load balancing will now detect when it has reached a balanced state more accurately; this will allow it to operate more efficiently when `LoadBalancingStrategy.Greedy` is in use.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs
index 41e3b800bc4cd..27d5f83044c62 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs
@@ -780,5 +780,27 @@ internal static string DictionaryKeyNotFoundMask
return ResourceManager.GetString("DictionaryKeyNotFoundMask", resourceCulture);
}
}
+
+ ///
+ /// Looks up a localized string similar to {0} is not a supported value body type..
+ ///
+ internal static string InvalidAmqpMessageValueBodyMask
+ {
+ get
+ {
+ return ResourceManager.GetString("InvalidAmqpMessageValueBodyMask", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The {0} key `{1}` has a value of type `{2}` which is not supported for AMQP transport..
+ ///
+ internal static string InvalidAmqpMessageDictionaryTypeMask
+ {
+ get
+ {
+ return ResourceManager.GetString("InvalidAmqpMessageDictionaryTypeMask", resourceCulture);
+ }
+ }
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx
index 13e7214092763..a71183654e770 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx
@@ -309,4 +309,10 @@
The given key '{0}' was not present in the dictionary.
+
+ {0} is not a supported value body type.
+
+
+ The {0} key `{1}` has a value of type `{2}` which is not supported for AMQP transport.
+
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
index d79f55449bd70..9e415237a0e6e 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
@@ -2,6 +2,43 @@
## 5.5.0-beta.2 (Unreleased)
+### Acknowledgments
+
+Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:
+
+- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_
+
+### Changes
+
+#### Features Added
+
+- The `EventData` type offers a curated set of the information available for messages using the AMQP protocol. While this results in a simpler and more easily understood API surface for an event, it limits interoperability with other message brokers. To support heterogeneous environments or those with specialized needs, the full AMQP message is now available using the `GetRawAmqpMessage` method. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
+
+- `EventData` now supports construction using a `string` to specify the event body; this will be represented as a set of UTF-8 encoded bytes for transport.
+
+- `EventData` has been extended to include properties for applications to assign a `MessageId`, `ContentType`, and `CorrelationId` as well-known members rather than embedding them in the `Properties` dictionary. It is important to note that these properties are intended for application use and are not recognized by the Event Hubs service.
+
+- When stopping, the `EventProcessor` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
+
+- When the `EventProcessor` detects a partition being stolen outside of a load balancing cycle, it will immediately surrender ownership rather than waiting for a load balancing cycle to confirm the ownership change. This will help reduce event duplication from overlapping ownership of processors.
+
+- The `EventProcessor` now exposes the `ListPartitionIdsAsync` method, allowing custom processors to control the set of partitions known to the processor. This can be used to reduce complexity when a custom processor is directly assigned a set of partitions to process rather than using load balancing to control ownership.
+
+- The `ConnectionOptions` available when creating client types now support registering a callback delegate for participating in the validation of SSL certificates when connections are established. This delegate may override the built-in validation and allow or deny certificates based on application-specific logic.
+
+- The `ConnectionOptions` available when creating client types now support setting a custom size for the send and receive buffers of the transport.
+
+- Additional verbose logging has been added to allow monitoring of lower-level AMQP operations such as creating links, terminal exceptions that fault a link without an active operation, and when the service force-closes links.
+
+#### Key Bugs Fixed
+
+- The `EventProcessor` will now properly respect another another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
+- The `EventProcessor` will now be less aggressive when considering whether or not to steal a partition, doing so only when it will correct an imbalance and preferring the status quo when the overall distribution would not change. This will help reduce event duplication due to partitions moving between owners.
+
+- The `EventHubConsumerClient` and `PartitionReceiver` will now properly surface an exception when another another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and did not make callers attempted to recover the faulted link which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
+
+- The retry policy used by clients will no longer overflow the `TimeSpan` maximum when using an `Exponential` strategy with a large number of retries and long delay set.
## 5.5.0-beta.1 (2021-06-08)
@@ -13,7 +50,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- When stopping, the `EventProcessor` will now attempt to force-close the connection to the Event Hubs service to abort in-process read operations blocked on their timeout. This should significantly help reduce the amount of time the processor takes to stop in many scenarios. _(Based on a community prototype contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
@@ -27,7 +64,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Additional verbose logging has been added to allow monitoring of lower-level AMQP operations such as creating links, terminal exceptions that fault a link without an active operation, and when the service force-closes links.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The `EventProcessor` will now properly respect another another consumer stealing ownership of a partition when the service forcibly terminates the active link in the background. Previously, the client did not observe the error directly and attempted to recover the faulted link which reasserted ownership and caused the partition to "bounce" between owners until a load balancing cycle completed.
@@ -41,13 +78,13 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- `EventProcessor` will now perform validation of core configuration and permissions at startup, in order to attempt to detect unrecoverable problems more deterministically. Validation is non-blocking and will not delay claiming of partitions. One important note is that validation should be considered point-in-time and best effort; it is not meant to replace monitoring of error handler activity.
- Partition initialization for `EventProcessor` has been moved to a background operation. This will allow partitions to be more efficiently managed and speed up ownership claims, especially when using the `LoadBalancingStrategy.Greedy` configuration or when the processor is recovering from some error conditions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Dependencies have been updated to resolve security warnings for CVE-2021-26701. _(The Event Hubs client library does not make use of the vulnerable components, directly or indirectly)_
@@ -67,7 +104,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- The Event Hubs clients now support shared key and shared access signature authentication using the `AzureNamedKeyCredential` and `AzureSasCredential` types in addition to the connection string. Use of the credential allows the shared key or SAS to be updated without the need to create a new Event Hubs client.
@@ -77,7 +114,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Multiple enhancements were made to the transport paths for publishing and reading events to reduce memory allocations and increase performance. _(A community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The AMQP library used for transport has been updated, fixing several issues including a potential unobserved `ObjectDisposedException` that could cause the host process to crash. _(see: [release notes](https://github.com/Azure/azure-amqp/releases/tag/v2.4.13))_
@@ -85,7 +122,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Returned the idempotent publishing feature to the public API surface.
@@ -93,7 +130,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Fixed an issue where long-lived credentials (more than 49 days) were overflowing refresh timer limits and being rejected.
@@ -107,7 +144,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Connection strings can now be parsed into their key/value pairs using the `EventHubsConnectionStringProperties` class.
@@ -121,7 +158,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Documentation used for auto-completion via Intellisense and other tools has been enhanced in many areas, addressing gaps and commonly asked questions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- Upgraded the `Microsoft.Azure.Amqp` library to resolve crashes occurring in .NET 5.
@@ -135,7 +172,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Connection strings can now be parsed into their key/value pairs using the `EventHubsConnectionStringProperties` class.
@@ -143,7 +180,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Documentation used for auto-completion via Intellisense and other tools has been enhanced in many areas, addressing gaps and commonly asked questions.
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The `EventHubsException.ToString` result will now properly follow the format of other .NET exception output.
@@ -155,7 +192,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- An issue with package publishing which blocked referencing and use has been fixed.
@@ -163,7 +200,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- The `EventData` representation has been extended with the ability to treat the `Body` as `BinaryData`. `BinaryData` supports a variety of data transformations and allows the ability to provide serialization logic when sending or receiving events. Any type that derives from `ObjectSerializer`, such as `JsonObjectSerializer` can be used, with Schema Registry support available via the `SchemaRegistryAvroObjectSerializer`.
@@ -177,7 +214,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### New Features
+#### Features Added
- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.
@@ -191,7 +228,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
### Changes
-#### Key Bug Fixes
+#### Key Bugs Fixed
- The underlying AMQP library has been enhanced for more efficient resource usage; this will result in a noticeable reduction in memory use in common consuming scenarios. (A community contribution, courtesy of _[danielmarbach](https://github.com/danielmarbach))_
@@ -201,7 +238,7 @@ Thank you to our developer community members who helped to make the Event Hubs c
- Fixed an issue where failure to create an AMQP link would lead to an AMQP session not being explicitly closed, causing connections to the Event Hubs service to remain open until a garbage collection pass was performed.
-#### New Features
+#### Features Added
- The `EventProcessor` now supports a configurable strategy for load balancing, allowing control over whether it claims ownership of partitions in a balanced manner _(default)_ or more aggressively. The strategy may be set in the `EventProcessorOptions` when creating the processor. More details about strategies can be found in the associated [documentation](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.processor.loadbalancingstrategy?view=azure-dotnet).
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs
index 0b6e7419c47d8..f61413a15d764 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpMessageConverter.cs
@@ -9,6 +9,7 @@
using System.Runtime.InteropServices;
using System.Runtime.Serialization;
using Azure.Core;
+using Azure.Core.Amqp;
using Azure.Messaging.EventHubs.Diagnostics;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
@@ -26,6 +27,23 @@ internal class AmqpMessageConverter
/// The size, in bytes, to use as a buffer for stream operations.
private const int StreamBufferSizeInBytes = 512;
+ /// The set of key names for annotations known to be DateTime-based system properties.
+ private static readonly HashSet SystemPropertyDateTimeKeys = new()
+ {
+ AmqpProperty.EnqueuedTime.ToString(),
+ AmqpProperty.PartitionLastEnqueuedTimeUtc.ToString(),
+ AmqpProperty.LastPartitionPropertiesRetrievalTimeUtc.ToString()
+ };
+
+ /// The set of key names for annotations known to be long-based system properties.
+ private static readonly HashSet SystemPropertyLongKeys = new()
+ {
+ AmqpProperty.SequenceNumber.ToString(),
+ AmqpProperty.Offset.ToString(),
+ AmqpProperty.PartitionLastEnqueuedSequenceNumber.ToString(),
+ AmqpProperty.PartitionLastEnqueuedOffset.ToString()
+ };
+
///
/// Converts a given source into its corresponding
/// AMQP representation.
@@ -315,26 +333,200 @@ private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable s
private static AmqpMessage BuildAmqpMessageFromEvent(EventData source,
string partitionKey)
{
- if (!MemoryMarshal.TryGetArray(source.EventBody.ToMemory(), out var bodySegment))
+ var sourceMessage = source.GetRawAmqpMessage();
+
+ var message = sourceMessage switch
{
- bodySegment = new ArraySegment(source.EventBody.ToArray());
+ _ when sourceMessage.Body.TryGetData(out var dataBody) => AmqpMessage.Create(TranslateDataBody(dataBody)),
+ _ when sourceMessage.Body.TryGetSequence(out var sequenceBody) => AmqpMessage.Create(TranslateSequenceBody(sequenceBody)),
+ _ when sourceMessage.Body.TryGetValue(out var valueBody) => AmqpMessage.Create(TranslateValueBody(valueBody)),
+ _ => AmqpMessage.Create(new Data { Value = new ArraySegment(Array.Empty()) })
+ };
+
+ // Header
+
+ if (sourceMessage.HasSection(AmqpMessageSection.Header))
+ {
+ if (sourceMessage.Header.DeliveryCount.HasValue)
+ {
+ message.Header.DeliveryCount = sourceMessage.Header.DeliveryCount;
+ }
+
+ if (sourceMessage.Header.Durable.HasValue)
+ {
+ message.Header.Durable = sourceMessage.Header.Durable;
+ }
+
+ if (sourceMessage.Header.Priority.HasValue)
+ {
+ message.Header.Priority = sourceMessage.Header.Priority;
+ }
+
+ if (sourceMessage.Header.TimeToLive.HasValue)
+ {
+ message.Header.Ttl = (uint?)sourceMessage.Header.TimeToLive.Value.TotalMilliseconds;
+ }
+
+ if (sourceMessage.Header.FirstAcquirer.HasValue)
+ {
+ message.Header.FirstAcquirer = sourceMessage.Header.FirstAcquirer;
+ }
+
+ if (sourceMessage.Header.DeliveryCount.HasValue)
+ {
+ message.Header.DeliveryCount = sourceMessage.Header.DeliveryCount;
+ }
}
- var message = AmqpMessage.Create(new Data { Value = bodySegment });
+ // Properties
- if ((source.HasProperties) && (source.Properties.Count > 0))
+ if (sourceMessage.HasSection(AmqpMessageSection.Properties))
+ {
+ if (sourceMessage.Properties.AbsoluteExpiryTime.HasValue)
+ {
+ message.Properties.AbsoluteExpiryTime = sourceMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
+ }
+
+ if (!string.IsNullOrEmpty(sourceMessage.Properties.ContentEncoding))
+ {
+ message.Properties.ContentEncoding = sourceMessage.Properties.ContentEncoding;
+ }
+
+ if (!string.IsNullOrEmpty(sourceMessage.Properties.ContentType))
+ {
+ message.Properties.ContentType = sourceMessage.Properties.ContentType;
+ }
+
+ if (sourceMessage.Properties.CorrelationId.HasValue)
+ {
+ message.Properties.CorrelationId = sourceMessage.Properties.CorrelationId.Value.ToString();
+ }
+
+ if (sourceMessage.Properties.CreationTime.HasValue)
+ {
+ message.Properties.CreationTime = sourceMessage.Properties.CreationTime.Value.UtcDateTime;
+ }
+
+ if (!string.IsNullOrEmpty(sourceMessage.Properties.GroupId))
+ {
+ message.Properties.GroupId = sourceMessage.Properties.GroupId;
+ }
+
+ if (sourceMessage.Properties.GroupSequence.HasValue)
+ {
+ message.Properties.GroupSequence = sourceMessage.Properties.GroupSequence;
+ }
+
+ if (sourceMessage.Properties.MessageId.HasValue)
+ {
+ message.Properties.MessageId = sourceMessage.Properties.MessageId.Value.ToString();
+ }
+
+ if (sourceMessage.Properties.ReplyTo.HasValue)
+ {
+ message.Properties.ReplyTo = sourceMessage.Properties.ReplyTo.Value.ToString();
+ }
+
+ if (!string.IsNullOrEmpty(sourceMessage.Properties.ReplyToGroupId))
+ {
+ message.Properties.ReplyToGroupId = sourceMessage.Properties.ReplyToGroupId;
+ }
+
+ if (!string.IsNullOrEmpty(sourceMessage.Properties.Subject))
+ {
+ message.Properties.Subject = sourceMessage.Properties.Subject;
+ }
+
+ if (sourceMessage.Properties.To.HasValue)
+ {
+ message.Properties.To = sourceMessage.Properties.To.Value.ToString();
+ }
+
+ if (sourceMessage.Properties.UserId.HasValue)
+ {
+ if (MemoryMarshal.TryGetArray(sourceMessage.Properties.UserId.Value, out var segment))
+ {
+ message.Properties.UserId = segment;
+ }
+ else
+ {
+ message.Properties.UserId = new ArraySegment(sourceMessage.Properties.UserId.Value.ToArray());
+ }
+ }
+ }
+
+ // Application Properties
+
+ if ((sourceMessage.HasSection(AmqpMessageSection.ApplicationProperties)) && (sourceMessage.ApplicationProperties.Count > 0))
{
message.ApplicationProperties ??= new ApplicationProperties();
- foreach (KeyValuePair pair in source.Properties)
+ foreach (var pair in sourceMessage.ApplicationProperties)
{
if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue))
{
message.ApplicationProperties.Map[pair.Key] = amqpValue;
}
+ else
+ {
+ throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageDictionaryTypeMask, nameof(sourceMessage.ApplicationProperties), pair.Key, pair.Value.GetType().Name));
+ }
+ }
+ }
+
+ // Message Annotations
+
+ if (sourceMessage.HasSection(AmqpMessageSection.MessageAnnotations))
+ {
+ foreach (var pair in sourceMessage.MessageAnnotations)
+ {
+ if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue))
+ {
+ message.MessageAnnotations.Map[pair.Key] = amqpValue;
+ }
+ else
+ {
+ throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageDictionaryTypeMask, nameof(sourceMessage.MessageAnnotations), pair.Key, pair.Value.GetType().Name));
+ }
+ }
+ }
+
+ // Delivery Annotations
+
+ if (sourceMessage.HasSection(AmqpMessageSection.DeliveryAnnotations))
+ {
+ foreach (var pair in sourceMessage.DeliveryAnnotations)
+ {
+ if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue))
+ {
+ message.DeliveryAnnotations.Map[pair.Key] = amqpValue;
+ }
+ else
+ {
+ throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageDictionaryTypeMask, nameof(sourceMessage.DeliveryAnnotations), pair.Key, pair.Value.GetType().Name));
+ }
+ }
+ }
+
+ // Footer
+
+ if (sourceMessage.HasSection(AmqpMessageSection.Footer))
+ {
+ foreach (var pair in sourceMessage.Footer)
+ {
+ if (TryCreateAmqpPropertyValueForEventProperty(pair.Value, out var amqpValue))
+ {
+ message.Footer.Map[pair.Key] = amqpValue;
+ }
+ else
+ {
+ throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageDictionaryTypeMask, nameof(sourceMessage.Footer), pair.Key, pair.Value.GetType().Name));
+ }
}
}
+ // Special cases
+
if (!string.IsNullOrEmpty(partitionKey))
{
message.MessageAnnotations.Map[AmqpProperty.PartitionKey] = partitionKey;
@@ -368,190 +560,337 @@ private static AmqpMessage BuildAmqpMessageFromEvent(EventData source,
///
private static EventData BuildEventFromAmqpMessage(AmqpMessage source)
{
- var body = (source.BodyType.HasFlag(SectionFlag.Data))
- ? ReadAmqpDataBody(source.DataBody)
- : new BinaryData(ReadOnlyMemory.Empty);
+ var message = source switch
+ {
+ _ when TryGetDataBody(source, out var dataBody) => new AmqpAnnotatedMessage(dataBody),
+ _ when TryGetSequenceBody(source, out var sequenceBody) => new AmqpAnnotatedMessage(sequenceBody),
+ _ when TryGetValueBody(source, out var valueBody) => new AmqpAnnotatedMessage(valueBody),
+ _ => new AmqpAnnotatedMessage(AmqpMessageBody.FromData(MessageBody.FromReadOnlyMemorySegment(ReadOnlyMemory.Empty)))
+ };
- ParsedAnnotations systemAnnotations = ParseSystemAnnotations(source);
+ // Header
- // If there were application properties associated with the message, translate them
- // to the event.
+ if ((source.Sections & SectionFlag.Header) > 0)
+ {
+ if (source.Header.DeliveryCount.HasValue)
+ {
+ message.Header.DeliveryCount = source.Header.DeliveryCount;
+ }
- var properties = default(Dictionary);
+ if (source.Header.Durable.HasValue)
+ {
+ message.Header.Durable = source.Header.Durable;
+ }
- if (source.Sections.HasFlag(SectionFlag.ApplicationProperties))
- {
- properties = new Dictionary();
+ if (source.Header.Priority.HasValue)
+ {
+ message.Header.Priority = source.Header.Priority;
+ }
- foreach (KeyValuePair pair in source.ApplicationProperties.Map)
+ if (source.Header.FirstAcquirer.HasValue)
{
- if (TryCreateEventPropertyForAmqpProperty(pair.Value, out object propertyValue))
- {
- properties[pair.Key.ToString()] = propertyValue;
- }
+ message.Header.FirstAcquirer = source.Header.FirstAcquirer;
+ }
+
+ if (source.Header.DeliveryCount.HasValue)
+ {
+ message.Header.DeliveryCount = source.Header.DeliveryCount;
+ }
+
+ if (source.Header.Ttl.HasValue)
+ {
+ message.Header.TimeToLive = TimeSpan.FromMilliseconds(source.Header.Ttl.Value);
}
}
- return new EventData(
- eventBody: body,
- properties: properties,
- systemProperties: systemAnnotations.ServiceAnnotations,
- sequenceNumber: systemAnnotations.SequenceNumber ?? long.MinValue,
- offset: systemAnnotations.Offset ?? long.MinValue,
- enqueuedTime: systemAnnotations.EnqueuedTime ?? default,
- partitionKey: systemAnnotations.PartitionKey,
- lastPartitionSequenceNumber: systemAnnotations.LastSequenceNumber,
- lastPartitionOffset: systemAnnotations.LastOffset,
- lastPartitionEnqueuedTime: systemAnnotations.LastEnqueuedTime,
- lastPartitionPropertiesRetrievalTime: systemAnnotations.LastReceivedTime);
- }
+ // Properties
- ///
- /// Parses the annotations set by the Event Hubs service on the
- /// associated with an event, extracting them into a consumable form.
- ///
- ///
- /// The message to use as the source of the event.
- ///
- /// The parsed from the source message.
- ///
- private static ParsedAnnotations ParseSystemAnnotations(AmqpMessage source)
- {
- var systemProperties = new ParsedAnnotations();
+ if ((source.Sections & SectionFlag.Properties) > 0)
+ {
+ if (source.Properties.AbsoluteExpiryTime.HasValue)
+ {
+ message.Properties.AbsoluteExpiryTime = source.Properties.AbsoluteExpiryTime;
+ }
- object amqpValue;
- object propertyValue;
+ if (!string.IsNullOrEmpty(source.Properties.ContentEncoding.Value))
+ {
+ message.Properties.ContentEncoding = source.Properties.ContentEncoding.Value;
+ }
- // Process the message annotations.
+ if (!string.IsNullOrEmpty(source.Properties.ContentType.Value))
+ {
+ message.Properties.ContentType = source.Properties.ContentType.Value;
+ }
- if (source.Sections.HasFlag(SectionFlag.MessageAnnotations))
- {
- systemProperties.ServiceAnnotations ??= new Dictionary();
+ if (source.Properties.CorrelationId != null)
+ {
+ message.Properties.CorrelationId = new AmqpMessageId(source.Properties.CorrelationId.ToString());
+ }
- var annotations = source.MessageAnnotations.Map;
- var processed = new HashSet();
+ if (source.Properties.CreationTime.HasValue)
+ {
+ message.Properties.CreationTime = source.Properties.CreationTime;
+ }
- if ((annotations.TryGetValue(AmqpProperty.EnqueuedTime, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
+ if (!string.IsNullOrEmpty(source.Properties.GroupId))
{
- systemProperties.EnqueuedTime = propertyValue switch
- {
- DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
- long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
- _ => (DateTimeOffset)propertyValue
- };
+ message.Properties.GroupId = source.Properties.GroupId;
+ }
- processed.Add(AmqpProperty.EnqueuedTime.ToString());
+ if (source.Properties.GroupSequence.HasValue)
+ {
+ message.Properties.GroupSequence = source.Properties.GroupSequence;
}
- if ((annotations.TryGetValue(AmqpProperty.SequenceNumber, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
+ if (source.Properties.MessageId != null)
{
- systemProperties.SequenceNumber = (long)propertyValue;
- processed.Add(AmqpProperty.SequenceNumber.ToString());
+ message.Properties.MessageId = new AmqpMessageId(source.Properties.MessageId.ToString());
}
- if ((annotations.TryGetValue(AmqpProperty.Offset, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))
- && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset)))
+ if (source.Properties.ReplyTo != null)
{
- systemProperties.Offset = offset;
- processed.Add(AmqpProperty.Offset.ToString());
+ message.Properties.ReplyTo = new AmqpAddress(source.Properties.ReplyTo.ToString());
}
- if ((annotations.TryGetValue(AmqpProperty.PartitionKey, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
+ if (!string.IsNullOrEmpty(source.Properties.ReplyToGroupId))
{
- systemProperties.PartitionKey = (string)propertyValue;
- processed.Add(AmqpProperty.PartitionKey.ToString());
+ message.Properties.ReplyToGroupId = source.Properties.ReplyToGroupId;
}
- string key;
+ if (!string.IsNullOrEmpty(source.Properties.Subject))
+ {
+ message.Properties.Subject = source.Properties.Subject;
+ }
- foreach (KeyValuePair pair in annotations)
+ if (source.Properties.To != null)
{
- key = pair.Key.ToString();
+ message.Properties.To = new AmqpAddress(source.Properties.To.ToString());
+ }
- if ((!processed.Contains(key))
- && (TryCreateEventPropertyForAmqpProperty(pair.Value, out propertyValue)))
- {
- systemProperties.ServiceAnnotations.Add(key, propertyValue);
- processed.Add(key);
- }
+ if (source.Properties.UserId != null)
+ {
+ message.Properties.UserId = source.Properties.UserId;
}
}
- // Process the delivery annotations.
+ // Application Properties
- if (source.Sections.HasFlag(SectionFlag.DeliveryAnnotations))
+ if ((source.Sections & SectionFlag.ApplicationProperties) > 0)
{
- if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedTimeUtc, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
+ foreach (var pair in source.ApplicationProperties.Map)
{
- systemProperties.LastEnqueuedTime = propertyValue switch
+ if (TryCreateEventPropertyForAmqpProperty(pair.Value, out var eventValue))
{
- DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
- long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
- _ => (DateTimeOffset)propertyValue
- };
+ message.ApplicationProperties[pair.Key.ToString()] = eventValue;
+ }
}
+ }
- if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedSequenceNumber, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
- {
- systemProperties.LastSequenceNumber = (long)propertyValue;
- }
+ // Message Annotations
- if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.PartitionLastEnqueuedOffset, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue))
- && (long.TryParse((string)propertyValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset)))
+ if ((source.Sections & SectionFlag.MessageAnnotations) > 0)
+ {
+ foreach (var pair in source.MessageAnnotations.Map)
{
- systemProperties.LastOffset = offset;
+ if (TryCreateEventPropertyForAmqpProperty(pair.Value, out var eventValue))
+ {
+ if (SystemPropertyDateTimeKeys.Contains(pair.Key.ToString()))
+ {
+ eventValue = eventValue switch
+ {
+ DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
+ long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
+ _ => eventValue
+ };
+ }
+ else if (SystemPropertyLongKeys.Contains(pair.Key.ToString()))
+ {
+ eventValue = eventValue switch
+ {
+ string stringValue when long.TryParse(stringValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var longValue) => longValue,
+ _ => eventValue
+ };
+ }
+
+ message.MessageAnnotations[pair.Key.ToString()] = eventValue;
+ }
}
+ }
- if ((source.DeliveryAnnotations.Map.TryGetValue(AmqpProperty.LastPartitionPropertiesRetrievalTimeUtc, out amqpValue))
- && (TryCreateEventPropertyForAmqpProperty(amqpValue, out propertyValue)))
+ // Delivery Annotations
+
+ if ((source.Sections & SectionFlag.DeliveryAnnotations) > 0)
+ {
+ foreach (var pair in source.DeliveryAnnotations.Map)
{
- systemProperties.LastReceivedTime = propertyValue switch
+ if (TryCreateEventPropertyForAmqpProperty(pair.Value, out var eventValue))
{
- DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
- long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
- _ => (DateTimeOffset)propertyValue
- };
+ if (SystemPropertyDateTimeKeys.Contains(pair.Key.ToString()))
+ {
+ eventValue = eventValue switch
+ {
+ DateTime dateValue => new DateTimeOffset(dateValue, TimeSpan.Zero),
+ long longValue => new DateTimeOffset(longValue, TimeSpan.Zero),
+ _ => eventValue
+ };
+ }
+ else if (SystemPropertyLongKeys.Contains(pair.Key.ToString()))
+ {
+ eventValue = eventValue switch
+ {
+ string stringValue when long.TryParse(stringValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out var longValue) => longValue,
+ _ => eventValue
+ };
+ }
+
+ message.DeliveryAnnotations[pair.Key.ToString()] = eventValue;
+ }
}
}
- // Process the properties annotations
+ // Footer
- if (source.Sections.HasFlag(SectionFlag.Properties))
+ if ((source.Sections & SectionFlag.Footer) > 0)
{
- var properties = source.Properties;
-
- void conditionalAdd(string name, object value, bool condition)
+ foreach (var pair in source.Footer.Map)
{
- if (condition)
+ if (TryCreateEventPropertyForAmqpProperty(pair.Value, out var eventValue))
{
- systemProperties.ServiceAnnotations ??= new Dictionary();
- systemProperties.ServiceAnnotations.Add(name, value);
+ message.Footer[pair.Key.ToString()] = eventValue;
}
}
+ }
+
+ return new EventData(message);
+ }
+
+ ///
+ /// Translates the data body segments into the corresponding set of
+ /// instances.
+ ///
+ ///
+ /// The data body to translate.
+ ///
+ /// The set of instances that represents the .
+ ///
+ private static IEnumerable TranslateDataBody(IEnumerable> dataBody)
+ {
+ foreach (var bodySegment in dataBody)
+ {
+ if (!MemoryMarshal.TryGetArray(bodySegment, out ArraySegment dataSegment))
+ {
+ dataSegment = new ArraySegment(bodySegment.ToArray());
+ }
+
+ yield return new Data
+ {
+ Value = dataSegment
+ };
+ }
+ }
+
+ ///
+ /// Translates the data body elements into the corresponding set of
+ /// instances.
+ ///
+ ///
+ /// The sequence body to translate.
+ ///
+ /// The set of instances that represents the in AMQP format.
+ ///
+ private static IEnumerable TranslateSequenceBody(IEnumerable> sequenceBody)
+ {
+ foreach (var item in sequenceBody)
+ {
+ yield return new AmqpSequence((System.Collections.IList)item);
+ }
+ }
+
+ ///
+ /// Translates the data body into the corresponding set of
+ /// instance.
+ ///
+ ///
+ /// The sequence body to translate.
+ ///
+ /// The instance that represents the in AMQP format.
+ ///
+ private static AmqpValue TranslateValueBody(object valueBody)
+ {
+ if (TryCreateAmqpPropertyValueForEventProperty(valueBody, out var amqpValue, allowBodyTypes: true))
+ {
+ return new AmqpValue { Value = amqpValue };
+ }
+
+ throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidAmqpMessageValueBodyMask, valueBody.GetType().Name));
+ }
+
+ ///
+ /// Attempts to read the data body of an .
+ ///
+ ///
+ /// The to read from.
+ /// The value of the data body, if read.
+ ///
+ /// true if the body was successfully read; otherwise, false.
+ ///
+ private static bool TryGetDataBody(AmqpMessage source, out AmqpMessageBody dataBody)
+ {
+ if (((source.BodyType & SectionFlag.Data) == 0) || (source.DataBody == null))
+ {
+ dataBody = null;
+ return false;
+ }
+
+ dataBody = AmqpMessageBody.FromData(MessageBody.FromDataSegments(source.DataBody));
+ return true;
+ }
+
+ ///
+ /// Attempts to read the sequence body of an .
+ ///
+ ///
+ /// The to read from.
+ /// The value of the sequence body, if read.
+ ///
+ /// true if the body was successfully read; otherwise, false.
+ ///
+ private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody sequenceBody)
+ {
+ if ((source.BodyType & SectionFlag.AmqpSequence) == 0)
+ {
+ sequenceBody = null;
+ return false;
+ }
+
+ sequenceBody = AmqpMessageBody.FromSequence(source.SequenceBody.Select(item => (IList