Skip to content

Commit

Permalink
Add TimeToLive to PriorityQueue (#2750)
Browse files Browse the repository at this point in the history
Adding TimeToLive to MessageStore and adding integ test.

With the addition of priority queues, we also now have the TTL for each route, instead of for each MessageStore. 

This PR changes the MessageStore to be aware of the TTL for each message and use the TTL for each message as the source of truth for TTL. If a message has no TTL specified, it gets a default value of '0.' Messages with default value of 0 take from the global TTL of the MessageStore.


Note: Because message stores are per endpoint per priority, there could be multiple routes that use the same MessageStore. When CleanupProcessor runs, it assumes that the head of a MessageStore (you can think of a MessageStore as a queue) is the oldest message and that all the messages have the same TTL. Therefore, it assumes that it doesn't need to check past the first unexpired message it finds. Now, it is possible to have messages in the same queue with different TTL's. This PR just allows the store to pile up with expired messages, so long as the head is unexpired. In a future PR, we will allow another option - to go through the entire list each time to find expired messages. It's a tradeoff between I/O and DB space, so we can make it configurable.
  • Loading branch information
dylanbronson authored Apr 1, 2020
1 parent 2554c01 commit 2662d9c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task RemoveEndpoint(string endpointId)
}
}

public async Task<IMessage> Add(string endpointId, IMessage message)
public async Task<IMessage> Add(string endpointId, IMessage message, uint timeToLiveSecs)
{
Preconditions.CheckNotNull(message, nameof(message));
if (!this.endpointSequentialStores.TryGetValue(Preconditions.CheckNonWhiteSpace(endpointId, nameof(endpointId)), out ISequentialStore<MessageRef> sequentialStore))
Expand All @@ -87,6 +87,8 @@ public async Task<IMessage> Add(string endpointId, IMessage message)
throw new InvalidOperationException("Message does not contain required system property EdgeMessageId");
}

TimeSpan timeToLive = timeToLiveSecs == 0 ? this.timeToLive : TimeSpan.FromSeconds(timeToLiveSecs);

// First put the message in the entity store and then put it in the sequentialStore. This is because the pump can go fast enough that it
// reads the message from the sequential store and tries to find the message in the entity store before the message has been added to the
// entity store.
Expand All @@ -110,7 +112,7 @@ await this.messageEntityStore.PutOrUpdate(
{
using (MetricsV0.SequentialStoreLatency(endpointId))
{
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId));
long offset = await sequentialStore.Append(new MessageRef(edgeMessageId, timeToLive));
Events.MessageAdded(offset, edgeMessageId, endpointId, this.messageCount);
return new MessageWithOffset(message, offset);
}
Expand Down Expand Up @@ -256,7 +258,7 @@ async Task CleanupMessages()
async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
if (checkpointData.Offset < offset &&
DateTime.UtcNow - messageRef.TimeStamp < this.messageStore.timeToLive)
DateTime.UtcNow - messageRef.TimeStamp < messageRef.TimeToLive)
{
return false;
}
Expand Down Expand Up @@ -290,6 +292,12 @@ await this.messageStore.messageEntityStore.Update(
return true;
}

