Skip to content

Commit

Permalink
feature: Backport BrighterCommand#3270
Browse files Browse the repository at this point in the history
  • Loading branch information
jtsalva committed Aug 23, 2024
1 parent 02cbf36 commit 5e82c84
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */
#endregion

using System;

namespace Paramore.Brighter.MessagingGateway.AWSSQS
{
public class SnsPublication : Publication
Expand All @@ -45,5 +47,10 @@ public class SnsPublication : Publication
/// as we use the topic from the header to dispatch to an Arn.
/// </summary>
public string TopicArn { get; set; }

/// <summary>
/// An optional delegate for generating the SNS subject for a given message
/// </summary>
public Func<Message, string> SnsSubjectGenerator { get; set; } = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void Send(Message message)

using (var client = _clientFactory.CreateSnsClient())
{
var publisher = new SqsMessagePublisher(ChannelTopicArn, client);
var publisher = new SqsMessagePublisher(ChannelTopicArn, client, _publication.SnsSubjectGenerator);
var messageId = publisher.Publish(message);
if (messageId != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,34 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Text.Json;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Paramore.Brighter.Transforms.Transformers;

namespace Paramore.Brighter.MessagingGateway.AWSSQS
{
public class SqsMessagePublisher
{
private readonly string _topicArn;
private readonly AmazonSimpleNotificationServiceClient _client;
private readonly Func<Message, string> _subjectGenerator;

public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client)
public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClient client, Func<Message, string> subjectGenerator = null)
{
_topicArn = topicArn;
_client = client;
_subjectGenerator = subjectGenerator;
}

public string Publish(Message message)
{
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString);
var publishRequest = new PublishRequest()
{
TopicArn = _topicArn,
Subject = _subjectGenerator?.Invoke(message),
Message = messageString
};

var messageAttributes = new Dictionary<string, MessageAttributeValue>();
messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;
using FluentAssertions;
using Paramore.Brighter.AWS.Tests.Helpers;
using Paramore.Brighter.AWS.Tests.TestDoubles;
Expand All @@ -14,6 +16,8 @@ namespace Paramore.Brighter.AWS.Tests.MessagingGateway
[Trait("Category", "AWS")]
public class SqsMessageProducerSendTests : IDisposable
{
private readonly AmazonSQSClient _sqsClient;
private readonly string _queueName;
private readonly Message _message;
private readonly IAmAChannel _channel;
private readonly SqsMessageProducer _messageProducer;
Expand All @@ -23,6 +27,7 @@ public class SqsMessageProducerSendTests : IDisposable
private readonly string _replyTo;
private readonly string _contentType;
private readonly string _topicName;
private readonly string _subject;

public SqsMessageProducerSendTests()
{
Expand All @@ -33,11 +38,13 @@ public SqsMessageProducerSendTests()
var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45);
_topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45);
var routingKey = new RoutingKey(_topicName);
_subject = "test subject";

SqsSubscription<MyCommand> subscription = new(
name: new SubscriptionName(channelName),
channelName: new ChannelName(channelName),
routingKey: routingKey
routingKey: routingKey,
rawMessageDelivery: false
);

_message = new Message(
Expand All @@ -49,10 +56,20 @@ public SqsMessageProducerSendTests()
(AWSCredentials credentials, RegionEndpoint region) = CredentialsChain.GetAwsCredentials();
var awsConnection = new AWSMessagingGatewayConnection(credentials, region);

_sqsClient = new AmazonSQSClient(credentials, region);
_queueName = subscription.ChannelName.ToValidSQSQueueName();

_channelFactory = new ChannelFactory(awsConnection);
_channel = _channelFactory.CreateChannel(subscription);

_messageProducer = new SqsMessageProducer(awsConnection, new SnsPublication{Topic = new RoutingKey(_topicName), MakeChannels = OnMissingChannel.Create});
_messageProducer = new SqsMessageProducer(
awsConnection,
new SnsPublication
{
Topic = new RoutingKey(_topicName),
MakeChannels = OnMissingChannel.Create,
SnsSubjectGenerator = _ => _subject
});
}


Expand All @@ -65,7 +82,7 @@ public async Task When_posting_a_message_via_the_producer()

await Task.Delay(1000);

var message =_channel.Receive(5000);
var message = _channel.Receive(5000);

//clear the queue
_channel.Acknowledge(message);
Expand All @@ -87,6 +104,52 @@ public async Task When_posting_a_message_via_the_producer()
//{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false}
message.Body.Value.Should().Be(_message.Body.Value);
}

[Fact]
public async Task When_posting_a_message_via_the_producer_with_subject()
{
//arrange
_messageProducer.Send(_message);

await Task.Delay(1000);

var message = await ReceiveRaw(5000);

//clear the queue
await AcknowledgeRaw(message);

var jsonDocument = JsonDocument.Parse(message.Body);

jsonDocument.RootElement.TryGetProperty("Subject", out var subject).Should().BeTrue();
subject.GetString().Should().Be(_subject);
}

private async Task<Amazon.SQS.Model.Message> ReceiveRaw(int timeoutMilliseconds)
{
var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName);

var request = new ReceiveMessageRequest(urlResponse.QueueUrl)
{
MaxNumberOfMessages = 1,
WaitTimeSeconds = (int)TimeSpan.FromMilliseconds(timeoutMilliseconds).TotalSeconds,
MessageAttributeNames = ["All"]
};

var receiveResponse = await _sqsClient.ReceiveMessageAsync(request);

if (receiveResponse.Messages.Count == 0)
{
return null;
}

return receiveResponse.Messages[0];
}

private async Task AcknowledgeRaw(Amazon.SQS.Model.Message message)
{
var urlResponse = await _sqsClient.GetQueueUrlAsync(_queueName);
await _sqsClient.DeleteMessageAsync(new DeleteMessageRequest(urlResponse.QueueUrl, message.ReceiptHandle));
}

public void Dispose()
{
Expand Down

0 comments on commit 5e82c84

Please sign in to comment.