Skip to content

Commit

Permalink
Fixes around datetime and outboxes in V10 (#3298)
Browse files Browse the repository at this point in the history
* fix: move to datetimeoffset; fix age issue where dispatched should be less than

* fix: remove fakecommandprocessor as nothing being tested; inject timeprovider into ExternalBusService.cs as tests have disparate time providers otherwise

* fix: usage of timeprovider

* checkin: working on passing these tests without fake

* fix: tests now use real command processor
  • Loading branch information
iancooper authored Sep 11, 2024
1 parent 5a2b04d commit edcde57
Show file tree
Hide file tree
Showing 20 changed files with 363 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service
busConfiguration.MaxOutStandingCheckInterval,
busConfiguration.OutBoxBag,
busConfiguration.ArchiveBatchSize,
TimeProvider.System,
busConfiguration.InstrumentationOptions);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public MessageItem()

public MessageItem(Message message, int shard = 0, long? expiresAt = null)
{
var date = message.Header.TimeStamp == DateTime.MinValue ? DateTime.UtcNow : message.Header.TimeStamp;
var date = message.Header.TimeStamp == DateTimeOffset.MinValue ? DateTimeOffset.UtcNow : message.Header.TimeStamp;

Body = message.Body.Bytes;
ContentType = message.Header.ContentType;
Expand Down
6 changes: 3 additions & 3 deletions src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,11 @@ private string GetCorrelationId(DbDataReader dr)
return correlationId;
}

private static DateTime GetTimeStamp(DbDataReader dr)
private static DateTimeOffset GetTimeStamp(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("Timestamp");
var timeStamp = dr.IsDBNull(ordinal)
? DateTime.MinValue
? DateTimeOffset.MinValue
: dr.GetDateTime(ordinal);
return timeStamp;
}
Expand Down Expand Up @@ -439,7 +439,7 @@ private Message MapAMessage(DbDataReader dr)

var header = new MessageHeader(id, topic, messageType);

DateTime timeStamp = GetTimeStamp(dr);
DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
Expand Down
6 changes: 3 additions & 3 deletions src/Paramore.Brighter.Outbox.MySql/MySqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private Message MapAMessage(IDataReader dr)

if (dr.FieldCount > 4)
{
DateTime timeStamp = GetTimeStamp(dr);
DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
Expand Down Expand Up @@ -461,11 +461,11 @@ private static string GetTopic(IDataReader dr)
return dr.GetString(dr.GetOrdinal("Topic"));
}

private static DateTime GetTimeStamp(IDataReader dr)
private static DateTimeOffset GetTimeStamp(IDataReader dr)
{
var ordinal = dr.GetOrdinal("Timestamp");
var timeStamp = dr.IsDBNull(ordinal)
? DateTime.MinValue
? DateTimeOffset.MinValue
: dr.GetDateTime(ordinal);
return timeStamp;
}
Expand Down
6 changes: 3 additions & 3 deletions src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public Message MapAMessage(DbDataReader dr)
var messageType = GetMessageType(dr);
var topic = GetTopic(dr);

DateTime timeStamp = GetTimeStamp(dr);
DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
Expand Down Expand Up @@ -446,11 +446,11 @@ private string GetReplyTo(DbDataReader dr)
return replyTo;
}

private static DateTime GetTimeStamp(DbDataReader dr)
private static DateTimeOffset GetTimeStamp(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("Timestamp");
var timeStamp = dr.IsDBNull(ordinal)
? DateTime.MinValue
? DateTimeOffset.MinValue
: dr.GetDateTime(ordinal);
return timeStamp;
}
Expand Down
6 changes: 3 additions & 3 deletions src/Paramore.Brighter.Outbox.Sqlite/SqliteOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private Message MapAMessage(IDataReader dr)

if (dr.FieldCount > 4)
{
DateTime timeStamp = GetTimeStamp(dr);
DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
Expand Down Expand Up @@ -464,11 +464,11 @@ private static string GetTopic(IDataReader dr)
}


private static DateTime GetTimeStamp(IDataReader dr)
private static DateTimeOffset GetTimeStamp(IDataReader dr)
{
var ordinal = dr.GetOrdinal("Timestamp");
var timeStamp = dr.IsDBNull(ordinal)
? DateTime.MinValue
? DateTimeOffset.MinValue
: dr.GetDateTime(ordinal);
return timeStamp;
}
Expand Down
29 changes: 17 additions & 12 deletions src/Paramore.Brighter/ExternalBusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ExternalBusService<TMessage, TTransaction> : IAmAnExternalBusServic
//bool should be made thread-safe by locking the object
private static readonly SemaphoreSlim s_checkOutstandingSemaphoreToken = new(1, 1);

