Skip to content

Commit

Permalink
Fix task cancellation exception on shut down (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamarino authored Jan 30, 2020
1 parent 85dd065 commit 6fe2dfd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public ConsulConfigurationProvider(
public void Dispose()
{
_cancellationTokenSource.Cancel();
_pollTask?.Wait(500);
_cancellationTokenSource.Dispose();
}

Expand All @@ -57,20 +56,22 @@ public override void Load()
return;
}

DoLoad().GetAwaiter().GetResult();
var cancellationToken = _cancellationTokenSource.Token;

DoLoad(cancellationToken).GetAwaiter().GetResult();

// Polling starts after the initial load to ensure no concurrent access to the key from this instance
if (_source.ReloadOnChange)
{
_pollTask = Task.Run(PollingLoop);
_pollTask = Task.Run(() => PollingLoop(cancellationToken), cancellationToken);
}
}

private async Task DoLoad()
private async Task DoLoad(CancellationToken cancellationToken)
{
try
{
var result = await GetKvPairs(false).ConfigureAwait(false);
var result = await GetKvPairs(false, cancellationToken).ConfigureAwait(false);

if (result.HasValue())
{
Expand All @@ -94,7 +95,7 @@ private async Task DoLoad()
}
}

private async Task<QueryResult<KVPair[]>> GetKvPairs(bool waitForChange)
private async Task<QueryResult<KVPair[]>> GetKvPairs(bool waitForChange, CancellationToken cancellationToken)
{
using var consulClient = _consulClientFactory.Create();
var queryOptions = new QueryOptions
Expand All @@ -106,27 +107,25 @@ private async Task<QueryResult<KVPair[]>> GetKvPairs(bool waitForChange)
var result =
await consulClient
.KV
.List(_source.Key, queryOptions, _cancellationTokenSource.Token)
.List(_source.Key, queryOptions, cancellationToken)
.ConfigureAwait(false);

return result.StatusCode switch
{
HttpStatusCode.OK => result,
HttpStatusCode.NotFound => result,
_ =>
throw
new Exception($"Error loading configuration from consul. Status code: {result.StatusCode}.")
};
_ => throw new Exception($"Error loading configuration from consul. Status code: {result.StatusCode}.")
};
}

private async Task PollingLoop()
private async Task PollingLoop(CancellationToken cancellationToken)
{
var consecutiveFailureCount = 0;
while (!_cancellationTokenSource.Token.IsCancellationRequested)
while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await GetKvPairs(true).ConfigureAwait(false);
var result = await GetKvPairs(true, cancellationToken).ConfigureAwait(false);

if (result.HasValue() && result.LastIndex > _lastIndex)
{
Expand All @@ -143,7 +142,7 @@ private async Task PollingLoop()
_source.OnWatchException?.Invoke(
new ConsulWatchExceptionContext(exception, ++consecutiveFailureCount, _source)) ??
TimeSpan.FromSeconds(5);
await Task.Delay(wait, _cancellationTokenSource.Token);
await Task.Delay(wait, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public sealed class Dispose : ConsulConfigurationProviderTests
private async Task ShouldCancelPollingTaskWhenReloading()
{
var expectedKvCalls = 0;
var pollingCancelled = new TaskCompletionSource<CancellationToken>();
CancellationToken cancellationToken = default;
var pollingCancelled = new TaskCompletionSource<bool>();

_source.ReloadOnChange = true;
_source.Optional = true;
_kvEndpoint
Expand All @@ -87,18 +87,21 @@ private async Task ShouldCancelPollingTaskWhenReloading()
}

expectedKvCalls++;
if (cancellationToken == default)
if (token.CanBeCanceled)
{
cancellationToken = token;
token.Register(() => pollingCancelled.TrySetResult(true));
}
})
.Returns(
pollingCancelled.Task.IsCompleted
? Task.FromCanceled<QueryResult<KVPair[]>>(pollingCancelled.Task.Result)
: Task.FromResult(new QueryResult<KVPair[]> { StatusCode = HttpStatusCode.OK }));
.Returns<string, QueryOptions, CancellationToken>(
(_, __, token) =>
token.IsCancellationRequested
? Task.FromCanceled<QueryResult<KVPair[]>>(token)
: Task.Delay(5).ContinueWith(t => new QueryResult<KVPair[]> { StatusCode = HttpStatusCode.OK }));

_provider.Load();
cancellationToken.Register(() => pollingCancelled.SetResult(cancellationToken));

// allow polling loop to spin up
await Task.Delay(25);

_provider.Dispose();

Expand Down

0 comments on commit 6fe2dfd

Please sign in to comment.