From c7103e1ba0ea1d99c8409c24371f8d9bc435b233 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Wed, 21 Aug 2024 11:56:08 +0800 Subject: [PATCH 1/7] feature: Support SNS subject --- .github/workflows/ci.yml | 4 ++++ .../ChannelFactory.cs | 2 +- .../SnsPublication.cs | 7 +++++++ .../SqsMessageProducer.cs | 2 +- .../SqsMessagePublisher.cs | 8 ++++---- src/Paramore.Brighter/Publication.cs | 1 - 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d6824c19f6..f5e3c48240 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,6 +15,10 @@ on: - '*.*.*' pull_request: branches: [ master, release/9X ] + +permissions: + id-token: write + contents: read env: # Stop wasting time caching packages diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs index f6818119a0..5011eec15f 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs @@ -308,7 +308,7 @@ private void SubscribeToTopic(AmazonSQSClient sqsClient, AmazonSimpleNotificatio } } - private string ToSecondsAsString(int timeoutInMilliseconds) + private static string ToSecondsAsString(int timeoutInMilliseconds) { int timeOutInSeconds = 0; if (timeoutInMilliseconds >= 1000) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs index 319512d061..b71d63bdac 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs @@ -21,6 +21,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #endregion +using System; + namespace Paramore.Brighter.MessagingGateway.AWSSQS { public class SnsPublication : Publication @@ -45,5 +47,10 @@ public class SnsPublication : Publication /// as we use the topic from the header to dispatch to an Arn. /// public string TopicArn { get; set; } + + /// + /// The optional Subject passed through to the published SNS message + /// + public Func SnsSubject { get; set; } = null; } } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index a807cbdccd..902552b75b 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -90,7 +90,7 @@ public void Send(Message message) ConfirmTopicExists(message.Header.Topic); using var client = _clientFactory.CreateSnsClient(); - var publisher = new SqsMessagePublisher(ChannelTopicArn, client); + var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubject); var messageId = publisher.Publish(message); if (messageId != null) { diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 8f07c032f5..6da316bf83 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -23,11 +23,9 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; -using System.Net.Mime; using System.Text.Json; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; -using Paramore.Brighter.Transforms.Transformers; namespace Paramore.Brighter.MessagingGateway.AWSSQS { @@ -35,17 +33,19 @@ public class SqsMessagePublisher { private readonly string _topicArn; private readonly AmazonSimpleNotificationServiceClient _client; + private readonly Func _subject; - public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client) + public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func subject = null) { _topicArn = topicArn; _client = client; + _subject = subject; } public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString); + var publishRequest = new PublishRequest(_topicArn, messageString, _subject?.Invoke(message)); var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/src/Paramore.Brighter/Publication.cs b/src/Paramore.Brighter/Publication.cs index 089ede3d77..bcee041f07 100644 --- a/src/Paramore.Brighter/Publication.cs +++ b/src/Paramore.Brighter/Publication.cs @@ -23,7 +23,6 @@ THE SOFTWARE. */ #endregion using System; -using System.Collections.Generic; namespace Paramore.Brighter { From 94085814e48aeaab178e3572a9813a9200efe95d Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Wed, 21 Aug 2024 12:31:13 +0800 Subject: [PATCH 2/7] Update test --- .github/workflows/ci.yml | 4 ---- .../When_posting_a_message_via_the_messaging_gateway.cs | 7 ++++++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f5e3c48240..d6824c19f6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,10 +15,6 @@ on: - '*.*.*' pull_request: branches: [ master, release/9X ] - -permissions: - id-token: write - contents: read env: # Stop wasting time caching packages diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 53149d4018..bc111b7820 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -53,7 +53,12 @@ public SqsMessageProducerSendTests() _channelFactory = new ChannelFactory(awsConnection); _channel = _channelFactory.CreateChannel(subscription); - _messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication{Topic = new RoutingKey(_topicName), MakeChannels = OnMissingChannel.Create}); + _messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication + { + Topic = new RoutingKey(_topicName), + MakeChannels = OnMissingChannel.Create, + SnsSubject = _ => "test" + }); } From 7f89ad88f9e79bfd003a9e6189982088ebc1afa7 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Fri, 23 Aug 2024 18:55:04 +0800 Subject: [PATCH 3/7] Address review comments --- .../SnsPublication.cs | 4 +- .../SqsMessageProducer.cs | 2 +- .../SqsMessagePublisher.cs | 13 ++-- ...ing_a_message_via_the_messaging_gateway.cs | 65 +++++++++++++++++-- 4 files changed, 73 insertions(+), 11 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs index b71d63bdac..137e05ed4e 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs @@ -49,8 +49,8 @@ public class SnsPublication : Publication public string TopicArn { get; set; } /// - /// The optional Subject passed through to the published SNS message + /// An optional delegate for generating the SNS subject for a given message /// - public Func SnsSubject { get; set; } = null; + public Func SnsSubjectGenerator { get; set; } = null; } } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index 902552b75b..06b2b0bec6 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -90,7 +90,7 @@ public void Send(Message message) ConfirmTopicExists(message.Header.Topic); using var client = _clientFactory.CreateSnsClient(); - var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubject); + var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubjectGenerator); var messageId = publisher.Publish(message); if (messageId != null) { diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 6da316bf83..9e06834398 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -33,19 +33,24 @@ public class SqsMessagePublisher { private readonly string _topicArn; private readonly AmazonSimpleNotificationServiceClient _client; - private readonly Func _subject; + private readonly Func _subjectGenerator; - public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func subject = null) + public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func subjectGenerator = null) { _topicArn = topicArn; _client = client; - _subject = subject; + _subjectGenerator = subjectGenerator; } public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString, _subject?.Invoke(message)); + var publishRequest = new PublishRequest() + { + TopicArn = _topicArn, + Subject = _subjectGenerator?.Invoke(message), + Message = messageString, + }; var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index bc111b7820..999576da2e 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -3,6 +3,8 @@ using System.Threading.Tasks; using Amazon; using Amazon.Runtime; +using Amazon.SQS; +using Amazon.SQS.Model; using FluentAssertions; using Paramore.Brighter.AWS.Tests.Helpers; using Paramore.Brighter.AWS.Tests.TestDoubles; @@ -14,6 +16,8 @@ namespace Paramore.Brighter.AWS.Tests.MessagingGateway [Trait("Category", "AWS")] public class SqsMessageProducerSendTests : IDisposable { + private readonly AmazonSQSClient _sqsClient; + private readonly string _queueName; private readonly Message _message; private readonly IAmAChannel _channel; private readonly SqsMessageProducer _messageProducer; @@ -23,21 +27,24 @@ public class SqsMessageProducerSendTests : IDisposable private readonly string _replyTo; private readonly string _contentType; private readonly string _topicName; + private readonly string _subject; public SqsMessageProducerSendTests() { - _myCommand = new MyCommand{Value = "Test"}; + _myCommand = new MyCommand{Value = "Testttttttt"}; _correlationId = Guid.NewGuid().ToString(); _replyTo = "http:\\queueUrl"; _contentType = "text\\plain"; var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); _topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); var routingKey = new RoutingKey(_topicName); + _subject = "test subject"; SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey + routingKey: routingKey, + rawMessageDelivery: false ); _message = new Message( @@ -49,6 +56,9 @@ public SqsMessageProducerSendTests() (AWSCredentials credentials, RegionEndpoint region) = CredentialsChain.GetAwsCredentials(); var awsConnection = new AWSMessagingGatewayConnection(credentials, region); + + _sqsClient = new AmazonSQSClient(credentials, region); + _queueName = subscription.ChannelName.ToValidSQSQueueName(); _channelFactory = new ChannelFactory(awsConnection); _channel = _channelFactory.CreateChannel(subscription); @@ -57,7 +67,7 @@ public SqsMessageProducerSendTests() { Topic = new RoutingKey(_topicName), MakeChannels = OnMissingChannel.Create, - SnsSubject = _ => "test" + SnsSubjectGenerator = _ => _subject }); } @@ -71,7 +81,7 @@ public async Task When_posting_a_message_via_the_producer() await Task.Delay(1000); - var message =_channel.Receive(5000); + var message = _channel.Receive(5000); //clear the queue _channel.Acknowledge(message); @@ -93,6 +103,53 @@ public async Task When_posting_a_message_via_the_producer() //{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false} message.Body.Value.Should().Be(_message.Body.Value); } + + [Fact] + public async Task When_posting_a_message_via_the_producer_with_subject() + { + //arrange + _messageProducer.Send(_message); + + await Task.Delay(1000); + + var message = await ReceiveRaw(5000); + + //clear the queue + await AcknowledgeRaw(message); + + var jsonDocument = JsonDocument.Parse(message.Body); + + jsonDocument.RootElement.TryGetProperty("Subject", out var subject).Should().BeTrue(); + subject.GetString().Should().Be(_subject); + } + + private async Task ReceiveRaw(int timeoutMilliseconds) + { + var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); + + var request = new ReceiveMessageRequest(urlResponse.QueueUrl) + { + MaxNumberOfMessages = 1, + WaitTimeSeconds = (int)TimeSpan.FromMilliseconds(timeoutMilliseconds).TotalSeconds, + MessageAttributeNames = ["All"], + MessageSystemAttributeNames = ["All"], + }; + + var receiveResponse = await _sqsClient.ReceiveMessageAsync(request); + + if (receiveResponse.Messages.Count == 0) + { + return null; + } + + return receiveResponse.Messages[0]; + } + + private async Task AcknowledgeRaw(Amazon.SQS.Model.Message message) + { + var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); + await _sqsClient.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, message.ReceiptHandle)); + } public void Dispose() { From 59e58d267e1f401db8394f4ea342745f87cfbed9 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Fri, 23 Aug 2024 19:04:00 +0800 Subject: [PATCH 4/7] Retrigger build --- .../SqsMessagePublisher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 9e06834398..817ff3d2fb 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -49,7 +49,7 @@ public string Publish(Message message) { TopicArn = _topicArn, Subject = _subjectGenerator?.Invoke(message), - Message = messageString, + Message = messageString }; var messageAttributes = new Dictionary(); From c0276508d8ea9491e9e9b1dcc9414075afef28a9 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Tue, 27 Aug 2024 22:53:48 +0800 Subject: [PATCH 5/7] Revert publication level subject generator commits --- .../ChannelFactory.cs | 2 +- .../SnsPublication.cs | 7 -- .../SqsMessageProducer.cs | 2 +- .../SqsMessagePublisher.cs | 13 ++-- src/Paramore.Brighter/Publication.cs | 1 + ...ing_a_message_via_the_messaging_gateway.cs | 70 ++----------------- 6 files changed, 11 insertions(+), 84 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs index 5011eec15f..f6818119a0 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/ChannelFactory.cs @@ -308,7 +308,7 @@ private void SubscribeToTopic(AmazonSQSClient sqsClient, AmazonSimpleNotificatio } } - private static string ToSecondsAsString(int timeoutInMilliseconds) + private string ToSecondsAsString(int timeoutInMilliseconds) { int timeOutInSeconds = 0; if (timeoutInMilliseconds >= 1000) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs index 137e05ed4e..319512d061 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs @@ -21,8 +21,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #endregion -using System; - namespace Paramore.Brighter.MessagingGateway.AWSSQS { public class SnsPublication : Publication @@ -47,10 +45,5 @@ public class SnsPublication : Publication /// as we use the topic from the header to dispatch to an Arn. /// public string TopicArn { get; set; } - - /// - /// An optional delegate for generating the SNS subject for a given message - /// - public Func SnsSubjectGenerator { get; set; } = null; } } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index 06b2b0bec6..a807cbdccd 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -90,7 +90,7 @@ public void Send(Message message) ConfirmTopicExists(message.Header.Topic); using var client = _clientFactory.CreateSnsClient(); - var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubjectGenerator); + var publisher = new SqsMessagePublisher(ChannelTopicArn, client); var messageId = publisher.Publish(message); if (messageId != null) { diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 817ff3d2fb..8f07c032f5 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -23,9 +23,11 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; +using System.Net.Mime; using System.Text.Json; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; +using Paramore.Brighter.Transforms.Transformers; namespace Paramore.Brighter.MessagingGateway.AWSSQS { @@ -33,24 +35,17 @@ public class SqsMessagePublisher { private readonly string _topicArn; private readonly AmazonSimpleNotificationServiceClient _client; - private readonly Func _subjectGenerator; - public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func subjectGenerator = null) + public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client) { _topicArn = topicArn; _client = client; - _subjectGenerator = subjectGenerator; } public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest() - { - TopicArn = _topicArn, - Subject = _subjectGenerator?.Invoke(message), - Message = messageString - }; + var publishRequest = new PublishRequest(_topicArn, messageString); var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/src/Paramore.Brighter/Publication.cs b/src/Paramore.Brighter/Publication.cs index bcee041f07..089ede3d77 100644 --- a/src/Paramore.Brighter/Publication.cs +++ b/src/Paramore.Brighter/Publication.cs @@ -23,6 +23,7 @@ THE SOFTWARE. */ #endregion using System; +using System.Collections.Generic; namespace Paramore.Brighter { diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 999576da2e..53149d4018 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -3,8 +3,6 @@ using System.Threading.Tasks; using Amazon; using Amazon.Runtime; -using Amazon.SQS; -using Amazon.SQS.Model; using FluentAssertions; using Paramore.Brighter.AWS.Tests.Helpers; using Paramore.Brighter.AWS.Tests.TestDoubles; @@ -16,8 +14,6 @@ namespace Paramore.Brighter.AWS.Tests.MessagingGateway [Trait("Category", "AWS")] public class SqsMessageProducerSendTests : IDisposable { - private readonly AmazonSQSClient _sqsClient; - private readonly string _queueName; private readonly Message _message; private readonly IAmAChannel _channel; private readonly SqsMessageProducer _messageProducer; @@ -27,24 +23,21 @@ public class SqsMessageProducerSendTests : IDisposable private readonly string _replyTo; private readonly string _contentType; private readonly string _topicName; - private readonly string _subject; public SqsMessageProducerSendTests() { - _myCommand = new MyCommand{Value = "Testttttttt"}; + _myCommand = new MyCommand{Value = "Test"}; _correlationId = Guid.NewGuid().ToString(); _replyTo = "http:\\queueUrl"; _contentType = "text\\plain"; var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); _topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); var routingKey = new RoutingKey(_topicName); - _subject = "test subject"; SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey, - rawMessageDelivery: false + routingKey: routingKey ); _message = new Message( @@ -56,19 +49,11 @@ public SqsMessageProducerSendTests() (AWSCredentials credentials, RegionEndpoint region) = CredentialsChain.GetAwsCredentials(); var awsConnection = new AWSMessagingGatewayConnection(credentials, region); - - _sqsClient = new AmazonSQSClient(credentials, region); - _queueName = subscription.ChannelName.ToValidSQSQueueName(); _channelFactory = new ChannelFactory(awsConnection); _channel = _channelFactory.CreateChannel(subscription); - _messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication - { - Topic = new RoutingKey(_topicName), - MakeChannels = OnMissingChannel.Create, - SnsSubjectGenerator = _ => _subject - }); + _messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication{Topic = new RoutingKey(_topicName), MakeChannels = OnMissingChannel.Create}); } @@ -81,7 +66,7 @@ public async Task When_posting_a_message_via_the_producer() await Task.Delay(1000); - var message = _channel.Receive(5000); + var message =_channel.Receive(5000); //clear the queue _channel.Acknowledge(message); @@ -103,53 +88,6 @@ public async Task When_posting_a_message_via_the_producer() //{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false} message.Body.Value.Should().Be(_message.Body.Value); } - - [Fact] - public async Task When_posting_a_message_via_the_producer_with_subject() - { - //arrange - _messageProducer.Send(_message); - - await Task.Delay(1000); - - var message = await ReceiveRaw(5000); - - //clear the queue - await AcknowledgeRaw(message); - - var jsonDocument = JsonDocument.Parse(message.Body); - - jsonDocument.RootElement.TryGetProperty("Subject", out var subject).Should().BeTrue(); - subject.GetString().Should().Be(_subject); - } - - private async Task ReceiveRaw(int timeoutMilliseconds) - { - var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); - - var request = new ReceiveMessageRequest(urlResponse.QueueUrl) - { - MaxNumberOfMessages = 1, - WaitTimeSeconds = (int)TimeSpan.FromMilliseconds(timeoutMilliseconds).TotalSeconds, - MessageAttributeNames = ["All"], - MessageSystemAttributeNames = ["All"], - }; - - var receiveResponse = await _sqsClient.ReceiveMessageAsync(request); - - if (receiveResponse.Messages.Count == 0) - { - return null; - } - - return receiveResponse.Messages[0]; - } - - private async Task AcknowledgeRaw(Amazon.SQS.Model.Message message) - { - var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); - await _sqsClient.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, message.ReceiptHandle)); - } public void Dispose() { From 73d08745a471d591441623af798f10b4a76459e3 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Tue, 27 Aug 2024 23:36:58 +0800 Subject: [PATCH 6/7] Use Message.Header.Subject as SNS subject --- .../SqsInlineMessageCreator.cs | 32 ++++++++++++++----- .../SqsMessagePublisher.cs | 2 +- ...ing_a_message_via_the_messaging_gateway.cs | 14 +++++--- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs index a3d45db22a..3bb0468a35 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) var timeStamp = HeaderResult.Empty(); var receiptHandle = HeaderResult.Empty(); var replyTo = HeaderResult.Empty(); + var subject = HeaderResult.Empty(); Message message; try { - _messageAttributes = ReadMessageAttributes(sqsMessage); + var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + _messageAttributes = ReadMessageAttributes(jsonDocument); topic = ReadTopic(); messageId = ReadMessageId(); @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) messageType = ReadMessageType(); timeStamp = ReadTimestamp(); replyTo = ReadReplyTo(); + subject = ReadMessageSubject(jsonDocument); receiptHandle = ReadReceiptHandle(sqsMessage); //TODO:CLOUD_EVENTS parse from headers @@ -76,10 +79,10 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) contentType: contentType.Result, handledCount: handledCount.Result, dataSchema: null, - subject: null, + subject: subject.Result, delayedMilliseconds: 0); - message = new Message(messageHeader, ReadMessageBody(sqsMessage)); + message = new Message(messageHeader, ReadMessageBody(jsonDocument)); //deserialize the bag var bag = ReadMessageBag(); @@ -101,14 +104,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) return message; } - private Dictionary ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage) + private static Dictionary ReadMessageAttributes(JsonDocument jsonDocument) { var messageAttributes = new Dictionary(); try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); - if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes)) { messageAttributes = JsonSerializer.Deserialize>( @@ -238,12 +239,27 @@ private HeaderResult ReadTopic() return new HeaderResult(string.Empty, true); } - private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage) + private static HeaderResult ReadMessageSubject(JsonDocument jsonDocument) { try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + if (jsonDocument.RootElement.TryGetProperty("Subject", out var value)) + { + return new HeaderResult(value.GetString(), true); + } + } + catch (Exception ex) + { + s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}"); + } + + return new HeaderResult(string.Empty, true);; + } + private static MessageBody ReadMessageBody(JsonDocument jsonDocument) + { + try + { if (jsonDocument.RootElement.TryGetProperty("Message", out var value)) { return new MessageBody(value.GetString()); diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 8f07c032f5..183b3df3d5 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -45,7 +45,7 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString); + var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject); var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 53149d4018..c4cbf7a6b3 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -23,26 +23,29 @@ public class SqsMessageProducerSendTests : IDisposable private readonly string _replyTo; private readonly string _contentType; private readonly string _topicName; + private readonly string _subject; public SqsMessageProducerSendTests() { - _myCommand = new MyCommand{Value = "Test"}; + _myCommand = new MyCommand{Value = "Testttttttt"}; _correlationId = Guid.NewGuid().ToString(); _replyTo = "http:\\queueUrl"; _contentType = "text\\plain"; var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); _topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); var routingKey = new RoutingKey(_topicName); + _subject = "test subject"; SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey + routingKey: routingKey, + rawMessageDelivery: false ); _message = new Message( new MessageHeader(_myCommand.Id, _topicName, MessageType.MT_COMMAND, correlationId: _correlationId, - replyTo: _replyTo, contentType: _contentType), + replyTo: _replyTo, contentType: _contentType, subject: _subject), new MessageBody(JsonSerializer.Serialize((object) _myCommand, JsonSerialisationOptions.Options)) ); @@ -66,7 +69,7 @@ public async Task When_posting_a_message_via_the_producer() await Task.Delay(1000); - var message =_channel.Receive(5000); + var message = _channel.Receive(5000); //clear the queue _channel.Acknowledge(message); @@ -82,6 +85,7 @@ public async Task When_posting_a_message_via_the_producer() message.Header.ReplyTo.Should().Be(_replyTo); message.Header.ContentType.Should().Be(_contentType); message.Header.HandledCount.Should().Be(0); + message.Header.Subject.Should().Be(_subject); //allow for clock drift in the following test, more important to have a contemporary timestamp than anything message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1))); message.Header.DelayedMilliseconds.Should().Be(0); @@ -96,7 +100,7 @@ public void Dispose() _messageProducer?.Dispose(); } - private DateTime RoundToSeconds(DateTime dateTime) + private static DateTime RoundToSeconds(DateTime dateTime) { return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind); } From b53a1aec1518dd141085d3726e5f5f519ca993fe Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Wed, 28 Aug 2024 00:11:47 +0800 Subject: [PATCH 7/7] Fix commit --- .../SqsInlineMessageCreator.cs | 2 +- ...sting_a_message_via_the_messaging_gateway.cs | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs index 3bb0468a35..20a2c90f69 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs @@ -253,7 +253,7 @@ private static HeaderResult ReadMessageSubject(JsonDocument jsonDocument s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}"); } - return new HeaderResult(string.Empty, true);; + return new HeaderResult(null, true);; } private static MessageBody ReadMessageBody(JsonDocument jsonDocument) diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index c4cbf7a6b3..b58a7f0d01 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -23,18 +23,16 @@ public class SqsMessageProducerSendTests : IDisposable private readonly string _replyTo; private readonly string _contentType; private readonly string _topicName; - private readonly string _subject; public SqsMessageProducerSendTests() { - _myCommand = new MyCommand{Value = "Testttttttt"}; + _myCommand = new MyCommand{Value = "Test"}; _correlationId = Guid.NewGuid().ToString(); _replyTo = "http:\\queueUrl"; _contentType = "text\\plain"; var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); _topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); var routingKey = new RoutingKey(_topicName); - _subject = "test subject"; SqsSubscription subscription = new( name: new SubscriptionName(channelName), @@ -45,7 +43,7 @@ public SqsMessageProducerSendTests() _message = new Message( new MessageHeader(_myCommand.Id, _topicName, MessageType.MT_COMMAND, correlationId: _correlationId, - replyTo: _replyTo, contentType: _contentType, subject: _subject), + replyTo: _replyTo, contentType: _contentType), new MessageBody(JsonSerializer.Serialize((object) _myCommand, JsonSerialisationOptions.Options)) ); @@ -61,12 +59,15 @@ public SqsMessageProducerSendTests() - [Fact] - public async Task When_posting_a_message_via_the_producer() + [Theory] + [InlineData("test subject")] + [InlineData(null)] + public async Task When_posting_a_message_via_the_producer(string subject) { //arrange + _message.Header.Subject = subject; _messageProducer.Send(_message); - + await Task.Delay(1000); var message = _channel.Receive(5000); @@ -85,7 +86,7 @@ public async Task When_posting_a_message_via_the_producer() message.Header.ReplyTo.Should().Be(_replyTo); message.Header.ContentType.Should().Be(_contentType); message.Header.HandledCount.Should().Be(0); - message.Header.Subject.Should().Be(_subject); + message.Header.Subject.Should().Be(subject); //allow for clock drift in the following test, more important to have a contemporary timestamp than anything message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1))); message.Header.DelayedMilliseconds.Should().Be(0);