diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs index 17efcd774..1943ad7ec 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs @@ -365,7 +365,7 @@ await _context.SaveAsync( message, _dynamoOverwriteTableConfig, cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - } + } /// /// Marks a set of messages as dispatched in the Outbox @@ -404,13 +404,16 @@ public void MarkDispatched(string id, RequestContext requestContext, DateTimeOff message, _dynamoOverwriteTableConfig) .Wait(_configuration.Timeout); - } private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageItem message) { message.DeliveryTime = dispatchedAt.Ticks; message.DeliveredAt = dispatchedAt.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"); + + // Set the outstanding created time to null to remove the attribute + // from the item in dynamo + message.OutstandingCreatedTime = null; } /// @@ -423,11 +426,11 @@ private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageIt /// /// A list of messages that are outstanding for dispatch public IEnumerable OutstandingMessages( - TimeSpan dispatchedSince, - RequestContext requestContext, - int pageSize = 100, - int pageNumber = 1, - Dictionary args = null) + TimeSpan dispatchedSince, + RequestContext requestContext, + int pageSize = 100, + int pageNumber = 1, + Dictionary args = null) { return OutstandingMessagesAsync(dispatchedSince, requestContext, pageSize, pageNumber, args) .GetAwaiter() @@ -584,20 +587,20 @@ private async Task> OutstandingMessagesForTopicAsync(TimeSp return queryResult.Messages.Select(msg => msg.ConvertToMessage()); } - private Task AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork) - { - var tcs = new TaskCompletionSource(); - var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap(); - - var transaction = dynamoDbUnitOfWork.GetTransaction(); - transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}}); - tcs.SetResult(transaction); - return tcs.Task; - } + private Task AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork) + { + var tcs = new TaskCompletionSource(); + var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap(); + + var transaction = dynamoDbUnitOfWork.GetTransaction(); + transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}}); + tcs.SetResult(transaction); + return tcs.Task; + } private async Task GetMessage(string id, CancellationToken cancellationToken = default) { - MessageItem messageItem = await _context.LoadAsync(id, _dynamoOverwriteTableConfig, cancellationToken); + var messageItem = await _context.LoadAsync(id, _dynamoOverwriteTableConfig, cancellationToken); return messageItem?.ConvertToMessage() ?? new Message(); } @@ -734,14 +737,10 @@ private async Task PageOutstandingMessagesToBatc { do { - // We get all the messages for topic, added within a time range - // There should be few enough of those that we can efficiently filter for those - // that don't have a delivery date. var queryConfig = new QueryOperationConfig { IndexName = _configuration.OutstandingIndexName, - KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topicName, olderThan, shard), - FilterExpression = new NoDispatchTimeExpression().Generate(), + KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topicName, olderThan, shard), Limit = batchSize - results.Count, PaginationToken = paginationToken, ConsistentRead = false diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs b/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs similarity index 72% rename from src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs rename to src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs index af1f2902f..fbff217e9 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs @@ -4,13 +4,13 @@ namespace Paramore.Brighter.Outbox.DynamoDB { - internal class KeyTopicCreatedTimeExpression + internal class KeyTopicOutstandingCreatedTimeExpression { private readonly Expression _expression; - public KeyTopicCreatedTimeExpression() + public KeyTopicOutstandingCreatedTimeExpression() { - _expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" }; + _expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" }; } public override string ToString() @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTimeOffset createdTime, int sha { var values = new Dictionary(); values.Add(":v_TopicShard", $"{topicName}_{shard}"); - values.Add(":v_CreatedTime", createdTime.Ticks); + values.Add(":v_OutstandingCreatedTime", createdTime.Ticks); _expression.ExpressionAttributeValues = values; diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs b/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs index 428fe1dfa..377e9075a 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs @@ -42,10 +42,16 @@ public class MessageItem /// /// The time at which the message was created, in ticks /// - [DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")] [DynamoDBProperty] public long CreatedTime { get; set; } + /// + /// The time at which the message was created, in ticks. Null if the message has been dispatched. + /// + [DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")] + [DynamoDBProperty] + public long? OutstandingCreatedTime { get; set; } + /// /// The time at which the message was delivered, formatted as a string yyyy-MM-dd /// @@ -123,6 +129,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null) CharacterEncoding = message.Body.CharacterEncoding.ToString(); CreatedAt = date.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"); CreatedTime = date.Ticks; + OutstandingCreatedTime = date.Ticks; DeliveryTime = 0; HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); MessageId = message.Id.ToString(); diff --git a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs index 7035856ba..dcdac185e 100644 --- a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs +++ b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs @@ -42,7 +42,7 @@ public async Task When_there_are_outstanding_messages_in_the_outbox_async() } [Fact] - public async Task When_there_are_outstanding_messages_in_the_outbox() + public void When_there_are_outstanding_messages_in_the_outbox() { var context = new RequestContext(); _dynamoDbOutbox.Add(_message, context); @@ -87,7 +87,7 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics_async( } [Fact] - public async Task When_there_are_outstanding_messages_for_multiple_topics() + public void When_there_are_outstanding_messages_for_multiple_topics() { var messages = new List(); var context = new RequestContext(); @@ -149,7 +149,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_to } [Fact] - public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_topic() + public void When_there_are_multiple_pages_of_outstanding_messages_for_a_topic() { var context = new RequestContext(); var messages = new List(); @@ -227,7 +227,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_ } [Fact] - public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_topics() + public void When_there_are_multiple_pages_of_outstanding_messages_for_all_topics() { var context = new RequestContext(); var messages = new List(); @@ -269,6 +269,56 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_ } } + [Fact] + public async Task When_an_outstanding_message_is_dispatched_async() + { + var context = new RequestContext(); + await _dynamoDbOutbox.AddAsync(_message, context); + + _fakeTimeProvider.Advance(TimeSpan.FromSeconds(1)); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + + await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id, context); + + // Give the GSI a second to catch up + await Task.Delay(1000); + + messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args); + messages.All(m => m.Id != _message.Id); + } + + [Fact] + public async Task When_an_outstanding_message_is_dispatched() + { + var context = new RequestContext(); + _dynamoDbOutbox.Add(_message, context); + + _fakeTimeProvider.Advance(TimeSpan.FromSeconds(1)); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + + _dynamoDbOutbox.MarkDispatched(_message.Id, context); + + // Give the GSI a second to catch up + await Task.Delay(1000); + + messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args); + messages.All(m => m.Id != _message.Id); + } + private Message CreateMessage(string topic) { return new Message(