Skip to content

Commit

Permalink
feature: Support the Subject property when publishing to SNS (#3270)
Browse files Browse the repository at this point in the history
* feature: Support SNS subject

* Update test

* Address review comments

* Retrigger build

* Revert publication level subject generator commits

* Use Message.Header.Subject as SNS subject

* Fix commit
  • Loading branch information
jtsalva authored Aug 27, 2024
1 parent 4674fdc commit 1234f63
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
var timeStamp = HeaderResult<DateTime>.Empty();
var receiptHandle = HeaderResult<string>.Empty();
var replyTo = HeaderResult<string>.Empty();
var subject = HeaderResult<string>.Empty();

Message message;
try
{
_messageAttributes = ReadMessageAttributes(sqsMessage);
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
_messageAttributes = ReadMessageAttributes(jsonDocument);

topic = ReadTopic();
messageId = ReadMessageId();
Expand All @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
messageType = ReadMessageType();
timeStamp = ReadTimestamp();
replyTo = ReadReplyTo();
subject = ReadMessageSubject(jsonDocument);
receiptHandle = ReadReceiptHandle(sqsMessage);

//TODO:CLOUD_EVENTS parse from headers
Expand All @@ -76,10 +79,10 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
contentType: contentType.Result,
handledCount: handledCount.Result,
dataSchema: null,
subject: null,
subject: subject.Result,
delayedMilliseconds: 0);

message = new Message(messageHeader, ReadMessageBody(sqsMessage));
message = new Message(messageHeader, ReadMessageBody(jsonDocument));

//deserialize the bag
var bag = ReadMessageBag();
Expand All @@ -101,14 +104,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
return message;
}

private Dictionary<string, JsonElement> ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage)
private static Dictionary<string, JsonElement> ReadMessageAttributes(JsonDocument jsonDocument)
{
var messageAttributes = new Dictionary<string, JsonElement>();

try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);

if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes))
{
messageAttributes = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
Expand Down Expand Up @@ -238,12 +239,27 @@ private HeaderResult<string> ReadTopic()
return new HeaderResult<string>(string.Empty, true);
}

private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage)
private static HeaderResult<string> ReadMessageSubject(JsonDocument jsonDocument)
{
try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
if (jsonDocument.RootElement.TryGetProperty("Subject", out var value))
{
return new HeaderResult<string>(value.GetString(), true);
}
}
catch (Exception ex)
{
s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}");
}

return new HeaderResult<string>(null, true);;
}

private static MessageBody ReadMessageBody(JsonDocument jsonDocument)
{
try
{
if (jsonDocument.RootElement.TryGetProperty("Message", out var value))
{
return new MessageBody(value.GetString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
public string Publish(Message message)
{
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString);
var publishRequest = new PublishRequest(_topicArn, messageString, message.Header.Subject);

var messageAttributes = new Dictionary<string, MessageAttributeValue>();
messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public SqsMessageProducerSendTests()
SqsSubscription<MyCommand> subscription = new(
name: new SubscriptionName(channelName),
channelName: new ChannelName(channelName),
routingKey: routingKey
routingKey: routingKey,
rawMessageDelivery: false
);

_message = new Message(
Expand All @@ -58,15 +59,18 @@ public SqsMessageProducerSendTests()



[Fact]
public async Task When_posting_a_message_via_the_producer()
[Theory]
[InlineData("test subject")]
[InlineData(null)]
public async Task When_posting_a_message_via_the_producer(string subject)
{
//arrange
_message.Header.Subject = subject;
_messageProducer.Send(_message);

await Task.Delay(1000);

var message =_channel.Receive(5000);
var message = _channel.Receive(5000);

//clear the queue
_channel.Acknowledge(message);
Expand All @@ -82,6 +86,7 @@ public async Task When_posting_a_message_via_the_producer()
message.Header.ReplyTo.Should().Be(_replyTo);
message.Header.ContentType.Should().Be(_contentType);
message.Header.HandledCount.Should().Be(0);
message.Header.Subject.Should().Be(subject);
//allow for clock drift in the following test, more important to have a contemporary timestamp than anything
message.Header.TimeStamp.Should().BeAfter(RoundToSeconds(DateTime.UtcNow.AddMinutes(-1)));
message.Header.DelayedMilliseconds.Should().Be(0);
Expand All @@ -96,7 +101,7 @@ public void Dispose()
_messageProducer?.Dispose();
}

private DateTime RoundToSeconds(DateTime dateTime)
private static DateTime RoundToSeconds(DateTime dateTime)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind);
}
Expand Down

0 comments on commit 1234f63

Please sign in to comment.