Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sparse outstanding index for dynamo DB outbox [v10] #3296

Merged
merged 4 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ await _context.SaveAsync(
message,
_dynamoOverwriteTableConfig,
cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
}

/// <summary>
/// Marks a set of messages as dispatched in the Outbox
Expand Down Expand Up @@ -404,13 +404,16 @@ public void MarkDispatched(string id, RequestContext requestContext, DateTimeOff
message,
_dynamoOverwriteTableConfig)
.Wait(_configuration.Timeout);

}

private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageItem message)
{
message.DeliveryTime = dispatchedAt.Ticks;
message.DeliveredAt = dispatchedAt.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");

// Set the outstanding created time to null to remove the attribute
// from the item in dynamo
message.OutstandingCreatedTime = null;
}

/// <summary>
Expand All @@ -423,11 +426,11 @@ private static void MarkMessageDispatched(DateTimeOffset dispatchedAt, MessageIt
/// <param name="args"></param>
/// <returns>A list of messages that are outstanding for dispatch</returns>
public IEnumerable<Message> OutstandingMessages(
TimeSpan dispatchedSince,
RequestContext requestContext,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
TimeSpan dispatchedSince,
RequestContext requestContext,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
{
return OutstandingMessagesAsync(dispatchedSince, requestContext, pageSize, pageNumber, args)
.GetAwaiter()
Expand Down Expand Up @@ -584,20 +587,20 @@ private async Task<IEnumerable<Message>> OutstandingMessagesForTopicAsync(TimeSp
return queryResult.Messages.Select(msg => msg.ConvertToMessage());
}

private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();

var transaction = dynamoDbUnitOfWork.GetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}
private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
var attributes = _context.ToDocument(messageToStore, _dynamoOverwriteTableConfig).ToAttributeMap();
var transaction = dynamoDbUnitOfWork.GetTransaction();
transaction.TransactItems.Add(new TransactWriteItem{Put = new Put{TableName = _configuration.TableName, Item = attributes}});
tcs.SetResult(transaction);
return tcs.Task;
}

private async Task<Message> GetMessage(string id, CancellationToken cancellationToken = default)
{
MessageItem messageItem = await _context.LoadAsync<MessageItem>(id, _dynamoOverwriteTableConfig, cancellationToken);
var messageItem = await _context.LoadAsync<MessageItem>(id, _dynamoOverwriteTableConfig, cancellationToken);
return messageItem?.ConvertToMessage() ?? new Message();
}

Expand Down Expand Up @@ -734,14 +737,10 @@ private async Task<OutstandingMessagesQueryResult> PageOutstandingMessagesToBatc
{
do
{
// We get all the messages for topic, added within a time range
// There should be few enough of those that we can efficiently filter for those
// that don't have a delivery date.
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.OutstandingIndexName,
KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topicName, olderThan, shard),
FilterExpression = new NoDispatchTimeExpression().Generate(),
KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topicName, olderThan, shard),
Limit = batchSize - results.Count,
PaginationToken = paginationToken,
ConsistentRead = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

namespace Paramore.Brighter.Outbox.DynamoDB
{
internal class KeyTopicCreatedTimeExpression
internal class KeyTopicOutstandingCreatedTimeExpression
{
private readonly Expression _expression;

public KeyTopicCreatedTimeExpression()
public KeyTopicOutstandingCreatedTimeExpression()
{
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" };
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" };
}

public override string ToString()
Expand All @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTimeOffset createdTime, int sha
{
var values = new Dictionary<string, DynamoDBEntry>();
values.Add(":v_TopicShard", $"{topicName}_{shard}");
values.Add(":v_CreatedTime", createdTime.Ticks);
values.Add(":v_OutstandingCreatedTime", createdTime.Ticks);

_expression.ExpressionAttributeValues = values;

Expand Down
9 changes: 8 additions & 1 deletion src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ public class MessageItem
/// <summary>
/// The time at which the message was created, in ticks
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long CreatedTime { get; set; }

/// <summary>
/// The time at which the message was created, in ticks. Null if the message has been dispatched.
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long? OutstandingCreatedTime { get; set; }

/// <summary>
/// The time at which the message was delivered, formatted as a string yyyy-MM-dd
/// </summary>
Expand Down Expand Up @@ -123,6 +129,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null)
CharacterEncoding = message.Body.CharacterEncoding.ToString();
CreatedAt = date.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
CreatedTime = date.Ticks;
OutstandingCreatedTime = date.Ticks;
DeliveryTime = 0;
HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
MessageId = message.Id.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task When_there_are_outstanding_messages_in_the_outbox_async()
}

[Fact]
public async Task When_there_are_outstanding_messages_in_the_outbox()
public void When_there_are_outstanding_messages_in_the_outbox()
{
var context = new RequestContext();
_dynamoDbOutbox.Add(_message, context);
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics_async(
}

[Fact]
public async Task When_there_are_outstanding_messages_for_multiple_topics()
public void When_there_are_outstanding_messages_for_multiple_topics()
{
var messages = new List<Message>();
var context = new RequestContext();
Expand Down Expand Up @@ -149,7 +149,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_to
}

[Fact]
public async Task When_there_are_multiple_pages_of_outstanding_messages_for_a_topic()
public void When_there_are_multiple_pages_of_outstanding_messages_for_a_topic()
{
var context = new RequestContext();
var messages = new List<Message>();
Expand Down Expand Up @@ -227,7 +227,7 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_
}

[Fact]
public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_topics()
public void When_there_are_multiple_pages_of_outstanding_messages_for_all_topics()
{
var context = new RequestContext();
var messages = new List<Message>();
Expand Down Expand Up @@ -269,6 +269,56 @@ public async Task When_there_are_multiple_pages_of_outstanding_messages_for_all_
}
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched_async()
{
var context = new RequestContext();
await _dynamoDbOutbox.AddAsync(_message, context);

_fakeTimeProvider.Advance(TimeSpan.FromSeconds(1));

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id, context);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = await _dynamoDbOutbox.OutstandingMessagesAsync(TimeSpan.Zero, context, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched()
{
var context = new RequestContext();
_dynamoDbOutbox.Add(_message, context);

_fakeTimeProvider.Advance(TimeSpan.FromSeconds(1));

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

_dynamoDbOutbox.MarkDispatched(_message.Id, context);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = _dynamoDbOutbox.OutstandingMessages(TimeSpan.Zero, context, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

private Message CreateMessage(string topic)
{
return new Message(
Expand Down
Loading