diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs index a3d45db22a..20a2c90f69 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) var timeStamp = HeaderResult.Empty(); var receiptHandle = HeaderResult.Empty(); var replyTo = HeaderResult.Empty(); + var subject = HeaderResult.Empty(); Message message; try { - _messageAttributes = ReadMessageAttributes(sqsMessage); + var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + _messageAttributes = ReadMessageAttributes(jsonDocument); topic = ReadTopic(); messageId = ReadMessageId(); @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) messageType = ReadMessageType(); timeStamp = ReadTimestamp(); replyTo = ReadReplyTo(); + subject = ReadMessageSubject(jsonDocument); receiptHandle = ReadReceiptHandle(sqsMessage); //TODO:CLOUD_EVENTS parse from headers @@ -76,10 +79,10 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) contentType: contentType.Result, handledCount: handledCount.Result, dataSchema: null, - subject: null, + subject: subject.Result, delayedMilliseconds: 0); - message = new Message(messageHeader, ReadMessageBody(sqsMessage)); + message = new Message(messageHeader, ReadMessageBody(jsonDocument)); //deserialize the bag var bag = ReadMessageBag(); @@ -101,14 +104,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) return message; } - private Dictionary ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage) + private static Dictionary ReadMessageAttributes(JsonDocument jsonDocument) { var messageAttributes = new Dictionary(); try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); - if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes)) { messageAttributes = JsonSerializer.Deserialize>( @@ -238,12 +239,27 @@ private HeaderResult ReadTopic() return new HeaderResult(string.Empty, true); } - private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage) + private static HeaderResult ReadMessageSubject(JsonDocument jsonDocument) { try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + if (jsonDocument.RootElement.TryGetProperty("Subject", out var value)) + { + return new HeaderResult(value.GetString(), true); + } + } + catch (Exception ex) + { + s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}"); + } + + return new HeaderResult(null, true);; + } + private static MessageBody ReadMessageBody(JsonDocument jsonDocument) + { + try + { if (jsonDocument.RootElement.TryGetProperty("Message", out var value)) { return new MessageBody(value.GetString()); diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 8f07c032f5..183b3df3d5 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -45,7 +45,7 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString); + var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject); var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 53149d4018..b58a7f0d01 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -37,7 +37,8 @@ public SqsMessageProducerSendTests() SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey + routingKey: routingKey, + rawMessageDelivery: false ); _message = new Message( @@ -58,15 +59,18 @@ public SqsMessageProducerSendTests() - [Fact] - public async Task When_posting_a_message_via_the_producer() + [Theory] + [InlineData("test subject")] + [InlineData(null)] + public async Task When_posting_a_message_via_the_producer(string subject) { //arrange + _message.Header.Subject = subject; _messageProducer.Send(_message); - + await Task.Delay(1000); - var message =_channel.Receive(5000); + var message = _channel.Receive(5000); //clear the queue _channel.Acknowledge(message); @@ -82,6 +86,7 @@ public async Task When_posting_a_message_via_the_producer() message.Header.ReplyTo.Should().Be(_replyTo); message.Header.ContentType.Should().Be(_contentType); message.Header.HandledCount.Should().Be(0); + message.Header.Subject.Should().Be(subject); //allow for clock drift in the following test, more important to have a contemporary timestamp than anything message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1))); message.Header.DelayedMilliseconds.Should().Be(0); @@ -96,7 +101,7 @@ public void Dispose() _messageProducer?.Dispose(); } - private DateTime RoundToSeconds(DateTime dateTime) + private static DateTime RoundToSeconds(DateTime dateTime) { return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind); }