Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for CancellationToken during refresh operations #281

Merged
merged 6 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public async Task InvokeAsync(HttpContext context)
{
foreach (var refresher in Refreshers)
{
_ = refresher.TryRefreshAsync();
_ = refresher.TryRefreshAsync(context.RequestAborted);
avanigupta marked this conversation as resolved.
Show resolved Hide resolved
}

// Call the next delegate/middleware in the pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public override void Load()
{
// Load() is invoked only once during application startup. We don't need to check for concurrent network
// operations here because there can't be any other startup or refresh operation in progress at this time.
LoadAll(_optional).ConfigureAwait(false).GetAwaiter().GetResult();
LoadAll(_optional, CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (ArgumentException)
{
Expand Down Expand Up @@ -161,7 +161,7 @@ public override void Load()
_isInitialLoadComplete = true;
}

public async Task RefreshAsync()
public async Task RefreshAsync(CancellationToken cancellationToken)
{
// Ensure that concurrent threads do not simultaneously execute refresh operation.
if (Interlocked.Exchange(ref _networkOperationsInProgress, 1) == 0)
Expand All @@ -174,15 +174,15 @@ public async Task RefreshAsync()
if (InitializationCacheExpires < DateTimeOffset.UtcNow)
{
InitializationCacheExpires = DateTimeOffset.UtcNow.Add(MinCacheExpirationInterval);
await LoadAll(ignoreFailures: false).ConfigureAwait(false);
await LoadAll(ignoreFailures: false, cancellationToken).ConfigureAwait(false);
}

return;
}

await RefreshIndividualKeyValues().ConfigureAwait(false);
await RefreshKeyValueCollections().ConfigureAwait(false);
await RefreshKeyValueAdapters().ConfigureAwait(false);
await RefreshIndividualKeyValues(cancellationToken).ConfigureAwait(false);
await RefreshKeyValueCollections(cancellationToken).ConfigureAwait(false);
await RefreshKeyValueAdapters(cancellationToken).ConfigureAwait(false);
}
finally
{
Expand All @@ -191,11 +191,11 @@ public async Task RefreshAsync()
}
}

public async Task<bool> TryRefreshAsync()
public async Task<bool> TryRefreshAsync(CancellationToken cancellationToken)
{
try
{
await RefreshAsync().ConfigureAwait(false);
await RefreshAsync(cancellationToken).ConfigureAwait(false);
}
catch (RequestFailedException e)
{
Expand Down Expand Up @@ -252,7 +252,7 @@ public void SetDirty(TimeSpan? maxDelay)
}
}

private async Task LoadAll(bool ignoreFailures)
private async Task LoadAll(bool ignoreFailures, CancellationToken cancellationToken)
{
IDictionary<string, ConfigurationSetting> data = null;
string cachedData = null;
Expand All @@ -275,7 +275,7 @@ private async Task LoadAll(bool ignoreFailures)

await CallWithRequestTracing(async () =>
{
await foreach (ConfigurationSetting setting in _client.GetConfigurationSettingsAsync(selector, CancellationToken.None).ConfigureAwait(false))
await foreach (ConfigurationSetting setting in _client.GetConfigurationSettingsAsync(selector, cancellationToken).ConfigureAwait(false))
{
serverData[setting.Key] = setting;
}
Expand Down Expand Up @@ -303,15 +303,15 @@ await CallWithRequestTracing(async () =>

await CallWithRequestTracing(async () =>
avanigupta marked this conversation as resolved.
Show resolved Hide resolved
{
await foreach (ConfigurationSetting setting in _client.GetConfigurationSettingsAsync(selector, CancellationToken.None).ConfigureAwait(false))
await foreach (ConfigurationSetting setting in _client.GetConfigurationSettingsAsync(selector, cancellationToken).ConfigureAwait(false))
{
serverData[setting.Key] = setting;
}
}).ConfigureAwait(false);
}

// Block current thread for the initial load of key-values registered for refresh that are not already loaded
await Task.Run(() => LoadKeyValuesRegisteredForRefresh(serverData).ConfigureAwait(false).GetAwaiter().GetResult()).ConfigureAwait(false);
await Task.Run(() => LoadKeyValuesRegisteredForRefresh(serverData, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult()).ConfigureAwait(false);
data = serverData;
}
catch (Exception exception) when (exception is RequestFailedException ||
Expand Down Expand Up @@ -344,7 +344,7 @@ await CallWithRequestTracing(async () =>
adapter.InvalidateCache();
}

await SetData(data, ignoreFailures).ConfigureAwait(false);
await SetData(data, ignoreFailures, cancellationToken).ConfigureAwait(false);

// Set the cache expiration time for all refresh registered settings
var initialLoadTime = DateTimeOffset.UtcNow;
Expand All @@ -366,7 +366,7 @@ await CallWithRequestTracing(async () =>
}
}

private async Task LoadKeyValuesRegisteredForRefresh(IDictionary<string, ConfigurationSetting> data)
private async Task LoadKeyValuesRegisteredForRefresh(IDictionary<string, ConfigurationSetting> data, CancellationToken cancellationToken)
{
_watchedSettings.Clear();

Expand All @@ -388,7 +388,7 @@ private async Task LoadKeyValuesRegisteredForRefresh(IDictionary<string, Configu
ConfigurationSetting watchedKv = null;
try
{
await CallWithRequestTracing(async () => watchedKv = await _client.GetConfigurationSettingAsync(watchedKey, watchedLabel, CancellationToken.None)).ConfigureAwait(false);
await CallWithRequestTracing(async () => watchedKv = await _client.GetConfigurationSettingAsync(watchedKey, watchedLabel, cancellationToken)).ConfigureAwait(false);
}
catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound)
{
Expand All @@ -404,7 +404,7 @@ private async Task LoadKeyValuesRegisteredForRefresh(IDictionary<string, Configu
}
}

private async Task RefreshIndividualKeyValues()
private async Task RefreshIndividualKeyValues(CancellationToken cancellationToken)
{
bool shouldRefreshAll = false;

Expand All @@ -426,7 +426,7 @@ private async Task RefreshIndividualKeyValues()
{
KeyValueChange keyValueChange = default;
await TracingUtils.CallWithRequestTracing(_requestTracingEnabled, RequestType.Watch, _requestTracingOptions,
async () => keyValueChange = await _client.GetKeyValueChange(watchedKv, CancellationToken.None).ConfigureAwait(false)).ConfigureAwait(false);
async () => keyValueChange = await _client.GetKeyValueChange(watchedKv, cancellationToken).ConfigureAwait(false)).ConfigureAwait(false);

changeWatcher.CacheExpires = DateTimeOffset.UtcNow.Add(changeWatcher.CacheExpirationInterval);

Expand Down Expand Up @@ -459,7 +459,7 @@ await TracingUtils.CallWithRequestTracing(_requestTracingEnabled, RequestType.Wa

try
{
await CallWithRequestTracing(async () => watchedKv = await _client.GetConfigurationSettingAsync(watchedKey, watchedLabel, CancellationToken.None).ConfigureAwait(false)).ConfigureAwait(false);
await CallWithRequestTracing(async () => watchedKv = await _client.GetConfigurationSettingAsync(watchedKey, watchedLabel, cancellationToken).ConfigureAwait(false)).ConfigureAwait(false);
}
catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound)
{
Expand All @@ -479,40 +479,40 @@ await TracingUtils.CallWithRequestTracing(_requestTracingEnabled, RequestType.Wa

hasChanged = true;

// Add the key-value if it is not loaded, or update it if it was loaded with a different label
_applicationSettings[watchedKey] = watchedKv;
_watchedSettings[watchedKeyLabel] = watchedKv;
// Add the key-value if it is not loaded, or update it if it was loaded with a different label
_applicationSettings[watchedKey] = watchedKv;
_watchedSettings[watchedKeyLabel] = watchedKv;

// Invalidate the cached Key Vault secret (if any) for this ConfigurationSetting
foreach (IKeyValueAdapter adapter in _options.Adapters)
{
adapter.InvalidateCache(watchedKv);
}
// Invalidate the cached Key Vault secret (if any) for this ConfigurationSetting
foreach (IKeyValueAdapter adapter in _options.Adapters)
{
adapter.InvalidateCache(watchedKv);
}
}
}

if (hasChanged)
{
await SetData(_applicationSettings).ConfigureAwait(false);
await SetData(_applicationSettings, false, cancellationToken).ConfigureAwait(false);
}
}

// Trigger a single refresh-all operation if a change was detected in one or more key-values with refreshAll: true
if (shouldRefreshAll)
{
await LoadAll(ignoreFailures: false).ConfigureAwait(false);
await LoadAll(ignoreFailures: false, cancellationToken).ConfigureAwait(false);
}
}

private async Task RefreshKeyValueAdapters()
private async Task RefreshKeyValueAdapters(CancellationToken cancellationToken)
{
if (_options.Adapters.Any(adapter => adapter.NeedsRefresh()))
{
SetData(_applicationSettings);
SetData(_applicationSettings, false, cancellationToken);
}
}

private async Task RefreshKeyValueCollections()
private async Task RefreshKeyValueCollections(CancellationToken cancellationToken)
{
foreach (KeyValueWatcher changeWatcher in _options.MultiKeyWatchers)
{
Expand Down Expand Up @@ -541,26 +541,29 @@ private async Task RefreshKeyValueCollections()
});
}

IEnumerable<KeyValueChange> keyValueChanges = await _client.GetKeyValueChangeCollection(currentKeyValues, new GetKeyValueChangeCollectionOptions
{
KeyFilter = changeWatcher.Key,
Label = changeWatcher.Label.NormalizeNull(),
RequestTracingEnabled = _requestTracingEnabled,
RequestTracingOptions = _requestTracingOptions
}).ConfigureAwait(false);
IEnumerable<KeyValueChange> keyValueChanges = await _client.GetKeyValueChangeCollection(
currentKeyValues,
new GetKeyValueChangeCollectionOptions
{
KeyFilter = changeWatcher.Key,
Label = changeWatcher.Label.NormalizeNull(),
RequestTracingEnabled = _requestTracingEnabled,
RequestTracingOptions = _requestTracingOptions
},
cancellationToken).ConfigureAwait(false);

changeWatcher.CacheExpires = DateTimeOffset.UtcNow.Add(changeWatcher.CacheExpirationInterval);

if (keyValueChanges?.Any() == true)
{
ProcessChanges(keyValueChanges);

await SetData(_applicationSettings).ConfigureAwait(false);
await SetData(_applicationSettings, false, cancellationToken).ConfigureAwait(false);
}
}
}

private async Task SetData(IDictionary<string, ConfigurationSetting> data, bool ignoreFailures = false, CancellationToken cancellationToken = default)
private async Task SetData(IDictionary<string, ConfigurationSetting> data, bool ignoreFailures, CancellationToken cancellationToken)
{
// Update cache of settings
this._applicationSettings = data as Dictionary<string, ConfigurationSetting> ??
Expand Down Expand Up @@ -657,7 +660,7 @@ private void ProcessChanges(IEnumerable<KeyValueChange> changes)
private async Task CallWithRequestTracing(Func<Task> clientCall)
avanigupta marked this conversation as resolved.
Show resolved Hide resolved
{
var requestType = _isInitialLoadComplete ? RequestType.Watch : RequestType.Startup;
await TracingUtils.CallWithRequestTracing(_requestTracingEnabled, requestType, _requestTracingOptions, clientCall).ConfigureAwait(false);
TracingUtils.CallWithRequestTracing(_requestTracingEnabled, requestType, _requestTracingOptions, clientCall);
}

private void SetRequestTracingOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.Configuration.AzureAppConfiguration
Expand Down Expand Up @@ -32,20 +33,20 @@ public void SetProvider(AzureAppConfigurationProvider provider)
AppConfigurationEndpoint = _provider.AppConfigurationEndpoint;
}

public async Task RefreshAsync()
public async Task RefreshAsync(CancellationToken cancellationToken)
{
ThrowIfNullProvider(nameof(RefreshAsync));
await _provider.RefreshAsync().ConfigureAwait(false);
await _provider.RefreshAsync(cancellationToken).ConfigureAwait(false);
}

public async Task<bool> TryRefreshAsync()
public async Task<bool> TryRefreshAsync(CancellationToken cancellationToken)
{
if (_provider == null)
{
return false;
}

return await _provider.TryRefreshAsync().ConfigureAwait(false);
return await _provider.TryRefreshAsync(cancellationToken).ConfigureAwait(false);
}

public void SetDirty(TimeSpan? maxDelay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static async Task<KeyValueChange> GetKeyValueChange(this ConfigurationCli
};
}

public static async Task<IEnumerable<KeyValueChange>> GetKeyValueChangeCollection(this ConfigurationClient client, IEnumerable<ConfigurationSetting> keyValues, GetKeyValueChangeCollectionOptions options)
public static async Task<IEnumerable<KeyValueChange>> GetKeyValueChangeCollection(this ConfigurationClient client, IEnumerable<ConfigurationSetting> keyValues, GetKeyValueChangeCollectionOptions options, CancellationToken cancellationToken)
{
if (options == null)
{
Expand Down Expand Up @@ -107,7 +107,7 @@ public static async Task<IEnumerable<KeyValueChange>> GetKeyValueChangeCollectio
await TracingUtils.CallWithRequestTracing(options.RequestTracingEnabled, RequestType.Watch, options.RequestTracingOptions,
async () =>
{
await foreach(ConfigurationSetting setting in client.GetConfigurationSettingsAsync(selector).ConfigureAwait(false))
await foreach(ConfigurationSetting setting in client.GetConfigurationSettingsAsync(selector, cancellationToken).ConfigureAwait(false))
{
if (!eTagMap.TryGetValue(setting.Key, out ETag etag) || !etag.Equals(setting.ETag))
{
Expand Down Expand Up @@ -140,7 +140,7 @@ await TracingUtils.CallWithRequestTracing(options.RequestTracingEnabled, Request
await TracingUtils.CallWithRequestTracing(options.RequestTracingEnabled, RequestType.Watch, options.RequestTracingOptions,
async () =>
{
await foreach (ConfigurationSetting setting in client.GetConfigurationSettingsAsync(selector).ConfigureAwait(false))
await foreach (ConfigurationSetting setting in client.GetConfigurationSettingsAsync(selector, cancellationToken).ConfigureAwait(false))
{
if (!eTagMap.TryGetValue(setting.Key, out ETag etag) || !etag.Equals(setting.ETag))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ public Task<IEnumerable<KeyValuePair<string, string>>> ProcessKeyValue(Configura
// Conditionally on based on feature filters
for (int i = 0; i < featureFlag.Conditions.ClientFilters.Count; i++)
{
if (cancellationToken.IsCancellationRequested)
{
cancellationToken.ThrowIfCancellationRequested();
}

ClientFilter clientFilter = featureFlag.Conditions.ClientFilters[i];

keyValues.Add(new KeyValuePair<string, string>($"{FeatureManagementConstants.SectionName}:{featureFlag.Id}:{FeatureManagementConstants.EnabledFor}:{i}:Name", clientFilter.Name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Azure;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Extensions.Configuration.AzureAppConfiguration
Expand All @@ -26,18 +27,20 @@ public interface IConfigurationRefresher
/// <summary>
/// Refreshes the data from App Configuration asynchronously.
/// </summary>
/// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
/// <exception cref="KeyVaultReferenceException">An error occurred when resolving a reference to an Azure Key Vault resource.</exception>
/// <exception cref="RequestFailedException">The request failed with an error code from the server.</exception>
/// <exception cref="AggregateException">
/// The refresh operation failed with one or more errors. Check <see cref="AggregateException.InnerExceptions"/> for more details.
/// </exception>
/// <exception cref="InvalidOperationException">The refresh operation was invoked before Azure App Configuration Provider was initialized.</exception>
Task RefreshAsync();
Task RefreshAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Refreshes the data from App Configuration asynchronously. A return value indicates whether the operation succeeded.
/// </summary>
Task<bool> TryRefreshAsync();
/// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
Task<bool> TryRefreshAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Sets the cached value for key-values registered for refresh as dirty.
Expand Down
Loading