Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Resiliency: Adds Code to Enable Replica Validation Feature for Preview #3951

Merged
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<ClientOfficialVersion>3.35.1</ClientOfficialVersion>
<ClientPreviewVersion>3.35.1</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview</ClientPreviewSuffixVersion>
<DirectVersion>3.31.2</DirectVersion>
<DirectVersion>3.31.3</DirectVersion>
<EncryptionOfficialVersion>2.0.2</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.0.2</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview</EncryptionPreviewSuffixVersion>
Expand Down
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private const string DefaultInitTaskKey = "InitTaskKey";

private readonly bool IsLocalQuorumConsistency = false;
private readonly bool isReplicaAddressValidationEnabled;

//Auth
internal readonly AuthorizationTokenProvider cosmosAuthorization;

Expand Down Expand Up @@ -231,7 +233,7 @@ public DocumentClient(Uri serviceEndpoint,

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
}

/// <summary>
Expand Down Expand Up @@ -6700,7 +6702,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness),
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true);
true,
enableReplicaValidation: this.isReplicaAddressValidationEnabled);

if (subscribeRntbdStatus)
{
Expand Down
24 changes: 14 additions & 10 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public GatewayAddressCache(
CosmosHttpClient httpClient,
IOpenConnectionsHandler openConnectionsHandler,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false)
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
Expand All @@ -90,9 +91,7 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable<bool>(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand Down Expand Up @@ -158,7 +157,8 @@ public async Task OpenConnectionsAsync(
collectionRid: collection.ResourceId,
partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId),
containerProperties: collection,
shouldOpenRntbdChannels: shouldOpenRntbdChannels));
shouldOpenRntbdChannels: shouldOpenRntbdChannels,
cancellationToken: cancellationToken));
}
}

