From 2d076e0c0ee0e361a81c453c603a7e5a8d18918e Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Mon, 26 Jun 2023 17:59:48 -0700 Subject: [PATCH 1/9] Code changes to add replica validation feature in cosmos client options. --- .../src/ConnectionPolicy.cs | 12 +++++++++++ .../src/CosmosClientOptions.cs | 17 ++++++++++++++++ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 3 ++- .../src/Fluent/CosmosClientBuilder.cs | 20 +++++++++++++++++++ .../src/Routing/GatewayAddressCache.cs | 7 +++---- .../src/Routing/GlobalAddressResolver.cs | 5 ++++- 6 files changed, 58 insertions(+), 6 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index 7abfd76deb..a0cfef1347 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -459,6 +459,18 @@ public Func HttpClientFactory set; } + /// + /// Gets or sets the boolean flag to enable replica validation. + /// + /// + /// The default value is false + /// + public bool EnableReplicaValidation + { + get; + set; + } + /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 1fce195915..875fbba29a 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -346,6 +346,23 @@ public ConnectionMode ConnectionMode /// public bool? EnableContentResponseOnWrite { get; set; } + /// + /// Gets or sets the prioritize healthy replicas flag. + /// Prioritizing healthy replicas helps the cosmos client to become more + /// resilient to connection timeouts, by choosing a healthy replica over an + /// unhealthy one. The default value for this parameter is false. + /// + /// + /// This is optimal for workloads where latency spikes are critical during upgrades. + /// + /// +#if PREVIEW + public +#else + internal +#endif + bool PrioritizeHealthyReplicas { get; set; } + /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. /// diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 068bb5b857..015959ba50 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6700,7 +6700,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness), !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), - true); + true, + this.ConnectionPolicy.EnableReplicaValidation); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index 8b72bfffa4..d2c3546204 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -621,6 +621,26 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit return this; } + /// + /// Gets or sets the prioritize healthy replicas flag. + /// Prioritizing healthy replicas helps the cosmos client to become more + /// resilient to connection timeouts, by choosing a healthy replica over an + /// unhealthy one. The default value for this parameter is false. + /// + /// a boolean flag indicating if the feature will be enabled. + /// The object +#if PREVIEW + public +#else + internal +#endif + CosmosClientBuilder WithPrioritizeHealthyReplicas( + bool replicaValidationEnabled) + { + this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled; + return this; + } + /// /// The event handler to be invoked before the request is sent. /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index f864256df9..5c0719bf88 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -66,7 +66,8 @@ public GatewayAddressCache( CosmosHttpClient httpClient, IOpenConnectionsHandler openConnectionsHandler, long suboptimalPartitionForceRefreshIntervalInSeconds = 600, - bool enableTcpConnectionEndpointRediscovery = false) + bool enableTcpConnectionEndpointRediscovery = false, + bool replicaAddressValidationEnabled = false) { this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment); this.protocol = protocol; @@ -90,9 +91,7 @@ public GatewayAddressCache( GatewayAddressCache.ProtocolString(this.protocol)); this.openConnectionsHandler = openConnectionsHandler; - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } public Uri ServiceEndpoint => this.serviceEndpoint; diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 344f994395..4e4118965b 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos private readonly CosmosHttpClient httpClient; private readonly ConcurrentDictionary addressCacheByEndpoint; private readonly bool enableTcpConnectionEndpointRediscovery; + private readonly bool replicaAddressValidationEnabled; private IOpenConnectionsHandler openConnectionsHandler; public GlobalAddressResolver( @@ -66,6 +67,7 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; + this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint) this.serviceConfigReader, this.httpClient, this.openConnectionsHandler, - enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery); + enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery, + replicaAddressValidationEnabled: this.replicaAddressValidationEnabled); string location = this.endpointManager.GetLocation(endpoint); AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location); From 05817df2f74a6757dc24d2c0bc778dc27503d3e6 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Tue, 27 Jun 2023 15:42:49 -0700 Subject: [PATCH 2/9] Code changes to upgrade the cosmos direct version to 3.31.3. --- Directory.Build.props | 2 +- .../src/CosmosClientOptions.cs | 15 ++++++------ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 2 +- .../src/Fluent/CosmosClientBuilder.cs | 14 +++++------ .../src/Routing/GatewayAddressCache.cs | 20 +++++++++------- .../ClientRetryPolicyTests.cs | 3 ++- .../CosmosClientOptionsUnitTests.cs | 9 +++++-- .../RequestEventHandlerTests.cs | 2 +- .../StoreReaderTest.cs | 24 +++++++++++-------- 9 files changed, 52 insertions(+), 39 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index 55a67ffec4..71b854a5c7 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,7 +3,7 @@ 3.35.1 3.35.1 preview - 3.31.2 + 3.31.3 2.0.2 2.0.2 preview diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 875fbba29a..0baea086de 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -347,21 +347,21 @@ public ConnectionMode ConnectionMode public bool? EnableContentResponseOnWrite { get; set; } /// - /// Gets or sets the prioritize healthy replicas flag. - /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to connection timeouts, by choosing a healthy replica over an - /// unhealthy one. The default value for this parameter is false. + /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection + /// status, and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection + /// timeouts. The default value for this parameter is false. /// /// - /// This is optimal for workloads where latency spikes are critical during upgrades. + /// This is optimal for workloads where latency spikes are critical due to connection timeouts. Does not apply if is used. /// - /// + /// #if PREVIEW public #else internal #endif - bool PrioritizeHealthyReplicas { get; set; } + bool EnableAdvancedReplicaSelectionForTcp { get; set; } /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. @@ -775,6 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, + EnableReplicaValidation = this.EnableAdvancedReplicaSelectionForTcp, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 015959ba50..ca423eee00 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6701,7 +6701,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus) !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, - this.ConnectionPolicy.EnableReplicaValidation); + enableReplicaValidation: this.ConnectionPolicy.EnableReplicaValidation); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index d2c3546204..35970bcca0 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -622,22 +622,20 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit } /// - /// Gets or sets the prioritize healthy replicas flag. - /// Prioritizing healthy replicas helps the cosmos client to become more - /// resilient to connection timeouts, by choosing a healthy replica over an - /// unhealthy one. The default value for this parameter is false. + /// Enables the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection status, + /// and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection + /// timeouts. The default value for this parameter is false. /// - /// a boolean flag indicating if the feature will be enabled. /// The object #if PREVIEW public #else internal #endif - CosmosClientBuilder WithPrioritizeHealthyReplicas( - bool replicaValidationEnabled) + CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp() { - this.clientOptions.PrioritizeHealthyReplicas = replicaValidationEnabled; + this.clientOptions.EnableAdvancedReplicaSelectionForTcp = true; return this; } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 5c0719bf88..f47f1496b9 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -143,6 +143,7 @@ public async Task OpenConnectionsAsync( Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id)); + using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName( OperationType.Read, collectionAltLink, @@ -157,12 +158,11 @@ public async Task OpenConnectionsAsync( collectionRid: collection.ResourceId, partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), containerProperties: collection, - shouldOpenRntbdChannels: shouldOpenRntbdChannels)); + shouldOpenRntbdChannels: shouldOpenRntbdChannels, + cancellationToken: linkedTokenSource.Token)); } } - using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - // The `timeoutTask` is a background task which adds a delay for a period of WarmupCacheAndOpenConnectionTimeout. The task will // be cancelled either by - a) when `linkedTokenSource` expires, which means the original `cancellationToken` expires or // b) the the `linkedTokenSource.Cancel()` is called. @@ -347,12 +347,14 @@ public async Task TryGetAddressesAsync( /// An instance of containing the list of partition key range ids. /// An instance of containing the collection properties. /// A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes. + /// An instance of . private async Task WarmupCachesAndOpenConnectionsAsync( DocumentServiceRequest request, string collectionRid, IEnumerable partitionKeyRangeIds, ContainerProperties containerProperties, - bool shouldOpenRntbdChannels) + bool shouldOpenRntbdChannels, + CancellationToken cancellationToken) { TryCatch documentServiceResponseWrapper = await this.GetAddressesAsync( request: request, @@ -380,6 +382,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync( List openConnectionTasks = new (); foreach (Tuple addressInfo in addressInfos) { + if (cancellationToken.IsCancellationRequested) + { + break; + } + this.serverPartitionAddressCache.Set( new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), addressInfo.Item2); @@ -397,10 +404,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync( } } - if (openConnectionTasks.Any()) - { - await Task.WhenAll(openConnectionTasks); - } + await Task.WhenAll(openConnectionTasks); } } catch (Exception ex) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs index a41f08afa1..0851952f40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy( enableReadRequestsFallback: false, useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: true, - disableRetryWithRetryPolicy: false); + disableRetryWithRetryPolicy: false, + enableReplicaValidation: false); // Reducing retry timeout to avoid long-running tests replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index efb06d7cc5..e6c5ea6f92 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(clientOptions.HttpClientFactory); Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsFalse(clientOptions.EnablePartitionLevelFailover); + Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values for default ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(policy.HttpClientFactory); Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel); Assert.IsFalse(policy.EnablePartitionLevelFailover); + Assert.IsFalse(policy.EnableReplicaValidation); cosmosClientBuilder.WithApplicationRegion(region) .WithConnectionModeGateway(maxConnections, webProxy) @@ -108,7 +110,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() .WithBulkExecution(true) .WithSerializerOptions(cosmosSerializerOptions) .WithConsistencyLevel(consistencyLevel) - .WithPartitionLevelFailoverEnabled(); + .WithPartitionLevelFailoverEnabled() + .WithAdvancedReplicaSelectionEnabledForTcp(); cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); clientOptions = cosmosClient.ClientOptions; @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsTrue(clientOptions.AllowBulkExecution); Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsTrue(clientOptions.EnablePartitionLevelFailover); + Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds); Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel()); Assert.IsTrue(policy.EnablePartitionLevelFailover); - + Assert.IsTrue(policy.EnableReplicaValidation); + IReadOnlyList preferredLocations = new List() { Regions.AustraliaCentral, Regions.AustraliaCentral2 }; //Verify Direct Mode settings cosmosClientBuilder = new CosmosClientBuilder( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs index f5602a1f29..9a19fb4a90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient() TransportClient mockTransportClient = this.GetMockTransportClient(); ISessionContainer sessionContainer = new SessionContainer(string.Empty); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index b92ca3c804..831e32041a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + enableReplicaValidation: false); // reads always go to read quorum (2) replicas int replicaCountToRead = 2; @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest() for (int i = 0; i < addressInformation.Length; i++) { TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); - ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); //globalCommittedLsn never catches up in this case mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); try { response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest() } mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); } @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( From 3da28952bb558edfef2cfec793d061fc37795e9c Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Tue, 27 Jun 2023 17:04:32 -0700 Subject: [PATCH 3/9] Adding emulator test to cover replica validation. --- .../CosmosReadManyItemsTests.cs | 60 ++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs index d215b6b18b..cbeb50522f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs @@ -51,27 +51,57 @@ public async Task Cleanup() } [TestMethod] - public async Task ReadManyTypedTest() + [DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")] + [DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")] + public async Task ReadManyTypedTestWithAdvancedReplicaSelection( + bool advancedReplicaSelectionEnabled) { - List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); - for (int i=0; i<10; i++) + CosmosClient cosmosClient = advancedReplicaSelectionEnabled + ? TestCommon.CreateCosmosClient( + customizeClientBuilder: (CosmosClientBuilder builder) => builder.WithAdvancedReplicaSelectionEnabledForTcp()) + : TestCommon.CreateCosmosClient(); + + Database database = null; + try { - itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); - } + database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb"); + Container containerCC1 = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk"); - FeedResponse feedResponse= await this.Container.ReadManyItemsAsync(itemList); - Assert.IsNotNull(feedResponse); - Assert.AreEqual(feedResponse.Count, 10); - Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); - Assert.IsNotNull(feedResponse.Diagnostics); + // Create items with different pk values + for (int i = 0; i < 500; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(); + item.pk = "pk" + i.ToString(); + item.id = i.ToString(); + ItemResponse itemResponse = await containerCC1.CreateItemAsync(item); + Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode); + } + + List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); + for (int i = 0; i < 20; i++) + { + itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); + } + + FeedResponse feedResponse = await containerCC1.ReadManyItemsAsync(itemList); + Assert.IsNotNull(feedResponse); + Assert.AreEqual(20, feedResponse.Count); + Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); + Assert.IsNotNull(feedResponse.Diagnostics); - int count = 0; - foreach (ToDoActivity item in feedResponse) + int count = 0; + foreach (ToDoActivity item in feedResponse) + { + count++; + Assert.IsNotNull(item); + } + Assert.AreEqual(20, count); + } + finally { - count++; - Assert.IsNotNull(item); + await database.DeleteAsync(); + cosmosClient.Dispose(); } - Assert.AreEqual(count, 10); } [TestMethod] From f64470c32d986af87694c23862d3b3bfb4dd5f1a Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Wed, 28 Jun 2023 14:21:05 -0700 Subject: [PATCH 4/9] Code changes to address cosmetic clean ups. --- Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs | 2 +- .../CosmosReadManyItemsTests.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index a0cfef1347..a92d196464 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -463,7 +463,7 @@ public Func HttpClientFactory /// Gets or sets the boolean flag to enable replica validation. /// /// - /// The default value is false + /// The default value for this parameter is false. /// public bool EnableReplicaValidation { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs index cbeb50522f..2747531a77 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs @@ -65,7 +65,7 @@ public async Task ReadManyTypedTestWithAdvancedReplicaSelection( try { database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb"); - Container containerCC1 = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk"); + Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk"); // Create items with different pk values for (int i = 0; i < 500; i++) @@ -73,7 +73,7 @@ public async Task ReadManyTypedTestWithAdvancedReplicaSelection( ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(); item.pk = "pk" + i.ToString(); item.id = i.ToString(); - ItemResponse itemResponse = await containerCC1.CreateItemAsync(item); + ItemResponse itemResponse = await container.CreateItemAsync(item); Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode); } @@ -83,7 +83,7 @@ public async Task ReadManyTypedTestWithAdvancedReplicaSelection( itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); } - FeedResponse feedResponse = await containerCC1.ReadManyItemsAsync(itemList); + FeedResponse feedResponse = await container.ReadManyItemsAsync(itemList); Assert.IsNotNull(feedResponse); Assert.AreEqual(20, feedResponse.Count); Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); From c4382f630871f55e8053533692d1d5f6ea468094 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Wed, 28 Jun 2023 15:45:52 -0700 Subject: [PATCH 5/9] Code changes to address review comments. Fixed preview build failures. --- .../src/ConnectionPolicy.cs | 2 +- .../src/CosmosClientOptions.cs | 10 ++++---- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 2 +- .../src/Routing/GlobalAddressResolver.cs | 2 +- .../Contracts/DotNetPreviewSDKAPI.json | 24 +++++++++++++++++++ .../CosmosClientOptionsUnitTests.cs | 4 ++-- 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index a92d196464..bd91a96b15 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -465,7 +465,7 @@ public Func HttpClientFactory /// /// The default value for this parameter is false. /// - public bool EnableReplicaValidation + public bool EnableAdvancedReplicaSelectionForTcp { get; set; diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 0baea086de..3b6479016a 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -348,12 +348,12 @@ public ConnectionMode ConnectionMode /// /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection - /// status, and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent - /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection - /// timeouts. The default value for this parameter is false. + /// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues. + /// The default value for this parameter is 'false'. /// /// - /// This is optimal for workloads where latency spikes are critical due to connection timeouts. Does not apply if is used. + /// This is optimal for latency-sensitive workloads. Does not apply if is used. /// /// #if PREVIEW @@ -775,7 +775,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, - EnableReplicaValidation = this.EnableAdvancedReplicaSelectionForTcp, + EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index ca423eee00..af3c3d45b2 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -6701,7 +6701,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus) !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, - enableReplicaValidation: this.ConnectionPolicy.EnableReplicaValidation); + enableReplicaValidation: this.ConnectionPolicy.EnableAdvancedReplicaSelectionForTcp); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 4e4118965b..2f611f49e3 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -67,7 +67,7 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; - this.replicaAddressValidationEnabled = connectionPolicy.EnableReplicaValidation; + this.replicaAddressValidationEnabled = connectionPolicy.EnableAdvancedReplicaSelectionForTcp; this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json index 8546a93b2f..f717ea525d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json @@ -355,6 +355,18 @@ "Microsoft.Azure.Cosmos.CosmosClientOptions;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": { "Subclasses": {}, "Members": { + "Boolean EnableAdvancedReplicaSelectionForTcp": { + "Type": "Property", + "Attributes": [], + "MethodInfo": "Boolean EnableAdvancedReplicaSelectionForTcp;CanRead:True;CanWrite:True;Boolean get_EnableAdvancedReplicaSelectionForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;Void set_EnableAdvancedReplicaSelectionForTcp(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" + }, + "Boolean get_EnableAdvancedReplicaSelectionForTcp()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { + "Type": "Method", + "Attributes": [ + "CompilerGeneratedAttribute" + ], + "MethodInfo": "Boolean get_EnableAdvancedReplicaSelectionForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" + }, "Boolean get_IsDistributedTracingEnabled()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { "Type": "Method", "Attributes": [ @@ -367,6 +379,13 @@ "Attributes": [], "MethodInfo": "Boolean IsDistributedTracingEnabled;CanRead:True;CanWrite:True;Boolean get_IsDistributedTracingEnabled();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;Void set_IsDistributedTracingEnabled(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" }, + "Void set_EnableAdvancedReplicaSelectionForTcp(Boolean)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { + "Type": "Method", + "Attributes": [ + "CompilerGeneratedAttribute" + ], + "MethodInfo": "Void set_EnableAdvancedReplicaSelectionForTcp(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" + }, "Void set_IsDistributedTracingEnabled(Boolean)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { "Type": "Method", "Attributes": [ @@ -454,6 +473,11 @@ "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": { "Subclasses": {}, "Members": { + "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp()": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" + }, "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithDistributedTracing(Boolean)": { "Type": "Method", "Attributes": [], diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index e6c5ea6f92..1d20ff4a66 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -98,7 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(policy.HttpClientFactory); Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel); Assert.IsFalse(policy.EnablePartitionLevelFailover); - Assert.IsFalse(policy.EnableReplicaValidation); + Assert.IsFalse(policy.EnableAdvancedReplicaSelectionForTcp); cosmosClientBuilder.WithApplicationRegion(region) .WithConnectionModeGateway(maxConnections, webProxy) @@ -149,7 +149,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds); Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel()); Assert.IsTrue(policy.EnablePartitionLevelFailover); - Assert.IsTrue(policy.EnableReplicaValidation); + Assert.IsTrue(policy.EnableAdvancedReplicaSelectionForTcp); IReadOnlyList preferredLocations = new List() { Regions.AustraliaCentral, Regions.AustraliaCentral2 }; //Verify Direct Mode settings From a0ba442772291550b34244e78c04a2d9cfd56bb5 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Wed, 28 Jun 2023 20:49:12 -0700 Subject: [PATCH 6/9] Code changes to enable replica validation for preview package by default. --- .../src/ConnectionPolicy.cs | 12 ---- .../src/CosmosClientOptions.cs | 18 ------ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 14 ++++- .../src/Fluent/CosmosClientBuilder.cs | 18 ------ .../src/Routing/GlobalAddressResolver.cs | 15 +++-- .../CosmosReadManyItemsTests.cs | 60 +++++-------------- .../Contracts/DotNetPreviewSDKAPI.json | 24 -------- .../CosmosClientOptionsUnitTests.cs | 9 +-- 8 files changed, 40 insertions(+), 130 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index bd91a96b15..7abfd76deb 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -459,18 +459,6 @@ public Func HttpClientFactory set; } - /// - /// Gets or sets the boolean flag to enable replica validation. - /// - /// - /// The default value for this parameter is false. - /// - public bool EnableAdvancedReplicaSelectionForTcp - { - get; - set; - } - /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 3b6479016a..1fce195915 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -346,23 +346,6 @@ public ConnectionMode ConnectionMode /// public bool? EnableContentResponseOnWrite { get; set; } - /// - /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection - /// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent - /// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues. - /// The default value for this parameter is 'false'. - /// - /// - /// This is optimal for latency-sensitive workloads. Does not apply if is used. - /// - /// -#if PREVIEW - public -#else - internal -#endif - bool EnableAdvancedReplicaSelectionForTcp { get; set; } - /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. /// @@ -775,7 +758,6 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, - EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index af3c3d45b2..100f0d95cb 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -99,6 +99,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const string EnableCpuMonitorConfig = "CosmosDbEnableCpuMonitor"; // Env variable private const string RntbdMaxConcurrentOpeningConnectionCountConfig = "AZURE_COSMOS_TCP_MAX_CONCURRENT_OPENING_CONNECTION_COUNT"; + private const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; private const int MaxConcurrentConnectionOpenRequestsPerProcessor = 25; private const int DefaultMaxRequestsPerRntbdChannel = 30; @@ -113,6 +114,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const string DefaultInitTaskKey = "InitTaskKey"; private readonly bool IsLocalQuorumConsistency = false; + private readonly bool isReplicaAddressValidationEnabled; + //Auth internal readonly AuthorizationTokenProvider cosmosAuthorization; @@ -231,7 +234,14 @@ public DocumentClient(Uri serviceEndpoint, this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel); this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); - +#if PREVIEW + this.isReplicaAddressValidationEnabled = ConfigurationManager + .GetEnvironmentVariable( + variable: DocumentClient.ReplicaConnectivityValidationEnabled, + defaultValue: true); +#else + this.isReplicaAddressValidationEnabled = false; +#endif } /// @@ -6701,7 +6711,7 @@ private void CreateStoreModel(bool subscribeRntbdStatus) !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), true, - enableReplicaValidation: this.ConnectionPolicy.EnableAdvancedReplicaSelectionForTcp); + enableReplicaValidation: this.isReplicaAddressValidationEnabled); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index 35970bcca0..8b72bfffa4 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -621,24 +621,6 @@ public CosmosClientBuilder WithContentResponseOnWrite(bool contentResponseOnWrit return this; } - /// - /// Enables the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection status, - /// and based on status, it prioritizes the replicas which are connected to the backend, so that the requests can be sent - /// confidently to the particular replica. This helps the cosmos client to become more resilient and effictive to any connection - /// timeouts. The default value for this parameter is false. - /// - /// The object -#if PREVIEW - public -#else - internal -#endif - CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp() - { - this.clientOptions.EnableAdvancedReplicaSelectionForTcp = true; - return this; - } - /// /// The event handler to be invoked before the request is sent. /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 2f611f49e3..cf444cf413 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -27,6 +27,7 @@ namespace Microsoft.Azure.Cosmos.Routing internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDisposable { private const int MaxBackupReadRegions = 3; + private const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; private readonly GlobalEndpointManager endpointManager; private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache; @@ -39,7 +40,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos private readonly CosmosHttpClient httpClient; private readonly ConcurrentDictionary addressCacheByEndpoint; private readonly bool enableTcpConnectionEndpointRediscovery; - private readonly bool replicaAddressValidationEnabled; + private readonly bool isReplicaAddressValidationEnabled; private IOpenConnectionsHandler openConnectionsHandler; public GlobalAddressResolver( @@ -67,8 +68,14 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; - this.replicaAddressValidationEnabled = connectionPolicy.EnableAdvancedReplicaSelectionForTcp; - +#if PREVIEW + this.isReplicaAddressValidationEnabled = ConfigurationManager + .GetEnvironmentVariable( + variable: GlobalAddressResolver.ReplicaConnectivityValidationEnabled, + defaultValue: true); +#else + this.isReplicaAddressValidationEnabled = false; +#endif this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) this.addressCacheByEndpoint = new ConcurrentDictionary(); @@ -284,7 +291,7 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint) this.httpClient, this.openConnectionsHandler, enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery, - replicaAddressValidationEnabled: this.replicaAddressValidationEnabled); + replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled); string location = this.endpointManager.GetLocation(endpoint); AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs index 2747531a77..d215b6b18b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs @@ -51,57 +51,27 @@ public async Task Cleanup() } [TestMethod] - [DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")] - [DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")] - public async Task ReadManyTypedTestWithAdvancedReplicaSelection( - bool advancedReplicaSelectionEnabled) + public async Task ReadManyTypedTest() { - CosmosClient cosmosClient = advancedReplicaSelectionEnabled - ? TestCommon.CreateCosmosClient( - customizeClientBuilder: (CosmosClientBuilder builder) => builder.WithAdvancedReplicaSelectionEnabledForTcp()) - : TestCommon.CreateCosmosClient(); - - Database database = null; - try + List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); + for (int i=0; i<10; i++) { - database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb"); - Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk"); - - // Create items with different pk values - for (int i = 0; i < 500; i++) - { - ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(); - item.pk = "pk" + i.ToString(); - item.id = i.ToString(); - ItemResponse itemResponse = await container.CreateItemAsync(item); - Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode); - } - - List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); - for (int i = 0; i < 20; i++) - { - itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); - } + itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); + } - FeedResponse feedResponse = await container.ReadManyItemsAsync(itemList); - Assert.IsNotNull(feedResponse); - Assert.AreEqual(20, feedResponse.Count); - Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); - Assert.IsNotNull(feedResponse.Diagnostics); + FeedResponse feedResponse= await this.Container.ReadManyItemsAsync(itemList); + Assert.IsNotNull(feedResponse); + Assert.AreEqual(feedResponse.Count, 10); + Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); + Assert.IsNotNull(feedResponse.Diagnostics); - int count = 0; - foreach (ToDoActivity item in feedResponse) - { - count++; - Assert.IsNotNull(item); - } - Assert.AreEqual(20, count); - } - finally + int count = 0; + foreach (ToDoActivity item in feedResponse) { - await database.DeleteAsync(); - cosmosClient.Dispose(); + count++; + Assert.IsNotNull(item); } + Assert.AreEqual(count, 10); } [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json index f717ea525d..8546a93b2f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json @@ -355,18 +355,6 @@ "Microsoft.Azure.Cosmos.CosmosClientOptions;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": { "Subclasses": {}, "Members": { - "Boolean EnableAdvancedReplicaSelectionForTcp": { - "Type": "Property", - "Attributes": [], - "MethodInfo": "Boolean EnableAdvancedReplicaSelectionForTcp;CanRead:True;CanWrite:True;Boolean get_EnableAdvancedReplicaSelectionForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;Void set_EnableAdvancedReplicaSelectionForTcp(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" - }, - "Boolean get_EnableAdvancedReplicaSelectionForTcp()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Boolean get_EnableAdvancedReplicaSelectionForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" - }, "Boolean get_IsDistributedTracingEnabled()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { "Type": "Method", "Attributes": [ @@ -379,13 +367,6 @@ "Attributes": [], "MethodInfo": "Boolean IsDistributedTracingEnabled;CanRead:True;CanWrite:True;Boolean get_IsDistributedTracingEnabled();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;Void set_IsDistributedTracingEnabled(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" }, - "Void set_EnableAdvancedReplicaSelectionForTcp(Boolean)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Void set_EnableAdvancedReplicaSelectionForTcp(Boolean);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" - }, "Void set_IsDistributedTracingEnabled(Boolean)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { "Type": "Method", "Attributes": [ @@ -473,11 +454,6 @@ "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": { "Subclasses": {}, "Members": { - "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp()": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithAdvancedReplicaSelectionEnabledForTcp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" - }, "Microsoft.Azure.Cosmos.Fluent.CosmosClientBuilder WithDistributedTracing(Boolean)": { "Type": "Method", "Attributes": [], diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index 1d20ff4a66..efb06d7cc5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -81,7 +81,6 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(clientOptions.HttpClientFactory); Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsFalse(clientOptions.EnablePartitionLevelFailover); - Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values for default ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -98,7 +97,6 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(policy.HttpClientFactory); Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel); Assert.IsFalse(policy.EnablePartitionLevelFailover); - Assert.IsFalse(policy.EnableAdvancedReplicaSelectionForTcp); cosmosClientBuilder.WithApplicationRegion(region) .WithConnectionModeGateway(maxConnections, webProxy) @@ -110,8 +108,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() .WithBulkExecution(true) .WithSerializerOptions(cosmosSerializerOptions) .WithConsistencyLevel(consistencyLevel) - .WithPartitionLevelFailoverEnabled() - .WithAdvancedReplicaSelectionEnabledForTcp(); + .WithPartitionLevelFailoverEnabled(); cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); clientOptions = cosmosClient.ClientOptions; @@ -134,7 +131,6 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsTrue(clientOptions.AllowBulkExecution); Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsTrue(clientOptions.EnablePartitionLevelFailover); - Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -149,8 +145,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds); Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel()); Assert.IsTrue(policy.EnablePartitionLevelFailover); - Assert.IsTrue(policy.EnableAdvancedReplicaSelectionForTcp); - + IReadOnlyList preferredLocations = new List() { Regions.AustraliaCentral, Regions.AustraliaCentral2 }; //Verify Direct Mode settings cosmosClientBuilder = new CosmosClientBuilder( From a04e7cbaa4644fa98d09b531959d70bbd3418129 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 29 Jun 2023 11:31:32 -0700 Subject: [PATCH 7/9] Code changes to address review comments. --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 10 +------ .../src/Routing/GatewayAddressCache.cs | 5 ++-- .../src/Routing/GlobalAddressResolver.cs | 13 ++------ .../src/Util/ConfigurationManager.cs | 30 ++++++++++++++++++- .../GatewayAddressCacheTests.cs | 30 +++---------------- 5 files changed, 40 insertions(+), 48 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 100f0d95cb..752ec8f5a5 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -99,7 +99,6 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const string EnableCpuMonitorConfig = "CosmosDbEnableCpuMonitor"; // Env variable private const string RntbdMaxConcurrentOpeningConnectionCountConfig = "AZURE_COSMOS_TCP_MAX_CONCURRENT_OPENING_CONNECTION_COUNT"; - private const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; private const int MaxConcurrentConnectionOpenRequestsPerProcessor = 25; private const int DefaultMaxRequestsPerRntbdChannel = 30; @@ -234,14 +233,7 @@ public DocumentClient(Uri serviceEndpoint, this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel); this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); -#if PREVIEW - this.isReplicaAddressValidationEnabled = ConfigurationManager - .GetEnvironmentVariable( - variable: DocumentClient.ReplicaConnectivityValidationEnabled, - defaultValue: true); -#else - this.isReplicaAddressValidationEnabled = false; -#endif + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); } /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index f47f1496b9..a4dbb1d8a3 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -143,7 +143,6 @@ public async Task OpenConnectionsAsync( Paths.CollectionsPathSegment, Uri.EscapeUriString(collection.Id)); - using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); using (DocumentServiceRequest request = DocumentServiceRequest.CreateFromName( OperationType.Read, collectionAltLink, @@ -159,10 +158,12 @@ public async Task OpenConnectionsAsync( partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), containerProperties: collection, shouldOpenRntbdChannels: shouldOpenRntbdChannels, - cancellationToken: linkedTokenSource.Token)); + cancellationToken: cancellationToken)); } } + using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + // The `timeoutTask` is a background task which adds a delay for a period of WarmupCacheAndOpenConnectionTimeout. The task will // be cancelled either by - a) when `linkedTokenSource` expires, which means the original `cancellationToken` expires or // b) the the `linkedTokenSource.Cancel()` is called. diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index cf444cf413..8ac0719dcd 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -27,8 +27,6 @@ namespace Microsoft.Azure.Cosmos.Routing internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDisposable { private const int MaxBackupReadRegions = 3; - private const string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; - private readonly GlobalEndpointManager endpointManager; private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache; private readonly Protocol protocol; @@ -68,14 +66,9 @@ public GlobalAddressResolver( ? GlobalAddressResolver.MaxBackupReadRegions : 0; this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; -#if PREVIEW - this.isReplicaAddressValidationEnabled = ConfigurationManager - .GetEnvironmentVariable( - variable: GlobalAddressResolver.ReplicaConnectivityValidationEnabled, - defaultValue: true); -#else - this.isReplicaAddressValidationEnabled = false; -#endif + + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); + this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) this.addressCacheByEndpoint = new ConcurrentDictionary(); diff --git a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs index 4e9e8fde84..6821af655c 100644 --- a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs +++ b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs @@ -7,7 +7,14 @@ namespace Microsoft.Azure.Cosmos using System; internal static class ConfigurationManager - { + { + /// + /// A read-only string containing the environment variablename for enabling replica validation. + /// This will eventually be removed oncereplica valdiatin is enabled by default for both preview + /// and GA. + /// + private static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; + public static T GetEnvironmentVariable(string variable, T defaultValue) { string value = Environment.GetEnvironmentVariable(variable); @@ -17,5 +24,26 @@ public static T GetEnvironmentVariable(string variable, T defaultValue) } return (T)Convert.ChangeType(value, typeof(T)); } + + /// + /// Gets the boolean value of the replica validation environment variable. Note that, replica validation + /// is enabled by default for the preview package and disabled for GA at the moment. The user can set the + /// respective environment variable 'AZURE_COSMOS_REPLICA_VALIDATION_ENABLED' to override the value for + /// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default + /// for both preview and GA. + /// + /// A boolean flag indicating if replica validation is enabled. + public static bool IsReplicaAddressValidationEnabled() + { + bool replicaValidationDefaultValue = false; +#if PREVIEW + replicaValidationDefaultValue = true; +#endif + + return ConfigurationManager + .GetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + defaultValue: replicaValidationDefaultValue); + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 296c131fbf..c855599465 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -1000,19 +1000,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabled_ShouldValidat MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), openConnectionsHandler: fakeOpenConnectionHandler, suboptimalPartitionForceRefreshIntervalInSeconds: 2, - enableTcpConnectionEndpointRediscovery: true); - - // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature - // for the purpose of this test. - FieldInfo fieldInfo = cache - .GetType() - .GetField( - name: "isReplicaAddressValidationEnabled", - bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); - - fieldInfo.SetValue( - obj: cache, - value: true); + enableTcpConnectionEndpointRediscovery: true, + replicaAddressValidationEnabled: true); DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); @@ -1156,19 +1145,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUr MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), openConnectionsHandler: fakeOpenConnectionHandler, suboptimalPartitionForceRefreshIntervalInSeconds: 2, - enableTcpConnectionEndpointRediscovery: true); - - // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature - // for the purpose of this test. - FieldInfo fieldInfo = cache - .GetType() - .GetField( - name: "isReplicaAddressValidationEnabled", - bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); - - fieldInfo.SetValue( - obj: cache, - value: true); + enableTcpConnectionEndpointRediscovery: true, + replicaAddressValidationEnabled: true); DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); From 94ba2005e61ebb0d34091084f0dcf94e47294171 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 29 Jun 2023 15:04:39 -0700 Subject: [PATCH 8/9] Code changes to fix preview unit tests. --- .../src/Util/ConfigurationManager.cs | 2 +- .../CosmosBadReplicaTests.cs | 20 +++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs index 6821af655c..ed0f5a47a6 100644 --- a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs +++ b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs @@ -13,7 +13,7 @@ internal static class ConfigurationManager /// This will eventually be removed oncereplica valdiatin is enabled by default for both preview /// and GA. /// - private static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; + internal static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; public static T GetEnvironmentVariable(string variable, T defaultValue) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs index 3ef86568b8..7bd64591df 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs @@ -25,8 +25,15 @@ public class CosmosBadReplicaTests { [TestMethod] [Timeout(30000)] - public async Task TestGoneFromServiceScenarioAsync() + [DataRow(true, DisplayName = "Validate when replica validation is enabled.")] + [DataRow(false, DisplayName = "Validate when replica validation is disabled.")] + public async Task TestGoneFromServiceScenarioAsync( + bool enableReplicaValidation) { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: enableReplicaValidation.ToString()); + Mock mockHttpHandler = new Mock(MockBehavior.Strict); Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( "mockAccountInfo", @@ -160,7 +167,16 @@ public async Task TestGoneFromServiceScenarioAsync() mockHttpHandler.VerifyAll(); Documents.TransportAddressUri failedReplica = urisVisited.First(); - Assert.AreEqual(1, urisVisited.Count(x => x.Equals(failedReplica))); + + // With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow, + // and because any subsequent validation/ connection will be successful, the failed replica will now be marked + // as connected, thus it will be visited more than once. However, note that when replice validation is disabled, + // no validation is done thus the URI will be marked as unhealthy as expected. Therefore the uri will be visited + // just once. + Assert.IsTrue( + enableReplicaValidation + ? urisVisited.Any(x => x.Equals(failedReplica)) + : urisVisited.Count(x => x.Equals(failedReplica)) == 1); urisVisited.Clear(); delayCacheRefresh = false; From ad7e8f599e947065f535dfa0c3d7b0746311ad65 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda Date: Thu, 29 Jun 2023 15:27:31 -0700 Subject: [PATCH 9/9] Code changes to disable environment variable at the end of the test. --- .../CosmosBadReplicaTests.cs | 299 +++++++++--------- 1 file changed, 152 insertions(+), 147 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs index 7bd64591df..7a5b387e8e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.Collections.Generic; - using System.Globalization; using System.IO; using System.Linq; using System.Net; @@ -14,11 +13,8 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Text; using System.Threading; using System.Threading.Tasks; - using global::Azure.Core; - using Microsoft.Azure.Cosmos.Fluent; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json.Linq; [TestClass] public class CosmosBadReplicaTests @@ -30,35 +26,37 @@ public class CosmosBadReplicaTests public async Task TestGoneFromServiceScenarioAsync( bool enableReplicaValidation) { - Environment.SetEnvironmentVariable( - variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, - value: enableReplicaValidation.ToString()); - - Mock mockHttpHandler = new Mock(MockBehavior.Strict); - Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( - "mockAccountInfo", - consistencyLevel: ConsistencyLevel.Session, - mockHttpHandler, - out string primaryRegionEndpoint); - - string databaseName = "mockDbName"; - string containerName = "mockContainerName"; - string containerRid = "ccZ1ANCszwk="; - Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid); - MockSetupsHelper.SetupContainerProperties( - mockHttpHandler: mockHttpHandler, - regionEndpoint: primaryRegionEndpoint, - databaseName: databaseName, - containerName: containerName, - containerRid: containerRid); - - MockSetupsHelper.SetupSinglePartitionKeyRange( - mockHttpHandler, - primaryRegionEndpoint, - cRid, - out IReadOnlyList partitionKeyRanges); - - List replicaIds1 = new List() + try + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: enableReplicaValidation.ToString()); + + Mock mockHttpHandler = new Mock(MockBehavior.Strict); + Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( + "mockAccountInfo", + consistencyLevel: ConsistencyLevel.Session, + mockHttpHandler, + out string primaryRegionEndpoint); + + string databaseName = "mockDbName"; + string containerName = "mockContainerName"; + string containerRid = "ccZ1ANCszwk="; + Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid); + MockSetupsHelper.SetupContainerProperties( + mockHttpHandler: mockHttpHandler, + regionEndpoint: primaryRegionEndpoint, + databaseName: databaseName, + containerName: containerName, + containerRid: containerRid); + + MockSetupsHelper.SetupSinglePartitionKeyRange( + mockHttpHandler, + primaryRegionEndpoint, + cRid, + out IReadOnlyList partitionKeyRanges); + + List replicaIds1 = new List() { "11111111111111111", "22222222222222222", @@ -66,14 +64,14 @@ public async Task TestGoneFromServiceScenarioAsync( "44444444444444444", }; - HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( - replicaIds1, - partitionKeyRanges.First(), - "eastus", - cRid); + HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( + replicaIds1, + partitionKeyRanges.First(), + "eastus", + cRid); - // One replica changed on the refresh - List replicaIds2 = new List() + // One replica changed on the refresh + List replicaIds2 = new List() { "11111111111111111", "22222222222222222", @@ -81,127 +79,134 @@ public async Task TestGoneFromServiceScenarioAsync( "55555555555555555", }; - HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses( - replicaIds2, - partitionKeyRanges.First(), - "eastus", - cRid); - - bool delayCacheRefresh = true; - bool delayRefreshUnblocked = false; - mockHttpHandler.SetupSequence(x => x.SendAsync( - It.Is(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny())) - .Returns(Task.FromResult(replicaSet1)) - .Returns(async ()=> + HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses( + replicaIds2, + partitionKeyRanges.First(), + "eastus", + cRid); + + bool delayCacheRefresh = true; + bool delayRefreshUnblocked = false; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.Is(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny())) + .Returns(Task.FromResult(replicaSet1)) + .Returns(async ()=> + { + //block cache refresh to verify bad replica is not visited during refresh + while (delayCacheRefresh) + { + await Task.Delay(TimeSpan.FromMilliseconds(20)); + } + + delayRefreshUnblocked = true; + return replicaSet2; + }); + + int callBack = 0; + List urisVisited = new List(); + Mock mockTransportClient = new Mock(MockBehavior.Strict); + mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny(), It.IsAny())) + .Callback((t, _) => urisVisited.Add(t)) + .Returns(() => { - //block cache refresh to verify bad replica is not visited during refresh - while (delayCacheRefresh) + callBack++; + if (callBack == 1) { - await Task.Delay(TimeSpan.FromMilliseconds(20)); + throw Documents.Rntbd.TransportExceptions.GetGoneException( + new Uri("https://localhost:8081"), + Guid.NewGuid(), + new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken, + null, + Guid.NewGuid(), + new Uri("https://localhost:8081"), + "Mock", + userPayload: true, + payloadSent: false)); } - delayRefreshUnblocked = true; - return replicaSet2; - }); - - int callBack = 0; - List urisVisited = new List(); - Mock mockTransportClient = new Mock(MockBehavior.Strict); - mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny(), It.IsAny())) - .Callback((t, _) => urisVisited.Add(t)) - .Returns(() => - { - callBack++; - if (callBack == 1) - { - throw Documents.Rntbd.TransportExceptions.GetGoneException( - new Uri("https://localhost:8081"), - Guid.NewGuid(), - new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken, - null, - Guid.NewGuid(), - new Uri("https://localhost:8081"), - "Mock", - userPayload: true, - payloadSent: false)); - } - - return Task.FromResult(new Documents.StoreResponse() - { - Status = 200, - Headers = new Documents.Collections.StoreResponseNameValueCollection() + return Task.FromResult(new Documents.StoreResponse() { - ActivityId = Guid.NewGuid().ToString(), - LSN = "12345", - PartitionKeyRangeId = "0", - GlobalCommittedLSN = "12345", - SessionToken = "1#12345#1=12345" - }, - ResponseBody = new MemoryStream() + Status = 200, + Headers = new Documents.Collections.StoreResponseNameValueCollection() + { + ActivityId = Guid.NewGuid().ToString(), + LSN = "12345", + PartitionKeyRangeId = "0", + GlobalCommittedLSN = "12345", + SessionToken = "1#12345#1=12345" + }, + ResponseBody = new MemoryStream() + }); }); - }); - CosmosClientOptions cosmosClientOptions = new CosmosClientOptions() - { - ConsistencyLevel = Cosmos.ConsistencyLevel.Session, - HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)), - TransportClientHandlerFactory = (original) => mockTransportClient.Object, - }; - - using (CosmosClient customClient = new CosmosClient( - endpoint.ToString(), - Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), - cosmosClientOptions)) - { - try + CosmosClientOptions cosmosClientOptions = new CosmosClientOptions() { - Container container = customClient.GetContainer(databaseName, containerName); - - for (int i = 0; i < 20; i++) + ConsistencyLevel = Cosmos.ConsistencyLevel.Session, + HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)), + TransportClientHandlerFactory = (original) => mockTransportClient.Object, + }; + + using (CosmosClient customClient = new CosmosClient( + endpoint.ToString(), + Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + cosmosClientOptions)) + { + try { - ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); - Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + Container container = customClient.GetContainer(databaseName, containerName); + + for (int i = 0; i < 20; i++) + { + ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + mockTransportClient.VerifyAll(); + mockHttpHandler.VerifyAll(); + + Documents.TransportAddressUri failedReplica = urisVisited.First(); + + // With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow, + // and because any subsequent validation/ connection will be successful, the failed replica will now be marked + // as connected, thus it will be visited more than once. However, note that when replice validation is disabled, + // no validation is done thus the URI will be marked as unhealthy as expected. Therefore the uri will be visited + // just once. + Assert.IsTrue( + enableReplicaValidation + ? urisVisited.Any(x => x.Equals(failedReplica)) + : urisVisited.Count(x => x.Equals(failedReplica)) == 1); + + urisVisited.Clear(); + delayCacheRefresh = false; + do + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + }while (!delayRefreshUnblocked); + + for (int i = 0; i < 20; i++) + { + ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + Assert.AreEqual(4, urisVisited.ToHashSet().Count()); + + // Clears all the setups. No network calls should be done on the next operation. + mockHttpHandler.Reset(); + mockTransportClient.Reset(); } - - mockTransportClient.VerifyAll(); - mockHttpHandler.VerifyAll(); - - Documents.TransportAddressUri failedReplica = urisVisited.First(); - - // With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow, - // and because any subsequent validation/ connection will be successful, the failed replica will now be marked - // as connected, thus it will be visited more than once. However, note that when replice validation is disabled, - // no validation is done thus the URI will be marked as unhealthy as expected. Therefore the uri will be visited - // just once. - Assert.IsTrue( - enableReplicaValidation - ? urisVisited.Any(x => x.Equals(failedReplica)) - : urisVisited.Count(x => x.Equals(failedReplica)) == 1); - - urisVisited.Clear(); - delayCacheRefresh = false; - do - { - await Task.Delay(TimeSpan.FromMilliseconds(100)); - }while (!delayRefreshUnblocked); - - for (int i = 0; i < 20; i++) + finally { - ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); - Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + mockTransportClient.Setup(x => x.Dispose()); } - - Assert.AreEqual(4, urisVisited.ToHashSet().Count()); - - // Clears all the setups. No network calls should be done on the next operation. - mockHttpHandler.Reset(); - mockTransportClient.Reset(); - } - finally - { - mockTransportClient.Setup(x => x.Dispose()); } } + finally + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: null); + } } } }