diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index 7abfd76deb..9193abedef 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -459,6 +459,18 @@ public Func HttpClientFactory set; } + /// + /// Gets or sets the boolean flag to enable replica validation. + /// + /// + /// The default value for this parameter is false. + /// + public bool? EnableAdvancedReplicaSelectionForTcp + { + get; + set; + } + /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 1fce195915..330738c00b 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -346,6 +346,17 @@ public ConnectionMode ConnectionMode /// public bool? EnableContentResponseOnWrite { get; set; } + /// + /// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection + /// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent + /// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues. + /// The default value for this parameter is 'false'. + /// + /// + /// This is optimal for latency-sensitive workloads. Does not apply if is used. + /// + internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; } + /// /// (Direct/TCP) Controls the amount of idle time after which unused connections are closed. /// @@ -758,6 +769,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId) EnablePartitionLevelFailover = this.EnablePartitionLevelFailover, PortReuseMode = this.portReuseMode, EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery, + EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp, HttpClientFactory = this.httpClientFactory, ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback }; diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 752ec8f5a5..c7aeb07a6f 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -233,7 +233,7 @@ public DocumentClient(Uri serviceEndpoint, this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel); this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); - this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy); } /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs index 8ac0719dcd..058e9c3ce4 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs @@ -67,7 +67,7 @@ public GlobalAddressResolver( this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery; - this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(); + this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy); this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover) diff --git a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs index ed0f5a47a6..216e1295b5 100644 --- a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs +++ b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs @@ -32,13 +32,20 @@ public static T GetEnvironmentVariable(string variable, T defaultValue) /// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default /// for both preview and GA. /// + /// An instance of containing the client options. /// A boolean flag indicating if replica validation is enabled. - public static bool IsReplicaAddressValidationEnabled() + public static bool IsReplicaAddressValidationEnabled( + ConnectionPolicy connectionPolicy) { bool replicaValidationDefaultValue = false; #if PREVIEW replicaValidationDefaultValue = true; #endif + if (connectionPolicy != null + && connectionPolicy.EnableAdvancedReplicaSelectionForTcp.HasValue) + { + return connectionPolicy.EnableAdvancedReplicaSelectionForTcp.Value; + } return ConfigurationManager .GetEnvironmentVariable( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs index d215b6b18b..05184d7826 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs @@ -51,27 +51,58 @@ public async Task Cleanup() } [TestMethod] - public async Task ReadManyTypedTest() + [DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")] + [DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")] + public async Task ReadManyTypedTestWithAdvancedReplicaSelection( + bool advancedReplicaSelectionEnabled) { - List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); - for (int i=0; i<10; i++) + CosmosClientOptions clientOptions = new () { - itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); - } + EnableAdvancedReplicaSelectionForTcp = advancedReplicaSelectionEnabled, + }; - FeedResponse feedResponse= await this.Container.ReadManyItemsAsync(itemList); - Assert.IsNotNull(feedResponse); - Assert.AreEqual(feedResponse.Count, 10); - Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); - Assert.IsNotNull(feedResponse.Diagnostics); + Database database = null; + CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions); + try + { + database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb"); + Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk"); - int count = 0; - foreach (ToDoActivity item in feedResponse) + // Create items with different pk values + for (int i = 0; i < 500; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(); + item.pk = "pk" + i.ToString(); + item.id = i.ToString(); + ItemResponse itemResponse = await container.CreateItemAsync(item); + Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode); + } + + List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>(); + for (int i = 0; i < 20; i++) + { + itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString()))); + } + + FeedResponse feedResponse = await container.ReadManyItemsAsync(itemList); + Assert.IsNotNull(feedResponse); + Assert.AreEqual(20, feedResponse.Count); + Assert.IsTrue(feedResponse.Headers.RequestCharge > 0); + Assert.IsNotNull(feedResponse.Diagnostics); + + int count = 0; + foreach (ToDoActivity item in feedResponse) + { + count++; + Assert.IsNotNull(item); + } + Assert.AreEqual(20, count); + } + finally { - count++; - Assert.IsNotNull(item); + await database.DeleteAsync(); + cosmosClient.Dispose(); } - Assert.AreEqual(count, 10); } [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs index 7a5b387e8e..151e0690a8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosBadReplicaTests.cs @@ -21,16 +21,23 @@ public class CosmosBadReplicaTests { [TestMethod] [Timeout(30000)] - [DataRow(true, DisplayName = "Validate when replica validation is enabled.")] - [DataRow(false, DisplayName = "Validate when replica validation is disabled.")] + [DataRow(true, true, false, DisplayName = "Validate when replica validation is enabled using environment variable.")] + [DataRow(false, true, false, DisplayName = "Validate when replica validation is disabled using environment variable.")] + [DataRow(true, false, true, DisplayName = "Validate when replica validation is enabled using cosmos client options.")] + [DataRow(false, false, true, DisplayName = "Validate when replica validation is disabled using cosmos client options.")] public async Task TestGoneFromServiceScenarioAsync( - bool enableReplicaValidation) + bool enableReplicaValidation, + bool useEnvironmentVariable, + bool useCosmosClientOptions) { try { - Environment.SetEnvironmentVariable( - variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, - value: enableReplicaValidation.ToString()); + if (useEnvironmentVariable) + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: enableReplicaValidation.ToString()); + } Mock mockHttpHandler = new Mock(MockBehavior.Strict); Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount( @@ -56,28 +63,28 @@ public async Task TestGoneFromServiceScenarioAsync( cRid, out IReadOnlyList partitionKeyRanges); - List replicaIds1 = new List() - { - "11111111111111111", - "22222222222222222", - "33333333333333333", - "44444444444444444", - }; - - HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( - replicaIds1, - partitionKeyRanges.First(), - "eastus", - cRid); + List replicaIds1 = new List() + { + "11111111111111111", + "22222222222222222", + "33333333333333333", + "44444444444444444", + }; - // One replica changed on the refresh - List replicaIds2 = new List() - { - "11111111111111111", - "22222222222222222", - "33333333333333333", - "55555555555555555", - }; + HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses( + replicaIds1, + partitionKeyRanges.First(), + "eastus", + cRid); + + // One replica changed on the refresh + List replicaIds2 = new List() + { + "11111111111111111", + "22222222222222222", + "33333333333333333", + "55555555555555555", + }; HttpResponseMessage replicaSet2 = MockSetupsHelper.CreateAddresses( replicaIds2, @@ -146,6 +153,11 @@ public async Task TestGoneFromServiceScenarioAsync( TransportClientHandlerFactory = (original) => mockTransportClient.Object, }; + if (useCosmosClientOptions) + { + cosmosClientOptions.EnableAdvancedReplicaSelectionForTcp = enableReplicaValidation; + } + using (CosmosClient customClient = new CosmosClient( endpoint.ToString(), Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), @@ -203,9 +215,12 @@ public async Task TestGoneFromServiceScenarioAsync( } finally { - Environment.SetEnvironmentVariable( - variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, - value: null); + if (useEnvironmentVariable) + { + Environment.SetEnvironmentVariable( + variable: ConfigurationManager.ReplicaConnectivityValidationEnabled, + value: null); + } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index efb06d7cc5..37d89c389a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(clientOptions.HttpClientFactory); Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsFalse(clientOptions.EnablePartitionLevelFailover); + Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue); //Verify GetConnectionPolicy returns the correct values for default ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsNull(policy.HttpClientFactory); Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel); Assert.IsFalse(policy.EnablePartitionLevelFailover); + Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue); cosmosClientBuilder.WithApplicationRegion(region) .WithConnectionModeGateway(maxConnections, webProxy) @@ -112,6 +114,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); clientOptions = cosmosClient.ClientOptions; + clientOptions.EnableAdvancedReplicaSelectionForTcp = true; //Verify all the values are updated Assert.AreEqual(region, clientOptions.ApplicationRegion); @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.IsTrue(clientOptions.AllowBulkExecution); Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel); Assert.IsTrue(clientOptions.EnablePartitionLevelFailover); + Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue && clientOptions.EnableAdvancedReplicaSelectionForTcp.Value); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(clientId: 0); @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds); Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel()); Assert.IsTrue(policy.EnablePartitionLevelFailover); - + Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.Value); + IReadOnlyList preferredLocations = new List() { Regions.AustraliaCentral, Regions.AustraliaCentral2 }; //Verify Direct Mode settings cosmosClientBuilder = new CosmosClientBuilder(