Skip to content

Commit

Permalink
Implement sparse outstanding index for dynamo DB outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
dhickie committed Sep 5, 2024
1 parent 2f68e3c commit d8e603f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 32 deletions.
Binary file removed Docker/dynamodb/shared-local-instance.db
Binary file not shown.
45 changes: 22 additions & 23 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ await _context.SaveAsync(
message,
_dynamoOverwriteTableConfig,
cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
}

/// <summary>
/// Marks a set of messages as dispatched in the Outbox
Expand Down Expand Up @@ -404,13 +404,16 @@ public void MarkDispatched(string id, RequestContext requestContext, DateTimeOff
message,
_dynamoOverwriteTableConfig)
.Wait(_configuration.Timeout);

}

private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageItem message)
{
message.DeliveryTime = dispatchedAt.Ticks;
message.DeliveredAt = dispatchedAt.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");

// Set the outstanding created time to null to remove the attribute
// from the item in dynamo
message.OutstandingCreatedTime = null;
}

/// <summary>
Expand All @@ -423,11 +426,11 @@ private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageIt
/// <param name="args"></param>
/// <returns>A list of messages that are outstanding for dispatch</returns>
public IEnumerable<Message> OutstandingMessages(
TimeSpan dispatchedSince,
RequestContext requestContext,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
TimeSpan dispatchedSince,
RequestContext requestContext,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
{
return OutstandingMessagesAsync(dispatchedSince, requestContext, pageSize, pageNumber, args)
.GetAwaiter()
Expand Down Expand Up @@ -584,20 +587,20 @@ private async Task<IEnumerable<Message>> OutstandingMessagesForTopicAsync(TimeSp
return queryResult.Messages.Select(msg => msg.ConvertToMessage());
}

private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();

var transaction = dynamoDbUnitOfWork.GetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}
private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();
var transaction = dynamoDbUnitOfWork.GetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}

private async Task<Message> GetMessage(string id, CancellationToken cancellationToken = default)
{
MessageItem messageItem = await _context.LoadAsync<MessageItem>(id, _dynamoOverwriteTableConfig, cancellationToken);
var messageItem = await _context.LoadAsync<MessageItem>(id, _dynamoOverwriteTableConfig, cancellationToken);
return messageItem?.ConvertToMessage() ?? new Message();
}

Expand Down Expand Up @@ -734,14 +737,10 @@ private async Task<OutstandingMessagesQueryResult> PageOutstandingMessagesToBatc
{
do
{
// We get all the messages for topic, added within a time range
// There should be few enough of those that we can efficiently filter for those
// that don't have a delivery date.
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.OutstandingIndexName,
KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topicName, olderThan, shard),
FilterExpression = new NoDispatchTimeExpression().Generate(),
KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topicName, olderThan, shard),
Limit = batchSize - results.Count,
PaginationToken = paginationToken,
ConsistentRead = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

namespace Paramore.Brighter.Outbox.DynamoDB
{
internal class KeyTopicCreatedTimeExpression
internal class KeyTopicOutstandingCreatedTimeExpression
{
private readonly Expression _expression;

public KeyTopicCreatedTimeExpression()
public KeyTopicOutstandingCreatedTimeExpression()
{
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" };
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" };
}

public override string ToString()
Expand All @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTimeOffset createdTime, int sha
{
var values = new Dictionary<string, DynamoDBEntry>();
values.Add(":v_TopicShard", $"{topicName}_{shard}");
values.Add(":v_CreatedTime", createdTime.Ticks);
values.Add(":v_OutstandingCreatedTime", createdTime.Ticks);

_expression.ExpressionAttributeValues = values;

Expand Down
9 changes: 8 additions & 1 deletion src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ public class MessageItem
/// <summary>
/// The time at which the message was created, in ticks
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long CreatedTime { get; set; }

/// <summary>
/// The time at which the message was created, in ticks. Null if the message has been dispatched.
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long? OutstandingCreatedTime { get; set; }

/// <summary>
/// The time at which the message was delivered, formatted as a string yyyy-MM-dd
/// </summary>
Expand Down Expand Up @@ -123,6 +129,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null)
CharacterEncoding = message.Body.CharacterEncoding.ToString();
CreatedAt = date.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
CreatedTime = date.Ticks;
OutstandingCreatedTime = date.Ticks;
DeliveryTime = 0;
HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
MessageId = message.Id.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task When_there_are_outstanding_messages_in_the_outbox_async()
}

[Fact]
public async Task When_there_are_outstanding_messages_in_the_outbox()
public void When_there_are_outstanding_messages_in_the_outbox()
{
var context = new RequestContext();
_dynamoDbOutbox.Add(_message, context);
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics_async(
}

[Fact]
public async Task When_there_are_outstanding_messages_for_multiple_topics()
public void When_there_are_outstanding_messages_for_multiple_topics()
{
var messages = new List<Message>();
var context = new RequestContext();
Expand Down Expand Up @@ -149,7 +149,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_to
}

[Fact]
public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_topic()
public void When_there_are_multiple_pages_of_outstanding_messages_for_a_topic()
{
var context = new RequestContext();
var messages = new List<Message>();
Expand Down Expand Up @@ -227,7 +227,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_
}

[Fact]
public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_topics()
public void When_there_are_multiple_pages_of_outstanding_messages_for_all_topics()
{
var context = new RequestContext();
var messages = new List<Message>();
Expand Down Expand Up @@ -269,6 +269,56 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_
}
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched_async()
{
var context = new RequestContext();
await _dynamoDbOutbox.AddAsync(_message, context);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id, context);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched()
{
var context = new RequestContext();
_dynamoDbOutbox.Add(_message, context);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

_dynamoDbOutbox.MarkDispatched(_message.Id, context);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

private Message CreateMessage(string topic)
{
return new Message(
Expand Down

0 comments on commit d8e603f

Please sign in to comment.