Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Support the Subject property when publishing to SNS #3270

Merged
merged 8 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
var timeStamp = HeaderResult<DateTime>.Empty();
var receiptHandle = HeaderResult<string>.Empty();
var replyTo = HeaderResult<string>.Empty();
var subject = HeaderResult<string>.Empty();

Message message;
try
{
_messageAttributes = ReadMessageAttributes(sqsMessage);
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
_messageAttributes = ReadMessageAttributes(jsonDocument);

topic = ReadTopic();
messageId = ReadMessageId();
Expand All @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
messageType = ReadMessageType();
timeStamp = ReadTimestamp();
replyTo = ReadReplyTo();
subject = ReadMessageSubject(jsonDocument);
receiptHandle = ReadReceiptHandle(sqsMessage);

//TODO:CLOUD_EVENTS parse from headers
Expand All @@ -76,10 +79,10 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
contentType: contentType.Result,
handledCount: handledCount.Result,
dataSchema: null,
subject: null,
subject: subject.Result,
delayedMilliseconds: 0);

message = new Message(messageHeader, ReadMessageBody(sqsMessage));
message = new Message(messageHeader, ReadMessageBody(jsonDocument));

//deserialize the bag
var bag = ReadMessageBag();
Expand All @@ -101,14 +104,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
return message;
}

private Dictionary<string, JsonElement> ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage)
private static Dictionary<string, JsonElement> ReadMessageAttributes(JsonDocument jsonDocument)
{
var messageAttributes = new Dictionary<string, JsonElement>();

try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);

if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes))
{
messageAttributes = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
Expand Down Expand Up @@ -238,12 +239,27 @@ private HeaderResult<string> ReadTopic()
return new HeaderResult<string>(string.Empty, true);
}

private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage)
private static HeaderResult<string> ReadMessageSubject(JsonDocument jsonDocument)
{
try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
if (jsonDocument.RootElement.TryGetProperty("Subject", out var value))
{
return new HeaderResult<string>(value.GetString(), true);
}
}
catch (Exception ex)
{
s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}");
}

return new HeaderResult<string>(string.Empty, true);;
}

private static MessageBody ReadMessageBody(JsonDocument jsonDocument)
{
try
{
if (jsonDocument.RootElement.TryGetProperty("Message", out var value))
{
return new MessageBody(value.GetString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
public string Publish(Message message)
{
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString);
var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know what happens if the subject is null or empty? We probably need a test for that, as modifying the existing test won't tell us if it is breaking for existing consumers. It just needs a conditional if it breaks anyone.


var messageAttributes = new Dictionary<string, MessageAttributeValue>();
messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,29 @@ public class SqsMessageProducerSendTests : IDisposable
private readonly string _replyTo;
private readonly string _contentType;
private readonly string _topicName;
private readonly string _subject;

public SqsMessageProducerSendTests()
{
_myCommand = new MyCommand{Value = "Test"};
_myCommand = new MyCommand{Value = "Testttttttt"};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, we need a test for when there is no subject.

_correlationId = Guid.NewGuid().ToString();
_replyTo = "http:\\queueUrl";
_contentType = "text\\plain";
var channelName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45);
_topicName = $"Producer-Send-Tests-{Guid.NewGuid().ToString()}".Truncate(45);
var routingKey = new RoutingKey(_topicName);
_subject = "test subject";

SqsSubscription<MyCommand> subscription = new(
name: new SubscriptionName(channelName),
channelName: new ChannelName(channelName),
routingKey: routingKey
routingKey: routingKey,
rawMessageDelivery: false
);

_message = new Message(
new MessageHeader(_myCommand.Id, _topicName, MessageType.MT_COMMAND, correlationId: _correlationId,
replyTo: _replyTo, contentType: _contentType),
replyTo: _replyTo, contentType: _contentType, subject: _subject),
new MessageBody(JsonSerializer.Serialize((object) _myCommand, JsonSerialisationOptions.Options))
);

Expand All @@ -66,7 +69,7 @@ public async Task When_posting_a_message_via_the_producer()

await Task.Delay(1000);

var message =_channel.Receive(5000);
var message = _channel.Receive(5000);

//clear the queue
_channel.Acknowledge(message);
Expand All @@ -82,6 +85,7 @@ public async Task When_posting_a_message_via_the_producer()
message.Header.ReplyTo.Should().Be(_replyTo);
message.Header.ContentType.Should().Be(_contentType);
message.Header.HandledCount.Should().Be(0);
message.Header.Subject.Should().Be(_subject);
//allow for clock drift in the following test, more important to have a contemporary timestamp than anything
message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1)));
message.Header.DelayedMilliseconds.Should().Be(0);
Expand All @@ -96,7 +100,7 @@ public void Dispose()
_messageProducer?.Dispose();
}

private DateTime RoundToSeconds(DateTime dateTime)
private static DateTime RoundToSeconds(DateTime dateTime)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind);
}
Expand Down
Loading