Skip to content

Commit

Permalink
Add an option to enable load balancing between replicas (#535)
Browse files Browse the repository at this point in the history
* in progress shuffle clients

* first draft load balancing, need tests

* WIP logic for client shuffling - unsure how to incorporate priority

* WIP

* shuffle all clients together, fix logic for order of clients used

* WIP

* WIP store shuffle order for combined list

* WIP shuffle logic

* WIP new design

* clean up logic/leftover code

* move tests, check if dynamic clients are available in getclients

* remove unused code

* fix syntax issues, extend test

* fix logic to increment client index

* add clarifying comment

* remove tests for now

* WIP tests

* add some tests, will add more

* add to last test

* remove unused usings

* add extra verify statement to check client isnt used

* edit logic to treat passed in clients as highest priority

* PR comment revisions

* check for more than one client in load balancing logic

* set clients equal to new copied list before finding next available client

* remove convert list to clients
  • Loading branch information
amerjusupovic authored Apr 22, 2024
1 parent 2745270 commit e74a679
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ public class AzureAppConfigurationOptions
private SortedSet<string> _keyPrefixes = new SortedSet<string>(Comparer<string>.Create((k1, k2) => -string.Compare(k1, k2, StringComparison.OrdinalIgnoreCase)));

/// <summary>
/// Flag to indicate whether enable replica discovery.
/// Flag to indicate whether replica discovery is enabled.
/// </summary>
public bool ReplicaDiscoveryEnabled { get; set; } = true;

/// <summary>
/// Flag to indicate whether load balancing is enabled.
/// </summary>
public bool LoadBalancingEnabled { get; set; }

/// <summary>
/// The list of connection strings used to connect to an Azure App Configuration store and its replicas.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class AzureAppConfigurationProvider : ConfigurationProvider, IConfigura
private bool _isFeatureManagementVersionInspected;
private readonly bool _requestTracingEnabled;
private readonly IConfigurationClientManager _configClientManager;
private Uri _lastSuccessfulEndpoint;
private AzureAppConfigurationOptions _options;
private Dictionary<string, ConfigurationSetting> _mappedData;
private Dictionary<KeyValueIdentifier, ConfigurationSetting> _watchedSettings = new Dictionary<KeyValueIdentifier, ConfigurationSetting>();
Expand Down Expand Up @@ -990,6 +991,27 @@ private async Task<T> ExecuteWithFailOverPolicyAsync<T>(
Func<ConfigurationClient, Task<T>> funcToExecute,
CancellationToken cancellationToken = default)
{
if (_options.LoadBalancingEnabled && _lastSuccessfulEndpoint != null && clients.Count() > 1)
{
int nextClientIndex = 0;

foreach (ConfigurationClient client in clients)
{
nextClientIndex++;

if (_configClientManager.GetEndpointForClient(client) == _lastSuccessfulEndpoint)
{
break;
}
}

// If we found the last successful client, we'll rotate the list so that the next client is at the beginning
if (nextClientIndex < clients.Count())
{
clients = clients.Skip(nextClientIndex).Concat(clients.Take(nextClientIndex));
}
}

using IEnumerator<ConfigurationClient> clientEnumerator = clients.GetEnumerator();

clientEnumerator.MoveNext();
Expand All @@ -1010,6 +1032,8 @@ private async Task<T> ExecuteWithFailOverPolicyAsync<T>(
T result = await funcToExecute(currentClient).ConfigureAwait(false);
success = true;

_lastSuccessfulEndpoint = _configClientManager.GetEndpointForClient(currentClient);

return result;
}
catch (RequestFailedException rfe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ public IConfigurationProvider Build(IConfigurationBuilder builder)
}
else if (options.ConnectionStrings != null)
{
clientManager = new ConfigurationClientManager(options.ConnectionStrings, options.ClientOptions, options.ReplicaDiscoveryEnabled);
clientManager = new ConfigurationClientManager(
options.ConnectionStrings,
options.ClientOptions,
options.ReplicaDiscoveryEnabled,
options.LoadBalancingEnabled);
}
else if (options.Endpoints != null && options.Credential != null)
{
clientManager = new ConfigurationClientManager(options.Endpoints, options.Credential, options.ClientOptions, options.ReplicaDiscoveryEnabled);
clientManager = new ConfigurationClientManager(
options.Endpoints,
options.Credential,
options.ClientOptions,
options.ReplicaDiscoveryEnabled,
options.LoadBalancingEnabled);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ internal class ConfigurationClientManager : IConfigurationClientManager, IDispos
public ConfigurationClientManager(
IEnumerable<string> connectionStrings,
ConfigurationClientOptions clientOptions,
bool replicaDiscoveryEnabled)
bool replicaDiscoveryEnabled,
bool loadBalancingEnabled)
{
if (connectionStrings == null || !connectionStrings.Any())
{
Expand All @@ -68,6 +69,12 @@ public ConfigurationClientManager(
_clientOptions = clientOptions;
_replicaDiscoveryEnabled = replicaDiscoveryEnabled;

// If load balancing is enabled, shuffle the passed in connection strings to randomize the endpoint used on startup
if (loadBalancingEnabled)
{
connectionStrings = connectionStrings.ToList().Shuffle();
}

_validDomain = GetValidDomain(_endpoint);
_srvLookupClient = new SrvLookupClient();

Expand All @@ -84,7 +91,8 @@ public ConfigurationClientManager(
IEnumerable<Uri> endpoints,
TokenCredential credential,
ConfigurationClientOptions clientOptions,
bool replicaDiscoveryEnabled)
bool replicaDiscoveryEnabled,
bool loadBalancingEnabled)
{
if (endpoints == null || !endpoints.Any())
{
Expand All @@ -101,6 +109,12 @@ public ConfigurationClientManager(
_clientOptions = clientOptions;
_replicaDiscoveryEnabled = replicaDiscoveryEnabled;

// If load balancing is enabled, shuffle the passed in endpoints to randomize the endpoint used on startup
if (loadBalancingEnabled)
{
endpoints = endpoints.ToList().Shuffle();
}

_validDomain = GetValidDomain(_endpoint);
_srvLookupClient = new SrvLookupClient();

Expand Down Expand Up @@ -132,6 +146,7 @@ public IEnumerable<ConfigurationClient> GetClients()
_ = DiscoverFallbackClients();
}

// Treat the passed in endpoints as the highest priority clients
IEnumerable<ConfigurationClient> clients = _clients.Select(c => c.Client);

if (_dynamicClients != null && _dynamicClients.Any())
Expand Down
15 changes: 10 additions & 5 deletions tests/Tests.AzureAppConfiguration/FailoverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public void FailOverTests_ValidateEndpoints()
new[] { new Uri("https://foobar.azconfig.io") },
new DefaultAzureCredential(),
new ConfigurationClientOptions(),
true);
true,
false);

Assert.True(configClientManager.IsValidEndpoint("azure.azconfig.io"));
Assert.True(configClientManager.IsValidEndpoint("appconfig.azconfig.io"));
Expand All @@ -287,7 +288,8 @@ public void FailOverTests_ValidateEndpoints()
new[] { new Uri("https://foobar.appconfig.azure.com") },
new DefaultAzureCredential(),
new ConfigurationClientOptions(),
true);
true,
false);

Assert.True(configClientManager2.IsValidEndpoint("azure.appconfig.azure.com"));
Assert.True(configClientManager2.IsValidEndpoint("azure.z1.appconfig.azure.com"));
Expand All @@ -302,7 +304,8 @@ public void FailOverTests_ValidateEndpoints()
new[] { new Uri("https://foobar.azconfig-test.io") },
new DefaultAzureCredential(),
new ConfigurationClientOptions(),
true);
true,
false);