// With the addition of PriorityQueues, the CleanupProcessor assumptions change slightly:
// Previously, we could always assume that if a message at the head of the queue should not be deleted,
// then none of the other messages in the queue should be either. Now, because we can have different TTL's
// for messages within the same queue, there can be messages that have expired in the queue after the head.
// This is okay because they will be cleaned up eventually and they will be ignored otherwise.
// TODO: Add optional CleanupProcessor mode that will go through the entire length of the queue each time to remove expired messages.
int cleanupCount = 0;
while (await sequentialStore.RemoveFirst(DeleteMessageCallback))
{
Expand Down Expand Up @@ -463,6 +471,11 @@ public async Task<IEnumerable<IMessage>> GetNext(int batchSize)
{
foreach ((long offset, MessageRef msgRef) item in batch)
{
if (DateTime.UtcNow - item.msgRef.TimeStamp >= item.msgRef.TimeToLive)
{
continue;
}

Option<MessageWrapper> messageWrapper = await this.entityStore.Get(item.msgRef.EdgeMessageId);
if (!messageWrapper.HasValue)
{
Expand Down Expand Up @@ -495,22 +508,25 @@ public async Task<IEnumerable<IMessage>> GetNext(int batchSize)
/// </summary>
class MessageRef
{
public MessageRef(string edgeMessageId)
: this(edgeMessageId, DateTime.UtcNow)
public MessageRef(string edgeMessageId, TimeSpan timeToLive)
: this(edgeMessageId, DateTime.UtcNow, timeToLive)
{
}

[JsonConstructor]
public MessageRef(string edgeMessageId, DateTime timeStamp)
public MessageRef(string edgeMessageId, DateTime timeStamp, TimeSpan timeToLive)
{
Preconditions.CheckArgument(timeStamp != default(DateTime));
this.EdgeMessageId = Preconditions.CheckNonWhiteSpace(edgeMessageId, nameof(edgeMessageId));
this.TimeStamp = timeStamp;
this.TimeToLive = timeToLive;
}

public string EdgeMessageId { get; }

public DateTime TimeStamp { get; }

public TimeSpan TimeToLive { get; }
}

// Wrapper to allow adding offset to an existing IMessage object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IMessageStore : IDisposable
/// Creates an entry in the message queue for the given endpoint
/// and returns the offset of that entry.
/// </summary>
Task<IMessage> Add(string endpointId, IMessage message);
Task<IMessage> Add(string endpointId, IMessage message, uint timeToLiveSecs);

/// <summary>
/// Returns an iterator that allows reading messages starting from the given offset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task Invoke(IMessage message, uint priority, uint timeToLiveSecs)
SortedDictionary<uint, EndpointExecutorFsm> snapshot = this.prioritiesToFsms;
ICheckpointer checkpointer = snapshot[priority].Checkpointer;

IMessage storedMessage = await this.messageStore.Add(GetMessageQueueId(this.Endpoint.Id, priority), message);
IMessage storedMessage = await this.messageStore.Add(GetMessageQueueId(this.Endpoint.Id, priority), message, timeToLiveSecs);
checkpointer.Propose(storedMessage);
Events.AddMessageSuccess(this, storedMessage.Offset, priority, timeToLiveSecs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public async Task BasicTest(long initialCheckpointOffset)
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, initialCheckpointOffset + 1 + i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, initialCheckpointOffset + 1 + i / 2, updatedMessage);
}
}
Expand Down Expand Up @@ -83,13 +83,13 @@ public async Task CleanupTestTimeout()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}
Expand Down Expand Up @@ -125,13 +125,13 @@ public async Task CleanupTestTimeoutWithRead()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}
Expand All @@ -151,13 +151,13 @@ public async Task CleanupTestTimeoutWithRead()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}
Expand All @@ -184,13 +184,13 @@ public async Task CleanupTestCheckpointed()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}
Expand Down Expand Up @@ -218,7 +218,7 @@ public async Task CleanupTestCheckpointed()
}

[Fact]
public async Task CleanupTestTimeoutUpdateTimeToLive()
public async Task CleanupTestTimeoutUpdateGlobalTimeToLive()
{
(IMessageStore messageStore, ICheckpointStore checkpointStore) result = await this.GetMessageStore(20);
result.messageStore.SetTimeToLive(TimeSpan.FromSeconds(20));
Expand All @@ -229,13 +229,13 @@ public async Task CleanupTestTimeoutUpdateTimeToLive()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}
Expand Down Expand Up @@ -266,13 +266,13 @@ public async Task CleanupTestTimeoutUpdateTimeToLive()
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input);
IMessage updatedMessage = await messageStore.Add("module1", input, 0);
CompareUpdatedMessageWithOffset(input, 100 + i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input);
IMessage updatedMessage = await messageStore.Add("module2", input, 0);
CompareUpdatedMessageWithOffset(input, 100 + i / 2, updatedMessage);
}
}
Expand All @@ -297,6 +297,88 @@ public async Task CleanupTestTimeoutUpdateTimeToLive()
}
}