Expand Down Expand Up @@ -348,12 +348,14 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
/// <param name="partitionKeyRangeIds">An instance of <see cref="IEnumerable{T}"/> containing the list of partition key range ids.</param>
/// <param name="containerProperties">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
private async Task WarmupCachesAndOpenConnectionsAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds,
ContainerProperties containerProperties,
bool shouldOpenRntbdChannels)
bool shouldOpenRntbdChannels,
CancellationToken cancellationToken)
{
TryCatch<DocumentServiceResponse> documentServiceResponseWrapper = await this.GetAddressesAsync(
request: request,
Expand Down Expand Up @@ -381,6 +383,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
List<Task> openConnectionTasks = new ();
foreach (Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> addressInfo in addressInfos)
{
if (cancellationToken.IsCancellationRequested)
{
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On-cancellation behavior: with the new changes, it's trying to complete gracefully, right?
The existing pattern across the codebase is to throw an exception:

cancellationToken.ThrowIfCancellationRequested()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this used during the CreateAndInitialize call? If so, we should avoid throwing. CreateAndInitialize is a best-effort operation; if we throw, it will probably break the customer's application initialization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is used in the CreateAndInitializeAsync() flow, and throwing the exception might break it, because opening the connections is a best-effort operation and should continue even if some of the connection warm-ups fail.

}

this.serverPartitionAddressCache.Set(
new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);
Expand All @@ -398,10 +405,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync(
}
}

if (openConnectionTasks.Any())
{
await Task.WhenAll(openConnectionTasks);
}
await Task.WhenAll(openConnectionTasks);
}
}
catch (Exception ex)
Expand Down
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace Microsoft.Azure.Cosmos.Routing
internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDisposable
{
private const int MaxBackupReadRegions = 3;

private readonly GlobalEndpointManager endpointManager;
private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache;
private readonly Protocol protocol;
Expand All @@ -39,6 +38,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly CosmosHttpClient httpClient;
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool isReplicaAddressValidationEnabled;
private IOpenConnectionsHandler openConnectionsHandler;

public GlobalAddressResolver(
Expand Down Expand Up @@ -67,6 +67,8 @@ public GlobalAddressResolver(

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

this.addressCacheByEndpoint = new ConcurrentDictionary<Uri, EndpointCache>();
Expand Down Expand Up @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.serviceConfigReader,
this.httpClient,
this.openConnectionsHandler,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery);
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
30 changes: 29 additions & 1 deletion Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ namespace Microsoft.Azure.Cosmos
using System;

internal static class ConfigurationManager
{
{
/// <summary>
/// A read-only string containing the environment variable name for enabling replica validation.
/// This will eventually be removed once replica validation is enabled by default for both preview
/// and GA.
/// </summary>
private static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED";

public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
{
string value = Environment.GetEnvironmentVariable(variable);
Expand All @@ -17,5 +24,26 @@ public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
}
return (T)Convert.ChangeType(value, typeof(T));
}

/// <summary>
/// Resolves the replica validation flag. The fallback value is chosen at compile time:
/// <c>true</c> when the <c>PREVIEW</c> symbol is defined (preview package) and <c>false</c>
/// otherwise (GA). That fallback, together with the name of the
/// 'AZURE_COSMOS_REPLICA_VALIDATION_ENABLED' environment variable, is handed to
/// <see cref="GetEnvironmentVariable{T}(string, T)"/>, which lets the user override the
/// setting for both preview and GA. This method is intended to be removed once replica
/// validation is enabled by default for both preview and GA.
/// </summary>
/// <returns>A boolean flag indicating if replica validation is enabled.</returns>
public static bool IsReplicaAddressValidationEnabled()
{
    // The preview build opts in by default; the GA build stays opted out.
#if PREVIEW
    const bool defaultWhenNotConfigured = true;
#else
    const bool defaultWhenNotConfigured = false;
#endif

    return ConfigurationManager.GetEnvironmentVariable<bool>(
        variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
        defaultValue: defaultWhenNotConfigured);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy(
enableReadRequestsFallback: false,
useMultipleWriteLocations: useMultipleWriteLocations,
detectClientConnectivityIssues: true,
disableRetryWithRetryPolicy: false);
disableRetryWithRetryPolicy: false,
enableReplicaValidation: false);

// Reducing retry timeout to avoid long-running tests
replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,19 +1000,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabled_ShouldValidat
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
openConnectionsHandler: fakeOpenConnectionHandler,
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
enableTcpConnectionEndpointRediscovery: true);

// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
// for the purpose of this test.
FieldInfo fieldInfo = cache
.GetType()
.GetField(
name: "isReplicaAddressValidationEnabled",
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);

fieldInfo.SetValue(
obj: cache,
value: true);
enableTcpConnectionEndpointRediscovery: true,
replicaAddressValidationEnabled: true);

DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);

Expand Down Expand Up @@ -1156,19 +1145,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUr
MockCosmosUtil.CreateCosmosHttpClient(() => httpClient),
openConnectionsHandler: fakeOpenConnectionHandler,
suboptimalPartitionForceRefreshIntervalInSeconds: 2,
enableTcpConnectionEndpointRediscovery: true);

// By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature
// for the purpose of this test.
FieldInfo fieldInfo = cache
.GetType()
.GetField(
name: "isReplicaAddressValidationEnabled",
bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic);

fieldInfo.SetValue(
obj: cache,
value: true);
enableTcpConnectionEndpointRediscovery: true,
replicaAddressValidationEnabled: true);

DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient()
TransportClient mockTransportClient = this.GetMockTransportClient();
ISessionContainer sessionContainer = new SessionContainer(string.Empty);

StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
enableReplicaValidation: false);

// reads always go to read quorum (2) replicas
int replicaCountToRead = 2;
Expand Down Expand Up @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest()
for (int i = 0; i < addressInformation.Length; i++)
{
TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer);
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false);
ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

//globalCommittedLsn never catches up in this case
mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
try
{
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Expand All @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest()
}

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);

mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false);
consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false);
response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result;
Assert.AreEqual(100, response.LSN);
}
Expand Down Expand Up @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down Expand Up @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down Expand Up @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest()
new StoreReader(mockTransportClient,
addressSelector,
new AddressEnumerator(),
sessionContainer);
sessionContainer,
false);

Mock<IAuthorizationTokenProvider> mockAuthorizationTokenProvider = new Mock<IAuthorizationTokenProvider>();
mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(
Expand Down