Assert.False(configClientManager3.IsValidEndpoint("azure.azconfig-test.io"));
Assert.False(configClientManager3.IsValidEndpoint("azure.azconfig.io"));
Expand All @@ -311,7 +314,8 @@ public void FailOverTests_ValidateEndpoints()
new[] { new Uri("https://foobar.z1.appconfig-test.azure.com") },
new DefaultAzureCredential(),
new ConfigurationClientOptions(),
true);
true,
false);

Assert.False(configClientManager4.IsValidEndpoint("foobar.z2.appconfig-test.azure.com"));
Assert.False(configClientManager4.IsValidEndpoint("foobar.appconfig-test.azure.com"));
Expand All @@ -325,7 +329,8 @@ public void FailOverTests_GetNoDynamicClient()
new[] { new Uri("https://azure.azconfig.io") },
new DefaultAzureCredential(),
new ConfigurationClientOptions(),
true);
true,
false);

var clients = configClientManager.GetClients();

Expand Down
152 changes: 152 additions & 0 deletions tests/Tests.AzureAppConfiguration/LoadBalancingTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
//
using Azure;
using Azure.Core.Testing;
using Azure.Data.AppConfiguration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration.AzureAppConfiguration;
using Moq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Xunit;

namespace Tests.AzureAppConfiguration
{
public class LoadBalancingTests
{
readonly ConfigurationSetting kv = ConfigurationModelFactory.ConfigurationSetting(key: "TestKey1", label: "label", value: "TestValue1",
eTag: new ETag("0a76e3d7-7ec1-4e37-883c-9ea6d0d89e63"),
contentType: "text");

TimeSpan CacheExpirationTime = TimeSpan.FromSeconds(1);

[Fact]
public void LoadBalancingTests_UsesAllEndpoints()
{
IConfigurationRefresher refresher = null;
var mockResponse = new MockResponse(200);

var mockClient1 = new Mock<ConfigurationClient>(MockBehavior.Strict);
mockClient1.Setup(c => c.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()))
.Returns(new MockAsyncPageable(Enumerable.Empty<ConfigurationSetting>().ToList()));
mockClient1.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue<ConfigurationSetting>(kv, mockResponse));
mockClient1.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue(kv, mockResponse));
mockClient1.Setup(c => c.Equals(mockClient1)).Returns(true);