private DateTimeOffset _lastOutStandingMessageCheckAt = DateTimeOffset.UtcNow;
private DateTimeOffset _lastOutStandingMessageCheckAt;

//Uses -1 to indicate no outbox and will thus force a throw on a failed publish
private int _outStandingCount;
Expand All @@ -52,6 +52,7 @@ public class ExternalBusService<TMessage, TTransaction> : IAmAnExternalBusServic
private readonly TimeSpan _maxOutStandingCheckInterval;
private readonly Dictionary<string, object> _outBoxBag;
private readonly IAmABrighterTracer _tracer;
private readonly TimeProvider _timeProvider;

/// <summary>
/// Creates an instance of External Bus Services
Expand All @@ -70,9 +71,9 @@ public class ExternalBusService<TMessage, TTransaction> : IAmAnExternalBusServic
/// <param name="maxOutStandingCheckInterval">How long before we check for maxOutStandingMessages</param>
/// <param name="outBoxBag">An outbox may require additional arguments, such as a topic list to search</param>
/// <param name="archiveBatchSize">What batch size to use when archiving from the Outbox</param>
/// <param name="timeProvider"></param>
/// <param name="instrumentationOptions">How verbose do we want our instrumentation to be</param>
public ExternalBusService(
IAmAProducerRegistry producerRegistry,
public ExternalBusService(IAmAProducerRegistry producerRegistry,
IPolicyRegistry<string> policyRegistry,
IAmAMessageMapperRegistry mapperRegistry,
IAmAMessageTransformerFactory messageTransformerFactory,
Expand All @@ -86,6 +87,7 @@ public ExternalBusService(
TimeSpan? maxOutStandingCheckInterval = null,
Dictionary<string, object> outBoxBag = null,
int archiveBatchSize = 100,
TimeProvider timeProvider = null,
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All)
{
_producerRegistry = producerRegistry ??
Expand All @@ -105,6 +107,9 @@ public ExternalBusService(
if (messageTransformerFactory is null || messageTransformerFactoryAsync is null)
throw new ConfigurationException(
"A Command Processor with an external bus must have a message transformer factory");

_timeProvider = (timeProvider is null) ? TimeProvider.System : timeProvider;
_lastOutStandingMessageCheckAt = _timeProvider.GetUtcNow();

_transformPipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory);
_transformPipelineBuilderAsync =
Expand Down Expand Up @@ -764,7 +769,7 @@ private void CheckOutboxOutstandingLimit()

private void CheckOutstandingMessages(RequestContext requestContext)
{
var now = DateTime.UtcNow;
var now = _timeProvider.GetUtcNow();

var timeSinceLastCheck = now - _lastOutStandingMessageCheckAt;

Expand All @@ -777,11 +782,11 @@ private void CheckOutstandingMessages(RequestContext requestContext)
{
s_logger.LogDebug($"Check not ready to run yet");
return;
}
}

s_logger.LogDebug(
"Running outstanding message check at {MessageCheckTime} after {SecondsSinceLastCheck} seconds wait",
DateTime.UtcNow, timeSinceLastCheck.TotalSeconds
now, timeSinceLastCheck.TotalSeconds
);
//This is expensive, so use a background thread
Task.Run(
Expand Down Expand Up @@ -821,7 +826,7 @@ private void ConfigureAsyncPublisherCallbackMaybe(IAmAMessageProducer producer,
if (_asyncOutbox != null)
await RetryAsync(
async ct =>
await _asyncOutbox.MarkDispatchedAsync(id, requestContext, DateTime.UtcNow,
await _asyncOutbox.MarkDispatchedAsync(id, requestContext, _timeProvider.GetUtcNow(),
cancellationToken: ct),
requestContext
);
Expand All @@ -848,7 +853,7 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducer producer, Reque

if (_outBox != null)
Retry(
() => _outBox.MarkDispatched(id, requestContext, DateTime.UtcNow),
() => _outBox.MarkDispatched(id, requestContext, _timeProvider.GetUtcNow()),
requestContext);
}
};
Expand Down Expand Up @@ -895,7 +900,7 @@ private void Dispatch(IEnumerable<Message> posts, RequestContext requestContext,
);
if (sent)
Retry(
() => _outBox.MarkDispatched(message.Id, requestContext, DateTime.UtcNow, args),
() => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args),
requestContext
);
}
Expand Down Expand Up @@ -949,7 +954,7 @@ private async Task BulkDispatchAsync(IEnumerable<Message> posts, RequestContext
{
await RetryAsync(async _ =>
await _asyncOutbox.MarkDispatchedAsync(
successfulMessage, requestContext, DateTime.UtcNow,
successfulMessage, requestContext, _timeProvider.GetUtcNow(),
cancellationToken: cancellationToken
),
requestContext,
Expand Down Expand Up @@ -1024,7 +1029,7 @@ await producerAsync.SendAsync(message)
if (sent)
await RetryAsync(
async _ => await _asyncOutbox.MarkDispatchedAsync(
message.Id, requestContext, DateTime.UtcNow,
message.Id, requestContext, _timeProvider.GetUtcNow(),
cancellationToken: cancellationToken
),
requestContext,
Expand Down Expand Up @@ -1097,7 +1102,7 @@ private void OutstandingMessagesCheck(RequestContext requestContext)
{
s_checkOutstandingSemaphoreToken.Wait();

_lastOutStandingMessageCheckAt = DateTime.UtcNow;
_lastOutStandingMessageCheckAt = _timeProvider.GetUtcNow();
s_logger.LogDebug("Begin count of outstanding messages");
try
{
Expand Down
6 changes: 3 additions & 3 deletions src/Paramore.Brighter/InMemoryBox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface IHaveABoxWriteTime
public class InMemoryBox<T>(TimeProvider timeProvider) where T: IHaveABoxWriteTime
{
protected readonly ConcurrentDictionary<string, T> Requests = new ConcurrentDictionary<string, T>();
private DateTime _lastScanAt = timeProvider.GetUtcNow().DateTime;
private DateTimeOffset _lastScanAt = timeProvider.GetUtcNow();
private readonly object _cleanupRunningLockObject = new object();

/// <summary>
Expand Down Expand Up @@ -61,15 +61,15 @@ public class InMemoryBox<T>(TimeProvider timeProvider) where T: IHaveABoxWriteTi

protected void ClearExpiredMessages()
{
var now = timeProvider.GetUtcNow().DateTime;
var now = timeProvider.GetUtcNow();

TimeSpan elapsedSinceLastScan = now - _lastScanAt;
if (elapsedSinceLastScan < ExpirationScanInterval)
return;

//This is expensive, so use a background thread
Task.Factory.StartNew(
action: state => RemoveExpiredMessages((DateTime)state),
action: state => RemoveExpiredMessages((DateTimeOffset)state),
state: now,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.DenyChildAttach,
Expand Down
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/InMemoryOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public IEnumerable<Message> DispatchedMessages(
var age =
_timeProvider.GetUtcNow() - dispatchedSince;
return Requests.Values
.Where(oe => (oe.TimeFlushed != DateTime.MinValue) && (oe.TimeFlushed >= age))
.Where(oe => (oe.TimeFlushed != DateTimeOffset.MinValue) && (oe.TimeFlushed <= age))
.Take(pageSize)
.Select(oe => oe.Message).ToArray();
}
Expand Down Expand Up @@ -474,7 +474,7 @@ public IEnumerable<Message> OutstandingMessages(
var now = _timeProvider.GetUtcNow();
var sentBefore = now - dispatchedSince;
var outstandingMessages = Requests.Values
.Where(oe => oe.TimeFlushed == DateTime.MinValue && oe.WriteTime <= sentBefore.DateTime)
.Where(oe => oe.TimeFlushed == DateTimeOffset.MinValue && oe.WriteTime <= sentBefore.DateTime)
.Take(pageSize)
.Select(oe => oe.Message).ToArray();
return outstandingMessages;
Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter/MessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public MessageHeader(
MessageType messageType,
Uri source = null,
string type = "goparamore.io.Paramore.Brighter.Message",
DateTime? timeStamp = null,
DateTimeOffset? timeStamp = null,
string correlationId = null,
string replyTo = null,
string contentType = "text/plain",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using FluentAssertions;
using Microsoft.Extensions.Time.Testing;
Expand All @@ -16,13 +17,14 @@ public class ServiceBusMessageStoreArchiverTests
private readonly InMemoryOutbox _outbox;
private readonly InMemoryArchiveProvider _archiveProvider;
private readonly ExternalBusService<Message,CommittableTransaction> _bus;
private readonly FakeTimeProvider _timeProvider;

public ServiceBusMessageStoreArchiverTests()
{
const string topic = "MyTopic";

var timeProvider = new FakeTimeProvider();
var producer = new InMemoryProducer(new InternalBus(), timeProvider){Publication = {Topic = new RoutingKey(topic), RequestType = typeof(MyCommand)}};
_timeProvider = new FakeTimeProvider();
var producer = new InMemoryProducer(new InternalBus(), _timeProvider){Publication = {Topic = new RoutingKey(topic), RequestType = typeof(MyCommand)}};

var messageMapperRegistry = new MessageMapperRegistry(
new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper()),
Expand All @@ -48,7 +50,7 @@ public ServiceBusMessageStoreArchiverTests()
};

var tracer = new BrighterTracer();
_outbox = new InMemoryOutbox(timeProvider){Tracer = tracer};
_outbox = new InMemoryOutbox(_timeProvider){Tracer = tracer};
_archiveProvider = new InMemoryArchiveProvider();

_bus = new ExternalBusService<Message, CommittableTransaction>(
Expand Down Expand Up @@ -84,7 +86,9 @@ public void When_Archiving_All_Messages_From_The_Outbox()
//act
_outbox.EntryCount.Should().Be(3);

_bus.Archive(TimeSpan.FromMilliseconds(20000), context);
_timeProvider.Advance(TimeSpan.FromMinutes(15));

_bus.Archive(TimeSpan.FromMilliseconds(500), context);

//assert
_outbox.EntryCount.Should().Be(0);
Expand Down Expand Up @@ -112,7 +116,9 @@ public void When_Archiving_Some_Messages_From_The_Outbox()
//act
_outbox.EntryCount.Should().Be(3);

_bus.Archive(TimeSpan.FromMilliseconds(20000), context);
_timeProvider.Advance(TimeSpan.FromSeconds(30));

_bus.Archive(TimeSpan.FromSeconds(30), context);

//assert
_outbox.EntryCount.Should().Be(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ServiceBusMessageStoreArchiverTestsAsync
private readonly InMemoryOutbox _outbox;
private readonly InMemoryArchiveProvider _archiveProvider;
private readonly ExternalBusService<Message,CommittableTransaction> _bus;
private readonly FakeTimeProvider _timeProvider;

public ServiceBusMessageStoreArchiverTestsAsync()
{
Expand Down Expand Up @@ -52,9 +53,9 @@ public ServiceBusMessageStoreArchiverTestsAsync()
{ CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy }
};

var timeProvider = new FakeTimeProvider();
_timeProvider = new FakeTimeProvider();
var tracer = new BrighterTracer();
_outbox = new InMemoryOutbox(timeProvider){Tracer = tracer};
_outbox = new InMemoryOutbox(_timeProvider){Tracer = tracer};
_archiveProvider = new InMemoryArchiveProvider();

_bus = new ExternalBusService<Message, CommittableTransaction>(
Expand Down Expand Up @@ -90,7 +91,9 @@ public async Task When_Archiving_Old_Messages_From_The_Outbox()
//act
_outbox.EntryCount.Should().Be(3);

await _bus.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context, new CancellationToken());
_timeProvider.Advance(TimeSpan.FromSeconds(30));

await _bus.ArchiveAsync(TimeSpan.FromSeconds(15), context, new CancellationToken());

//assert
_outbox.EntryCount.Should().Be(0);
Expand Down Expand Up @@ -118,7 +121,9 @@ public async Task When_Archiving_Some_Messages_From_The_Outbox()
//act
_outbox.EntryCount.Should().Be(3);

await _bus.ArchiveAsync(TimeSpan.FromMilliseconds(20000), context, new CancellationToken());
_timeProvider.Advance(TimeSpan.FromSeconds(30));

await _bus.ArchiveAsync(TimeSpan.FromSeconds(15), context, new CancellationToken());

//assert
_outbox.EntryCount.Should().Be(1);
Expand Down
Loading

0 comments on commit edcde57

Please sign in to comment.