From 5e82c84da3b883b29a4242c2b4379a01517e0af6 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Fri, 23 Aug 2024 19:41:19 +0800 Subject: [PATCH] feature: Backport #3270 --- .../SnsPublication.cs | 7 ++ .../SqsMessageProducer.cs | 2 +- .../SqsMessagePublisher.cs | 13 ++-- ...ing_a_message_via_the_messaging_gateway.cs | 69 ++++++++++++++++++- 4 files changed, 83 insertions(+), 8 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs index 319512d061..64987f9221 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsPublication.cs @@ -21,6 +21,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #endregion +using System; + namespace Paramore.Brighter.MessagingGateway.AWSSQS { public class SnsPublication : Publication @@ -45,5 +47,10 @@ public class SnsPublication : Publication /// as we use the topic from the header to dispatch to an Arn. /// public string TopicArn { get; set; } + + /// + /// An optional delegate for generating the SNS subject for a given message + /// + public Func SnsSubjectGenerator { get; set; } = null; } } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs index 40d581b208..68611a4486 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageProducer.cs @@ -106,7 +106,7 @@ public void Send(Message message) using (var client = _clientFactory.CreateSnsClient()) { - var publisher = new SqsMessagePublisher(ChannelTopicArn, client); + var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubjectGenerator); var messageId = publisher.Publish(message); if (messageId != null) { diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index 8f07c032f5..817ff3d2fb 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -23,11 +23,9 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; -using System.Net.Mime; using System.Text.Json; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; -using Paramore.Brighter.Transforms.Transformers; namespace Paramore.Brighter.MessagingGateway.AWSSQS { @@ -35,17 +33,24 @@ public class SqsMessagePublisher { private readonly string _topicArn; private readonly AmazonSimpleNotificationServiceClient _client; + private readonly Func _subjectGenerator; - public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client) + public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func subjectGenerator = null) { _topicArn = topicArn; _client = client; + _subjectGenerator = subjectGenerator; } public string Publish(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString); + var publishRequest = new PublishRequest() + { + TopicArn = _topicArn, + Subject = _subjectGenerator?.Invoke(message), + Message = messageString + }; var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 5140209560..75db494e29 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -3,6 +3,8 @@ using System.Threading.Tasks; using Amazon; using Amazon.Runtime; +using Amazon.SQS; +using Amazon.SQS.Model; using FluentAssertions; using Paramore.Brighter.AWS.Tests.Helpers; using Paramore.Brighter.AWS.Tests.TestDoubles; @@ -14,6 +16,8 @@ namespace Paramore.Brighter.AWS.Tests.MessagingGateway [Trait("Category", "AWS")] public class SqsMessageProducerSendTests : IDisposable { + private readonly AmazonSQSClient _sqsClient; + private readonly string _queueName; private readonly Message _message; private readonly IAmAChannel _channel; private readonly SqsMessageProducer _messageProducer; @@ -23,6 +27,7 @@ public class SqsMessageProducerSendTests : IDisposable private readonly string _replyTo; private readonly string _contentType; private readonly string _topicName; + private readonly string _subject; public SqsMessageProducerSendTests() { @@ -33,11 +38,13 @@ public SqsMessageProducerSendTests() var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); _topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45); var routingKey = new RoutingKey(_topicName); + _subject = "test subject"; SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey + routingKey: routingKey, + rawMessageDelivery: false ); _message = new Message( @@ -49,10 +56,20 @@ public SqsMessageProducerSendTests() (AWSCredentials credentials, RegionEndpoint region) = CredentialsChain.GetAwsCredentials(); var awsConnection = new AWSMessagingGatewayConnection(credentials, region); + _sqsClient = new AmazonSQSClient(credentials, region); + _queueName = subscription.ChannelName.ToValidSQSQueueName(); + _channelFactory = new ChannelFactory(awsConnection); _channel = _channelFactory.CreateChannel(subscription); - _messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication{Topic = new RoutingKey(_topicName), MakeChannels = OnMissingChannel.Create}); + _messageProducer = new SqsMessageProducer( + awsConnection, + new SnsPublication + { + Topic = new RoutingKey(_topicName), + MakeChannels = OnMissingChannel.Create, + SnsSubjectGenerator = _ => _subject + }); } @@ -65,7 +82,7 @@ public async Task When_posting_a_message_via_the_producer() await Task.Delay(1000); - var message =_channel.Receive(5000); + var message = _channel.Receive(5000); //clear the queue _channel.Acknowledge(message); @@ -87,6 +104,52 @@ public async Task When_posting_a_message_via_the_producer() //{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false} message.Body.Value.Should().Be(_message.Body.Value); } + + [Fact] + public async Task When_posting_a_message_via_the_producer_with_subject() + { + //arrange + _messageProducer.Send(_message); + + await Task.Delay(1000); + + var message = await ReceiveRaw(5000); + + //clear the queue + await AcknowledgeRaw(message); + + var jsonDocument = JsonDocument.Parse(message.Body); + + jsonDocument.RootElement.TryGetProperty("Subject", out var subject).Should().BeTrue(); + subject.GetString().Should().Be(_subject); + } + + private async Task ReceiveRaw(int timeoutMilliseconds) + { + var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); + + var request = new ReceiveMessageRequest(urlResponse.QueueUrl) + { + MaxNumberOfMessages = 1, + WaitTimeSeconds = (int)TimeSpan.FromMilliseconds(timeoutMilliseconds).TotalSeconds, + MessageAttributeNames = ["All"] + }; + + var receiveResponse = await _sqsClient.ReceiveMessageAsync(request); + + if (receiveResponse.Messages.Count == 0) + { + return null; + } + + return receiveResponse.Messages[0]; + } + + private async Task AcknowledgeRaw(Amazon.SQS.Model.Message message) + { + var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName); + await _sqsClient.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, message.ReceiptHandle)); + } public void Dispose() {