Skip to content

Commit

Permalink
Fix bug in MsSql Outbox and plumb in more efficently Outbox Archive M…
Browse files Browse the repository at this point in the history
…ethod (#2670)
  • Loading branch information
preardon authored May 16, 2023
1 parent 254c426 commit faae1f4
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 19 deletions.
42 changes: 27 additions & 15 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
Expand All @@ -17,6 +16,8 @@ public class TimedOutboxArchiver : IHostedService, IDisposable
private IAmAnArchiveProvider _archiveProvider;
private Timer _timer;

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

public TimedOutboxArchiver(IAmAnOutbox<Message> outbox, IAmAnArchiveProvider archiveProvider,
TimedOutboxArchiverOptions options)
{
Expand All @@ -29,7 +30,7 @@ public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting.");

_timer = new Timer(Archive, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));

return Task.CompletedTask;
}
Expand All @@ -48,25 +49,36 @@ public void Dispose()
_timer.Dispose();
}

private void Archive(object state)
private async Task Archive(object state, CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");

try
if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken))
{
var outBoxArchiver = new OutboxArchiver(
_outbox,
_archiveProvider,
_options.BatchSize);
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
try
{
var outBoxArchiver = new OutboxArchiver(
_outbox,
_archiveProvider,
_options.BatchSize);

outBoxArchiver.Archive(_options.MinimumAge);
await outBoxArchiver.ArchiveAsync(_options.MinimumAge, cancellationToken);
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
}
finally
{
_semaphore.Release();
}

s_logger.LogInformation("Outbox Sweeper sleeping");
}
catch (Exception e)
else
{
s_logger.LogError(e, "Error while sweeping the outbox.");
s_logger.LogWarning("Outbox Archiver is still running - abandoning attempt.");
}

s_logger.LogInformation("Outbox Sweeper sleeping");

}
}
}
6 changes: 6 additions & 0 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messa
throw new NotImplementedException();
}

public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messageToStore, DynamoDbUnitOfWork dynamoDbUnitOfWork)
{
var tcs = new TaskCompletionSource<TransactWriteItemsRequest>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messa
throw new NotImplementedException();
}

public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

private static int ExtractEventNumberFromHeader(Dictionary<string, object> headerBag, Guid messageId)
{
object version;
Expand Down
4 changes: 3 additions & 1 deletion src/Paramore.Brighter.Outbox.MsSql/MsSqlQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
public class MsSqlQueries : IRelationDatabaseOutboxQueries
{
public string PagedDispatchedCommand { get; } = "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < DATEADD(millisecond, @OutStandingSince, getutcdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
public string PagedDispatchedCommand { get; } = "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < DATEADD(millisecond, -@OutStandingSince, getutcdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
public string PagedReadCommand { get; } = "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
public string PagedOutstandingCommand { get; } = "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < DATEADD(millisecond, -@OutStandingSince, getutcdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
public string AddCommand { get; } = "INSERT INTO {0} (MessageId, MessageType, Topic, Timestamp, CorrelationId, ReplyTo, ContentType, HeaderBag, Body) VALUES (@MessageId, @MessageType, @Topic, @Timestamp, @CorrelationId, @ReplyTo, @ContentType, @HeaderBag, @Body)";
Expand All @@ -12,5 +12,7 @@ public class MsSqlQueries : IRelationDatabaseOutboxQueries
public string GetMessageCommand { get; } = "SELECT * FROM {0} WHERE MessageId = @MessageId";
public string GetMessagesCommand { get; } = "SELECT * FROM {0} WHERE MessageId IN ( {1} )";
public string DeleteMessagesCommand { get; } = "DELETE FROM {0} WHERE MessageId IN ( {1} )";

public string DispatchedCommand { get; } = "Select top(@PageSize) * FROM {0} WHERE Dispatched is not NULL and Dispatched < DATEADD(hour, -@DispatchedSince, getutcdate()) Order BY Dispatched";
}
}
2 changes: 2 additions & 0 deletions src/Paramore.Brighter.Outbox.MySql/MySqlQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ public class MySqlQueries : IRelationDatabaseOutboxQueries
public string GetMessageCommand { get; } = "SELECT * FROM {0} WHERE MessageId = @MessageId";
public string GetMessagesCommand { get; } = "SELECT * FROM {0} WHERE `MessageID` IN ( {1} )";
public string DeleteMessagesCommand { get; } = "DELETE FROM {0} WHERE MessageId IN ( {1} )";

public string DispatchedCommand { get; } = "Select * FROM {0} WHERE Dispatched is not NULL and Dispatched < DATEADD(hour, -@DispatchedSince, getutcdate()) LIMIT @PageSize Order BY Dispatched";
}
}
2 changes: 2 additions & 0 deletions src/Paramore.Brighter.Outbox.Sqlite/SqliteQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ public class SqliteQueries : IRelationDatabaseOutboxQueries
public string GetMessageCommand { get; } = "SELECT * FROM {0} WHERE MessageId = @MessageId";
public string GetMessagesCommand { get; } = "SELECT * FROM {0} WHERE MessageId IN ( {1} )";
public string DeleteMessagesCommand { get; } = "DELETE FROM {0} WHERE MessageId IN ( {1} )";