var mockClient2 = new Mock<ConfigurationClient>(MockBehavior.Strict);
mockClient2.Setup(c => c.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()))
.Returns(new MockAsyncPageable(Enumerable.Empty<ConfigurationSetting>().ToList()));
mockClient2.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue<ConfigurationSetting>(kv, mockResponse));
mockClient2.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue(kv, mockResponse));
mockClient2.Setup(c => c.Equals(mockClient2)).Returns(true);

ConfigurationClientWrapper cw1 = new ConfigurationClientWrapper(TestHelpers.PrimaryConfigStoreEndpoint, mockClient1.Object);
ConfigurationClientWrapper cw2 = new ConfigurationClientWrapper(TestHelpers.SecondaryConfigStoreEndpoint, mockClient2.Object);

var clientList = new List<ConfigurationClientWrapper>() { cw1, cw2 };
var configClientManager = new ConfigurationClientManager(clientList);

var config = new ConfigurationBuilder()
.AddAzureAppConfiguration(options =>
{
options.ClientManager = configClientManager;
options.ConfigureRefresh(refreshOptions =>
{
refreshOptions.Register("TestKey1", "label")
.SetCacheExpiration(CacheExpirationTime);
});
options.ReplicaDiscoveryEnabled = false;
options.LoadBalancingEnabled = true;

refresher = options.GetRefresher();
}).Build();

// Ensure client 1 was used for startup
mockClient1.Verify(mc => mc.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()), Times.Exactly(1));

Thread.Sleep(CacheExpirationTime);
refresher.RefreshAsync().Wait();

// Ensure client 2 was used for refresh
mockClient1.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(0));

mockClient2.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(1));

Thread.Sleep(CacheExpirationTime);
refresher.RefreshAsync().Wait();

// Ensure client 1 was now used for refresh
mockClient1.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(1));
}

[Fact]
public void LoadBalancingTests_UsesClientAfterBackoffEnds()
{
IConfigurationRefresher refresher = null;
var mockResponse = new MockResponse(200);

var mockClient1 = new Mock<ConfigurationClient>(MockBehavior.Strict);
mockClient1.Setup(c => c.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()))
.Throws(new RequestFailedException(503, "Request failed."));
mockClient1.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue<ConfigurationSetting>(kv, mockResponse));
mockClient1.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue(kv, mockResponse));
mockClient1.Setup(c => c.Equals(mockClient1)).Returns(true);

var mockClient2 = new Mock<ConfigurationClient>(MockBehavior.Strict);
mockClient2.Setup(c => c.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()))
.Returns(new MockAsyncPageable(Enumerable.Empty<ConfigurationSetting>().ToList()));
mockClient2.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue<ConfigurationSetting>(kv, mockResponse));
mockClient2.Setup(c => c.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(Response.FromValue(kv, mockResponse));
mockClient2.Setup(c => c.Equals(mockClient2)).Returns(true);

ConfigurationClientWrapper cw1 = new ConfigurationClientWrapper(TestHelpers.PrimaryConfigStoreEndpoint, mockClient1.Object);
ConfigurationClientWrapper cw2 = new ConfigurationClientWrapper(TestHelpers.SecondaryConfigStoreEndpoint, mockClient2.Object);

var clientList = new List<ConfigurationClientWrapper>() { cw1, cw2 };
var configClientManager = new ConfigurationClientManager(clientList);

var config = new ConfigurationBuilder()
.AddAzureAppConfiguration(options =>
{
options.MinBackoffDuration = TimeSpan.FromSeconds(2);
options.ClientManager = configClientManager;
options.ConfigureRefresh(refreshOptions =>
{
refreshOptions.Register("TestKey1", "label")
.SetCacheExpiration(CacheExpirationTime);
});
options.ReplicaDiscoveryEnabled = false;
options.LoadBalancingEnabled = true;

refresher = options.GetRefresher();
}).Build();

// Ensure client 2 was used for startup
mockClient2.Verify(mc => mc.GetConfigurationSettingsAsync(It.IsAny<SettingSelector>(), It.IsAny<CancellationToken>()), Times.Exactly(1));

Thread.Sleep(TimeSpan.FromSeconds(2));
refresher.RefreshAsync().Wait();

// Ensure client 1 has recovered and is used for refresh
mockClient2.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(0));

mockClient1.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(1));

Thread.Sleep(CacheExpirationTime);
refresher.RefreshAsync().Wait();

mockClient2.Verify(mc => mc.GetConfigurationSettingAsync(It.IsAny<ConfigurationSetting>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(1));
}
}
}

0 comments on commit e74a679

Please sign in to comment.