diff --git a/Directory.Build.props b/Directory.Build.props index 55a67ffec4..71b854a5c7 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,7 +3,7 @@ 3.35.1 3.35.1 preview - 3.31.2 + 3.31.3 2.0.2 2.0.2 preview diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 068bb5b857..752ec8f5a5 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -113,6 +113,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const string DefaultInitTaskKey = "InitTaskKey"; private readonly bool IsLocalQuorumConsistency = false; + private readonly bool isReplicaAddressValidationEnabled; + //Auth internal readonly AuthorizationTokenProvider cosmosAuthorization; @@ -231,7 +233,7 @@ public DocumentClient(Uri serviceEndpoint, this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel); this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); - + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); } /// @@ -6700,7 +6702,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus) this.ConnectionPolicy.EnableReadRequestsFallback ?? (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.BoundedStaleness), !this.enableRntbdChannel, this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong), - true); + true, + enableReplicaValidation: this.isReplicaAddressValidationEnabled); if (subscribeRntbdStatus) { diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index f864256df9..a4dbb1d8a3 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -66,7 +66,8 @@ public GatewayAddressCache( CosmosHttpClient httpClient, IOpenConnectionsHandler openConnectionsHandler, long suboptimalPartitionForceRefreshIntervalInSeconds = 600, - bool enableTcpConnectionEndpointRediscovery = false) + bool enableTcpConnectionEndpointRediscovery = false, + bool replicaAddressValidationEnabled = false) { this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment); this.protocol = protocol; @@ -90,9 +91,7 @@ public GatewayAddressCache( GatewayAddressCache.ProtocolString(this.protocol)); this.openConnectionsHandler = openConnectionsHandler; - this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariable( - name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled, - defaultValue: false); + this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled; } public Uri ServiceEndpoint => this.serviceEndpoint; @@ -158,7 +157,8 @@ public async Task OpenConnectionsAsync( collectionRid: collection.ResourceId, partitionKeyRangeIds: partitionKeyRangeIdentities.Skip(i).Take(batchSize).Select(range => range.PartitionKeyRangeId), containerProperties: collection, - shouldOpenRntbdChannels: shouldOpenRntbdChannels)); + shouldOpenRntbdChannels: shouldOpenRntbdChannels, + cancellationToken: cancellationToken)); } } @@ -348,12 +348,14 @@ public async Task TryGetAddressesAsync( /// An instance of containing the list of partition key range ids. /// An instance of containing the collection properties. /// A boolean flag indicating whether Rntbd connections are required to be established to the backend replica nodes. + /// An instance of . private async Task WarmupCachesAndOpenConnectionsAsync( DocumentServiceRequest request, string collectionRid, IEnumerable partitionKeyRangeIds, ContainerProperties containerProperties, - bool shouldOpenRntbdChannels) + bool shouldOpenRntbdChannels, + CancellationToken cancellationToken) { TryCatch documentServiceResponseWrapper = await this.GetAddressesAsync( request: request, @@ -381,6 +383,11 @@ private async Task WarmupCachesAndOpenConnectionsAsync( List openConnectionTasks = new (); foreach (Tuple addressInfo in addressInfos) { + if (cancellationToken.IsCancellationRequested) + { + break; + } + this.serverPartitionAddressCache.Set( new PartitionKeyRangeIdentity(containerProperties.ResourceId, addressInfo.Item1.PartitionKeyRangeId), addressInfo.Item2); @@ -398,10 +405,7 @@ private async Task WarmupCachesAndOpenConnectionsAsync( } } - if (openConnectionTasks.Any()) - { - await Task.WhenAll(openConnectionTasks); - } + await Task.WhenAll(openConnectionTasks); } } catch (Exception ex) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 344f994395..8ac0719dcd 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -27,7 +27,6 @@ namespace Microsoft.Azure.Cosmos.Routing internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDisposable { private const int MaxBackupReadRegions = 3; - private readonly GlobalEndpointManager endpointManager; private readonly GlobalPartitionEndpointManager partitionKeyRangeLocationCache; private readonly Protocol protocol; @@ -39,6 +38,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos private readonly CosmosHttpClient httpClient; private readonly ConcurrentDictionary addressCacheByEndpoint; private readonly bool enableTcpConnectionEndpointRediscovery; + private readonly bool isReplicaAddressValidationEnabled; private IOpenConnectionsHandler openConnectionsHandler; public GlobalAddressResolver( @@ -67,6 +67,8 @@ public GlobalAddressResolver( this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); + this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) this.addressCacheByEndpoint = new ConcurrentDictionary(); @@ -281,7 +283,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint) this.serviceConfigReader, this.httpClient, this.openConnectionsHandler, - enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery); + enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery, + replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled); string location = this.endpointManager.GetLocation(endpoint); AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location); diff --git a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs index 4e9e8fde84..ed0f5a47a6 100644 --- a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs +++ b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs @@ -7,7 +7,14 @@ namespace Microsoft.Azure.Cosmos using System; internal static class ConfigurationManager - { + { + /// + /// A read-only string containing the environment variablename for enabling replica validation. + /// This will eventually be removed oncereplica valdiatin is enabled by default for both preview + /// and GA. + /// + internal static readonly string ReplicaConnectivityValidationEnabled = "AZURE_COSMOS_REPLICA_VALIDATION_ENABLED"; + public static T GetEnvironmentVariable(string variable, T defaultValue) { string value = Environment.GetEnvironmentVariable(variable); @@ -17,5 +24,26 @@ public static T GetEnvironmentVariable(string variable, T defaultValue) } return (T)Convert.ChangeType(value, typeof(T)); } + + /// + /// Gets the boolean value of the replica validation environment variable. Note that, replica validation + /// is enabled by default for the preview package and disabled for GA at the moment. The user can set the + /// respective environment variable 'AZURE_COSMOS_REPLICA_VALIDATION_ENABLED' to override the value for + /// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default + /// for both preview and GA. + /// + /// A boolean flag indicating if replica validation is enabled. + public static bool IsReplicaAddressValidationEnabled() + { + bool replicaValidationDefaultValue = false; +#if PREVIEW + replicaValidationDefaultValue = true; +#endif + + return ConfigurationManager + .GetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + defaultValue: replicaValidationDefaultValue); + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs index a41f08afa1..0851952f40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ClientRetryPolicyTests.cs @@ -209,7 +209,8 @@ private async Task ValidateConnectTimeoutTriggersClientRetryPolicy( enableReadRequestsFallback: false, useMultipleWriteLocations: useMultipleWriteLocations, detectClientConnectivityIssues: true, - disableRetryWithRetryPolicy: false); + disableRetryWithRetryPolicy: false, + enableReplicaValidation: false); // Reducing retry timeout to avoid long-running tests replicatedResourceClient.GoneAndRetryWithRetryTimeoutInSecondsOverride = 1; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs index 3ef86568b8..7a5b387e8e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.Collections.Generic; - using System.Globalization; using System.IO; using System.Linq; using System.Net; @@ -14,44 +13,50 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Text; using System.Threading; using System.Threading.Tasks; - using global::Azure.Core; - using Microsoft.Azure.Cosmos.Fluent; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json.Linq; [TestClass] public class CosmosBadReplicaTests { [TestMethod] [Timeout(30000)] - public async Task TestGoneFromServiceScenarioAsync() + [DataRow(true, DisplayName = "Validate when replica validation is enabled.")] + [DataRow(false, DisplayName = "Validate when replica validation is disabled.")] + public async Task TestGoneFromServiceScenarioAsync( + bool enableReplicaValidation) { - Mock mockHttpHandler = new Mock(MockBehavior.Strict); - Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( - "mockAccountInfo", - consistencyLevel: ConsistencyLevel.Session, - mockHttpHandler, - out string primaryRegionEndpoint); - - string databaseName = "mockDbName"; - string containerName = "mockContainerName"; - string containerRid = "ccZ1ANCszwk="; - Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid); - MockSetupsHelper.SetupContainerProperties( - mockHttpHandler: mockHttpHandler, - regionEndpoint: primaryRegionEndpoint, - databaseName: databaseName, - containerName: containerName, - containerRid: containerRid); - - MockSetupsHelper.SetupSinglePartitionKeyRange( - mockHttpHandler, - primaryRegionEndpoint, - cRid, - out IReadOnlyList partitionKeyRanges); - - List replicaIds1 = new List() + try + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: enableReplicaValidation.ToString()); + + Mock mockHttpHandler = new Mock(MockBehavior.Strict); + Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( + "mockAccountInfo", + consistencyLevel: ConsistencyLevel.Session, + mockHttpHandler, + out string primaryRegionEndpoint); + + string databaseName = "mockDbName"; + string containerName = "mockContainerName"; + string containerRid = "ccZ1ANCszwk="; + Documents.ResourceId cRid = Documents.ResourceId.Parse(containerRid); + MockSetupsHelper.SetupContainerProperties( + mockHttpHandler: mockHttpHandler, + regionEndpoint: primaryRegionEndpoint, + databaseName: databaseName, + containerName: containerName, + containerRid: containerRid); + + MockSetupsHelper.SetupSinglePartitionKeyRange( + mockHttpHandler, + primaryRegionEndpoint, + cRid, + out IReadOnlyList partitionKeyRanges); + + List replicaIds1 = new List() { "11111111111111111", "22222222222222222", @@ -59,14 +64,14 @@ public async Task TestGoneFromServiceScenarioAsync() "44444444444444444", }; - HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( - replicaIds1, - partitionKeyRanges.First(), - "eastus", - cRid); + HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( + replicaIds1, + partitionKeyRanges.First(), + "eastus", + cRid); - // One replica changed on the refresh - List replicaIds2 = new List() + // One replica changed on the refresh + List replicaIds2 = new List() { "11111111111111111", "22222222222222222", @@ -74,118 +79,134 @@ public async Task TestGoneFromServiceScenarioAsync() "55555555555555555", }; - HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses( - replicaIds2, - partitionKeyRanges.First(), - "eastus", - cRid); - - bool delayCacheRefresh = true; - bool delayRefreshUnblocked = false; - mockHttpHandler.SetupSequence(x => x.SendAsync( - It.Is(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny())) - .Returns(Task.FromResult(replicaSet1)) - .Returns(async ()=> + HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses( + replicaIds2, + partitionKeyRanges.First(), + "eastus", + cRid); + + bool delayCacheRefresh = true; + bool delayRefreshUnblocked = false; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.Is(r => r.RequestUri.ToString().Contains("addresses")), It.IsAny())) + .Returns(Task.FromResult(replicaSet1)) + .Returns(async ()=> + { + //block cache refresh to verify bad replica is not visited during refresh + while (delayCacheRefresh) + { + await Task.Delay(TimeSpan.FromMilliseconds(20)); + } + + delayRefreshUnblocked = true; + return replicaSet2; + }); + + int callBack = 0; + List urisVisited = new List(); + Mock mockTransportClient = new Mock(MockBehavior.Strict); + mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny(), It.IsAny())) + .Callback((t, _) => urisVisited.Add(t)) + .Returns(() => { - //block cache refresh to verify bad replica is not visited during refresh - while (delayCacheRefresh) + callBack++; + if (callBack == 1) { - await Task.Delay(TimeSpan.FromMilliseconds(20)); + throw Documents.Rntbd.TransportExceptions.GetGoneException( + new Uri("https://localhost:8081"), + Guid.NewGuid(), + new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken, + null, + Guid.NewGuid(), + new Uri("https://localhost:8081"), + "Mock", + userPayload: true, + payloadSent: false)); } - delayRefreshUnblocked = true; - return replicaSet2; - }); - - int callBack = 0; - List urisVisited = new List(); - Mock mockTransportClient = new Mock(MockBehavior.Strict); - mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(It.IsAny(), It.IsAny())) - .Callback((t, _) => urisVisited.Add(t)) - .Returns(() => - { - callBack++; - if (callBack == 1) - { - throw Documents.Rntbd.TransportExceptions.GetGoneException( - new Uri("https://localhost:8081"), - Guid.NewGuid(), - new Documents.TransportException(Documents.TransportErrorCode.ConnectionBroken, - null, - Guid.NewGuid(), - new Uri("https://localhost:8081"), - "Mock", - userPayload: true, - payloadSent: false)); - } - - return Task.FromResult(new Documents.StoreResponse() - { - Status = 200, - Headers = new Documents.Collections.StoreResponseNameValueCollection() + return Task.FromResult(new Documents.StoreResponse() { - ActivityId = Guid.NewGuid().ToString(), - LSN = "12345", - PartitionKeyRangeId = "0", - GlobalCommittedLSN = "12345", - SessionToken = "1#12345#1=12345" - }, - ResponseBody = new MemoryStream() + Status = 200, + Headers = new Documents.Collections.StoreResponseNameValueCollection() + { + ActivityId = Guid.NewGuid().ToString(), + LSN = "12345", + PartitionKeyRangeId = "0", + GlobalCommittedLSN = "12345", + SessionToken = "1#12345#1=12345" + }, + ResponseBody = new MemoryStream() + }); }); - }); - - CosmosClientOptions cosmosClientOptions = new CosmosClientOptions() - { - ConsistencyLevel = Cosmos.ConsistencyLevel.Session, - HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)), - TransportClientHandlerFactory = (original) => mockTransportClient.Object, - }; - using (CosmosClient customClient = new CosmosClient( - endpoint.ToString(), - Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), - cosmosClientOptions)) - { - try + CosmosClientOptions cosmosClientOptions = new CosmosClientOptions() { - Container container = customClient.GetContainer(databaseName, containerName); - - for (int i = 0; i < 20; i++) + ConsistencyLevel = Cosmos.ConsistencyLevel.Session, + HttpClientFactory = () => new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)), + TransportClientHandlerFactory = (original) => mockTransportClient.Object, + }; + + using (CosmosClient customClient = new CosmosClient( + endpoint.ToString(), + Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + cosmosClientOptions)) + { + try { - ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); - Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + Container container = customClient.GetContainer(databaseName, containerName); + + for (int i = 0; i < 20; i++) + { + ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + mockTransportClient.VerifyAll(); + mockHttpHandler.VerifyAll(); + + Documents.TransportAddressUri failedReplica = urisVisited.First(); + + // With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow, + // and because any subsequent validation/ connection will be successful, the failed replica will now be marked + // as connected, thus it will be visited more than once. However, note that when replice validation is disabled, + // no validation is done thus the URI will be marked as unhealthy as expected. Therefore the uri will be visited + // just once. + Assert.IsTrue( + enableReplicaValidation + ? urisVisited.Any(x => x.Equals(failedReplica)) + : urisVisited.Count(x => x.Equals(failedReplica)) == 1); + + urisVisited.Clear(); + delayCacheRefresh = false; + do + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + }while (!delayRefreshUnblocked); + + for (int i = 0; i < 20; i++) + { + ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + Assert.AreEqual(4, urisVisited.ToHashSet().Count()); + + // Clears all the setups. No network calls should be done on the next operation. + mockHttpHandler.Reset(); + mockTransportClient.Reset(); } - - mockTransportClient.VerifyAll(); - mockHttpHandler.VerifyAll(); - - Documents.TransportAddressUri failedReplica = urisVisited.First(); - Assert.AreEqual(1, urisVisited.Count(x => x.Equals(failedReplica))); - - urisVisited.Clear(); - delayCacheRefresh = false; - do - { - await Task.Delay(TimeSpan.FromMilliseconds(100)); - }while (!delayRefreshUnblocked); - - for (int i = 0; i < 20; i++) + finally { - ResponseMessage response = await container.ReadItemStreamAsync(Guid.NewGuid().ToString(), new Cosmos.PartitionKey(Guid.NewGuid().ToString())); - Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + mockTransportClient.Setup(x => x.Dispose()); } - - Assert.AreEqual(4, urisVisited.ToHashSet().Count()); - - // Clears all the setups. No network calls should be done on the next operation. - mockHttpHandler.Reset(); - mockTransportClient.Reset(); - } - finally - { - mockTransportClient.Setup(x => x.Dispose()); } } + finally + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: null); + } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 296c131fbf..c855599465 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -1000,19 +1000,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabled_ShouldValidat MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), openConnectionsHandler: fakeOpenConnectionHandler, suboptimalPartitionForceRefreshIntervalInSeconds: 2, - enableTcpConnectionEndpointRediscovery: true); - - // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature - // for the purpose of this test. - FieldInfo fieldInfo = cache - .GetType() - .GetField( - name: "isReplicaAddressValidationEnabled", - bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); - - fieldInfo.SetValue( - obj: cache, - value: true); + enableTcpConnectionEndpointRediscovery: true, + replicaAddressValidationEnabled: true); DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); @@ -1156,19 +1145,8 @@ public async Task TryGetAddressesAsync_WhenReplicaVlidationEnabledAndUnhealthyUr MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), openConnectionsHandler: fakeOpenConnectionHandler, suboptimalPartitionForceRefreshIntervalInSeconds: 2, - enableTcpConnectionEndpointRediscovery: true); - - // By default, the replica validation feature is disabled in GatewayAddressCache. Reflection is used to enable the feature - // for the purpose of this test. - FieldInfo fieldInfo = cache - .GetType() - .GetField( - name: "isReplicaAddressValidationEnabled", - bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); - - fieldInfo.SetValue( - obj: cache, - value: true); + enableTcpConnectionEndpointRediscovery: true, + replicaAddressValidationEnabled: true); DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs index f5602a1f29..9a19fb4a90 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RequestEventHandlerTests.cs @@ -82,7 +82,7 @@ private StoreClient GetMockStoreClient() TransportClient mockTransportClient = this.GetMockTransportClient(); ISessionContainer sessionContainer = new SessionContainer(string.Empty); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs index b92ca3c804..831e32041a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StoreReaderTest.cs @@ -536,7 +536,8 @@ public void StoreReaderBarrierTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + enableReplicaValidation: false); // reads always go to read quorum (2) replicas int replicaCountToRead = 2; @@ -611,14 +612,14 @@ public void GlobalStrongConsistentWriteMockTest() for (int i = 0; i < addressInformation.Length; i++) { TransportClient mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, false); - StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer); - ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + StoreReader storeReader = new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), sessionContainer, false); + ConsistencyWriter consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); StoreResponse response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); //globalCommittedLsn never catches up in this case mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, true, false, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); try { response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; @@ -629,17 +630,17 @@ public void GlobalStrongConsistentWriteMockTest() } mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, false); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, true, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); mockTransportClient = this.GetMockTransportClientForGlobalStrongWrites(addressInformation, i, false, false, true); - consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false); + consistencyWriter = new ConsistencyWriter(addressSelector, sessionContainer, mockTransportClient, mockServiceConfigReader.Object, mockAuthorizationTokenProvider.Object, false, false); response = consistencyWriter.WriteAsync(entity, new TimeoutHelper(TimeSpan.FromSeconds(30)), false).Result; Assert.AreEqual(100, response.LSN); } @@ -703,7 +704,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -746,7 +748,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync( @@ -798,7 +801,8 @@ public void GlobalStrongConsistencyMockTest() new StoreReader(mockTransportClient, addressSelector, new AddressEnumerator(), - sessionContainer); + sessionContainer, + false); Mock mockAuthorizationTokenProvider = new Mock(); mockAuthorizationTokenProvider.Setup(provider => provider.AddSystemAuthorizationHeaderAsync(