Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ public Func<HttpClient> HttpClientFactory
set;
}

/// <summary>
/// Gets or sets the boolean flag to enable replica validation.
/// </summary>
/// <value>
/// The default value for this parameter is false.
/// </value>
public bool EnableAdvancedReplicaSelectionForTcp
{
get;
set;
}

/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ public ConnectionMode ConnectionMode
/// <seealso cref="TransactionalBatchItemRequestOptions.EnableContentResponseOnWrite"/>
public bool? EnableContentResponseOnWrite { get; set; }

/// <summary>
/// Gets or sets the advanced replica selection flag. The advanced replica selection logic keeps track of the replica connection
/// status, and based on status, it prioritizes the replicas which show healthy stable connections, so that the requests can be sent
/// confidently to the particular replica. This helps the cosmos client to become more resilient and effective to any connectivity issues.
/// The default value for this parameter is 'false'.
/// </summary>
/// <remarks>
/// <para>This is optimal for latency-sensitive workloads. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
/// </remarks>
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
/// </summary>
Expand Down Expand Up @@ -758,6 +769,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnablePartitionLevelFailover = this.EnablePartitionLevelFailover,
PortReuseMode = this.portReuseMode,
EnableTcpConnectionEndpointRediscovery = this.EnableTcpConnectionEndpointRediscovery,
EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp.HasValue && this.EnableAdvancedReplicaSelectionForTcp.Value,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback
};
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public DocumentClient(Uri serviceEndpoint,

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public GlobalAddressResolver(

this.enableTcpConnectionEndpointRediscovery = connectionPolicy.EnableTcpConnectionEndpointRediscovery;

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled();
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
/// both preview and GA. The method will eventually be removed, once replica valdiatin is enabled by default
/// for both preview and GA.
/// </summary>
/// <param name="connectionPolicy">An instance of <see cref="ConnectionPolicy"/> containing the client options.</param>
/// <returns>A boolean flag indicating if replica validation is enabled.</returns>
public static bool IsReplicaAddressValidationEnabled()
public static bool IsReplicaAddressValidationEnabled(
ConnectionPolicy connectionPolicy)
{
bool replicaValidationDefaultValue = false;
#if PREVIEW
replicaValidationDefaultValue = true;
#endif
if (connectionPolicy != null
&& connectionPolicy.EnableAdvancedReplicaSelectionForTcp)
{
return true;
}

return ConfigurationManager
.GetEnvironmentVariable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,58 @@ public async Task Cleanup()
}

[TestMethod]
public async Task ReadManyTypedTest()
[DataRow(true, DisplayName = "Validates Read Many scenario with advanced replica selection enabled.")]
[DataRow(false, DisplayName = "Validates Read Many scenario with advanced replica selection disabled.")]
public async Task ReadManyTypedTestWithAdvancedReplicaSelection(
bool advancedReplicaSelectionEnabled)
{
List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i=0; i<10; i++)
CosmosClientOptions clientOptions = new ()
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}
EnableAdvancedReplicaSelectionForTcp = advancedReplicaSelectionEnabled,
};

FeedResponse<ToDoActivity> feedResponse= await this.Container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(feedResponse.Count, 10);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);
Database database = null;
CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions);
try
{
database = await cosmosClient.CreateDatabaseAsync("ReadManyTypedTestScenarioDb");
Container container = await database.CreateContainerAsync("ReadManyTypedTestContainer", "/pk");

int count = 0;
foreach (ToDoActivity item in feedResponse)
// Create items with different pk values
for (int i = 0; i < 500; i++)
{
ToDoActivity item = ToDoActivity.CreateRandomToDoActivity();
item.pk = "pk" + i.ToString();
item.id = i.ToString();
ItemResponse<ToDoActivity> itemResponse = await container.CreateItemAsync(item);
Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
}

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>();
for (int i = 0; i < 20; i++)
{
itemList.Add((i.ToString(), new PartitionKey("pk" + i.ToString())));
}

FeedResponse<ToDoActivity> feedResponse = await container.ReadManyItemsAsync<ToDoActivity>(itemList);
Assert.IsNotNull(feedResponse);
Assert.AreEqual(20, feedResponse.Count);
Assert.IsTrue(feedResponse.Headers.RequestCharge > 0);
Assert.IsNotNull(feedResponse.Diagnostics);

int count = 0;
foreach (ToDoActivity item in feedResponse)
{
count++;
Assert.IsNotNull(item);
}
Assert.AreEqual(20, count);
}
finally
{
count++;
Assert.IsNotNull(item);
await database.DeleteAsync();
cosmosClient.Dispose();
}
Assert.AreEqual(count, 10);
}

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ public class CosmosBadReplicaTests
{
[TestMethod]
[Timeout(30000)]
[DataRow(true, DisplayName = "Validate when replica validation is enabled.")]
[DataRow(false, DisplayName = "Validate when replica validation is disabled.")]
[DataRow(true, true, false, DisplayName = "Validate when replica validation is enabled using environment variable.")]
[DataRow(false, true, false, DisplayName = "Validate when replica validation is disabled using environment variable.")]
[DataRow(true, false, true, DisplayName = "Validate when replica validation is enabled using cosmos client options.")]
[DataRow(false, false, true, DisplayName = "Validate when replica validation is disabled using cosmos client options.")]
public async Task TestGoneFromServiceScenarioAsync(
bool enableReplicaValidation)
bool enableReplicaValidation,
bool useEnvironmentVariable,
bool useCosmosClientOptions)
{
try
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: enableReplicaValidation.ToString());
if (useEnvironmentVariable)
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: enableReplicaValidation.ToString());
}