public string DispatchedCommand { get; } = "Select top(@PageSize) * FROM {0} WHERE Dispatched is not NULL and Dispatched < DATEADD(hour, -@DispatchedSince, getutcdate()) Order BY Dispatched";
}
}
12 changes: 12 additions & 0 deletions src/Paramore.Brighter/IAmAnOutboxAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,17 @@ Task<IEnumerable<Message>> OutstandingMessagesAsync(
/// <param name="cancellationToken">The Cancellation Token</param>
/// <param name="messageIds">The id of the message to delete</param>
Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messageIds);

/// <summary>
/// Get the messages that have been dispatched
/// </summary>
/// <param name="hoursDispatchedSince">The number of hours since the message was dispatched</param>
/// <param name="pageSize">The amount to return</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>Messages that have already been dispatched</returns>
Task<IEnumerable<Message>> DispatchedMessagesAsync(
int hoursDispatchedSince,
int pageSize = 100,
CancellationToken cancellationToken = default);
}
}
1 change: 1 addition & 0 deletions src/Paramore.Brighter/IRelationDatabaseOutboxQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ public interface IRelationDatabaseOutboxQueries
string GetMessageCommand { get; }
string GetMessagesCommand { get; }
string DeleteMessagesCommand { get; }
string DispatchedCommand { get; }
}
}
16 changes: 16 additions & 0 deletions src/Paramore.Brighter/InMemoryOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -368,5 +368,21 @@ public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messa
Delete(messageIds);
return Task.CompletedTask;
}

public IEnumerable<Message> DispatchedMessages(int hoursDispatchedSince, int pageSize = 100)
{
ClearExpiredMessages();

DateTime dispatchedSince = DateTime.UtcNow.AddHours( -1 * hoursDispatchedSince);
return _requests.Values.Where(oe => (oe.TimeFlushed != DateTime.MinValue) && (oe.TimeFlushed >= dispatchedSince))
.Take(pageSize)
.Select(oe => oe.Message).ToArray();
}

public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
return Task.FromResult(DispatchedMessages(hoursDispatchedSince, pageSize));
}
}
}
4 changes: 1 addition & 3 deletions src/Paramore.Brighter/OutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,10 @@ public void Archive(int minimumAge)
public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationToken)
{
var activity = ApplicationTelemetry.ActivitySource.StartActivity(ARCHIVEOUTBOX, ActivityKind.Server);

var age = TimeSpan.FromHours(minimumAge);

try
{
var messages = await _outboxAsync.DispatchedMessagesAsync(age.Milliseconds, _batchSize,
var messages = await _outboxAsync.DispatchedMessagesAsync(minimumAge, _batchSize,
cancellationToken: cancellationToken);

if (!messages.Any()) return;
Expand Down
20 changes: 20 additions & 0 deletions src/Paramore.Brighter/RelationDatabaseOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,20 @@ public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messa
cancellationToken);
}

/// <summary>
/// Get the messages that have been dispatched
/// </summary>
/// <param name="hoursDispatchedSince">The number of hours since the message was dispatched</param>
/// <param name="pageSize">The amount to return</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>Messages that have already been dispatched</returns>
public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
return ReadFromStoreAsync(connection => CreatePagedDispatchedCommand(connection, hoursDispatchedSince, pageSize),
dr => MapListFunctionAsync(dr, cancellationToken), cancellationToken);
}

#endregion

protected abstract void WriteToStore(IAmABoxTransactionConnectionProvider transactionConnectionProvider,
Expand All @@ -346,6 +360,12 @@ private TCommand CreatePagedDispatchedCommand(TConnection connection, double mil
=> CreateCommand(connection, GenerateSqlText(_queries.PagedDispatchedCommand), 0,
CreateSqlParameter("PageNumber", pageNumber), CreateSqlParameter("PageSize", pageSize),
CreateSqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince));

private TCommand CreatePagedDispatchedCommand(TConnection connection, int hoursDispatchedSince,
int pageSize)
=> CreateCommand(connection, GenerateSqlText(_queries.PagedDispatchedCommand), 0,
CreateSqlParameter("PageSize", pageSize),
CreateSqlParameter("DispatchedSince", -1 * hoursDispatchedSince));

private TCommand CreatePagedReadCommand(TConnection connection, int pageSize, int pageNumber)
=> CreateCommand(connection, GenerateSqlText(_queries.PagedReadCommand), 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messa
return Task.CompletedTask;
}

public Task<IEnumerable<Message>> DispatchedMessagesAsync(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
return Task.FromResult(DispatchedMessages(hoursDispatchedSince, pageSize));
}

public IEnumerable<Message> DispatchedMessages(int hoursDispatchedSince, int pageSize = 100,
CancellationToken cancellationToken = default)
{
var ago = hoursDispatchedSince * -1;
var now = DateTime.UtcNow;
var messagesSince = now.AddHours(ago);
return _posts.Where(oe => oe.TimeFlushed >= messagesSince).Select(oe => oe.Message).Take(pageSize).ToArray();
}

public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<string, object> args = null)
{
var entry = _posts.SingleOrDefault(oe => oe.Message.Id == id);
Expand Down

0 comments on commit faae1f4

Please sign in to comment.