[Fact]
public async Task CleanupTestTimeoutUpdateIndividualMessageTimeToLive()
{
(IMessageStore messageStore, ICheckpointStore checkpointStore) result = await this.GetMessageStore(20);
result.messageStore.SetTimeToLive(TimeSpan.FromSeconds(20));
using (IMessageStore messageStore = result.messageStore)
{
for (int i = 0; i < 200; i++)
{
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input, 20);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input, 20);
CompareUpdatedMessageWithOffset(input, i / 2, updatedMessage);
}
}

IMessageIterator module1Iterator = messageStore.GetMessageIterator("module1");
IEnumerable<IMessage> batch = await module1Iterator.GetNext(100);
Assert.Equal(100, batch.Count());

IMessageIterator module2Iterator = messageStore.GetMessageIterator("module2");
batch = await module2Iterator.GetNext(100);
Assert.Equal(100, batch.Count());

await Task.Delay(TimeSpan.FromSeconds(100));

module1Iterator = messageStore.GetMessageIterator("module1");
batch = await module1Iterator.GetNext(100);
Assert.Empty(batch);

module2Iterator = messageStore.GetMessageIterator("module2");
batch = await module2Iterator.GetNext(100);
Assert.Empty(batch);

// By setting the global TTL for the MessageStore to 20, the CleanupProcessor will run every 10 seconds
// But it won't clean up any messages, since the individual messages are set to have TTL of 2000 seconds
result.messageStore.SetTimeToLive(TimeSpan.FromSeconds(20));
await Task.Delay(TimeSpan.FromSeconds(50));

for (int i = 0; i < 200; i++)
{
if (i % 2 == 0)
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module1", input, 2000);
CompareUpdatedMessageWithOffset(input, 100 + i / 2, updatedMessage);
}
else
{
IMessage input = this.GetMessage(i);
IMessage updatedMessage = await messageStore.Add("module2", input, 50);
CompareUpdatedMessageWithOffset(input, 100 + i / 2, updatedMessage);
}
}

module1Iterator = messageStore.GetMessageIterator("module1");
batch = await module1Iterator.GetNext(100);
Assert.Equal(100, batch.Count());

module2Iterator = messageStore.GetMessageIterator("module2");
batch = await module2Iterator.GetNext(100);
Assert.Equal(100, batch.Count());

await Task.Delay(TimeSpan.FromSeconds(100));

module1Iterator = messageStore.GetMessageIterator("module1", 100);
batch = await module1Iterator.GetNext(100);
Assert.Equal(100, batch.Count());

module2Iterator = messageStore.GetMessageIterator("module2", 100);
batch = await module2Iterator.GetNext(100);
Assert.Empty(batch);
}
}

[Fact]
public async Task MessageStoreAddRemoveEndpointTest()
{
Expand All @@ -311,7 +393,7 @@ public async Task MessageStoreAddRemoveEndpointTest()

for (int i = 0; i < 10; i++)
{
await messageStore.Add("module1", this.GetMessage(i));
await messageStore.Add("module1", this.GetMessage(i), 0);
}

// Assert
Expand All @@ -331,15 +413,15 @@ public async Task MessageStoreAddRemoveEndpointTest()
await messageStore.RemoveEndpoint("module1");

// Assert
await Assert.ThrowsAsync<InvalidOperationException>(() => messageStore.Add("module1", this.GetMessage(0)));
await Assert.ThrowsAsync<InvalidOperationException>(() => messageStore.Add("module1", this.GetMessage(0), 0));
Assert.Throws<InvalidOperationException>(() => messageStore.GetMessageIterator("module1"));

// Act
await messageStore.AddEndpoint("module1");

for (int i = 20; i < 30; i++)
{
await messageStore.Add("module1", this.GetMessage(i));
await messageStore.Add("module1", this.GetMessage(i), 0);
}

// Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void Dispose()
{
}

public async Task<IMessage> Add(string endpointId, IMessage message)
public async Task<IMessage> Add(string endpointId, IMessage message, uint _)
{
TestMessageQueue queue = this.GetQueue(endpointId);
long offset = await queue.Add(message);
Expand Down

0 comments on commit 2662d9c

Please sign in to comment.