Mock<IHttpHandler> mockHttpHandler = new Mock<IHttpHandler>(MockBehavior.Strict);
Uri endpoint = MockSetupsHelper.SetupSingleRegionAccount(
Expand Down Expand Up @@ -146,6 +153,11 @@ public async Task TestGoneFromServiceScenarioAsync(
TransportClientHandlerFactory = (original) => mockTransportClient.Object,
};

if (useCosmosClientOptions)
{
cosmosClientOptions.EnableAdvancedReplicaSelectionForTcp = enableReplicaValidation;
}

using (CosmosClient customClient = new CosmosClient(
endpoint.ToString(),
Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())),
Expand Down Expand Up @@ -203,9 +215,12 @@ public async Task TestGoneFromServiceScenarioAsync(
}
finally
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: null);
if (useEnvironmentVariable)
{
Environment.SetEnvironmentVariable(
variable: ConfigurationManager.ReplicaConnectivityValidationEnabled,
value: null);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(clientOptions.HttpClientFactory);
Assert.AreNotEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsFalse(clientOptions.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue);

//Verify GetConnectionPolicy returns the correct values for default
ConnectionPolicy policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -97,6 +98,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsNull(policy.HttpClientFactory);
Assert.AreNotEqual(Cosmos.ConsistencyLevel.Session, clientOptions.ConsistencyLevel);
Assert.IsFalse(policy.EnablePartitionLevelFailover);
Assert.IsFalse(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue);

cosmosClientBuilder.WithApplicationRegion(region)
.WithConnectionModeGateway(maxConnections, webProxy)
Expand All @@ -112,6 +114,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()

cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient());
clientOptions = cosmosClient.ClientOptions;
clientOptions.EnableAdvancedReplicaSelectionForTcp = true;

//Verify all the values are updated
Assert.AreEqual(region, clientOptions.ApplicationRegion);
Expand All @@ -131,6 +134,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.IsTrue(clientOptions.AllowBulkExecution);
Assert.AreEqual(consistencyLevel, clientOptions.ConsistencyLevel);
Assert.IsTrue(clientOptions.EnablePartitionLevelFailover);
Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.HasValue && clientOptions.EnableAdvancedReplicaSelectionForTcp.Value);

//Verify GetConnectionPolicy returns the correct values
policy = clientOptions.GetConnectionPolicy(clientId: 0);
Expand All @@ -145,7 +149,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated()
Assert.AreEqual((int)maxRetryWaitTime.TotalSeconds, policy.RetryOptions.MaxRetryWaitTimeInSeconds);
Assert.AreEqual((Documents.ConsistencyLevel)consistencyLevel, clientOptions.GetDocumentsConsistencyLevel());
Assert.IsTrue(policy.EnablePartitionLevelFailover);

Assert.IsTrue(clientOptions.EnableAdvancedReplicaSelectionForTcp.Value);

IReadOnlyList<string> preferredLocations = new List<string>() { Regions.AustraliaCentral, Regions.AustraliaCentral2 };
//Verify Direct Mode settings
cosmosClientBuilder = new CosmosClientBuilder(
Expand Down