Skip to content
Draft
Next Next commit
feature: WIP - Session Consistency Retries
  • Loading branch information
dibahlfi committed Dec 25, 2024
commit 4dc4ee21357497a84fe107eb6adab500b9345573
10 changes: 10 additions & 0 deletions Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ public bool? EnableAdvancedReplicaSelectionForTcp
set;
}


/// <summary>
/// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end.
/// </summary>
Expand All @@ -530,6 +531,15 @@ internal CosmosClientTelemetryOptions CosmosClientTelemetryOptions
set;
}

/// <summary>
/// Gets or sets Client Telemetry Options like feature flags and corresponding options
/// </summary>
internal SessionRetryOptions SessionRetryOptions
{
get;
set;
}

/// <summary>
/// GlobalEndpointManager will subscribe to this event if user updates the preferredLocations list in the Azure Cosmos DB service.
/// </summary>
Expand Down
25 changes: 18 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class CosmosClientOptions

private ConnectionMode connectionMode;
private Protocol connectionProtocol;
private TimeSpan? idleTcpConnectionTimeout;
private TimeSpan? idleTcpConnectionTimeout;
private TimeSpan? openTcpConnectionTimeout;
private int? maxRequestsPerTcpConnection;
private int? maxTcpConnectionsPerEndpoint;
Expand Down Expand Up @@ -315,7 +315,9 @@ public ConnectionMode ConnectionMode
/// This can be used to weaken the database account consistency level for read operations.
/// If this is not set the database account consistency level will be used for all requests.
/// </summary>
public ConsistencyLevel? ConsistencyLevel { get; set; }
public ConsistencyLevel? ConsistencyLevel { get; set; }



/// <summary>
/// Sets the priority level for requests created using cosmos client.
Expand Down Expand Up @@ -460,7 +462,9 @@ public TimeSpan? IdleTcpConnectionTimeout
this.idleTcpConnectionTimeout = value;
this.ValidateDirectTCPSettings();
}
}
}



/// <summary>
/// (Direct/TCP) Controls the amount of time allowed for trying to establish a connection.
Expand Down Expand Up @@ -726,6 +730,11 @@ public Func<HttpClient> HttpClientFactory
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Enable partition key level failover
/// </summary>
internal SessionRetryOptions SessionRetryOptions { get; set; }

/// <summary>
/// Enable partition key level failover
Expand Down Expand Up @@ -919,7 +928,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
this.ValidateDirectTCPSettings();
this.ValidateLimitToEndpointSettings();
this.ValidatePartitionLevelFailoverSettings();
this.ValidateAvailabilityStrategy();
this.ValidateAvailabilityStrategy();

ConnectionPolicy connectionPolicy = new ConnectionPolicy()
{
Expand All @@ -929,7 +938,8 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
ConnectionProtocol = this.ConnectionProtocol,
UserAgentContainer = this.CreateUserAgentContainerWithFeatures(clientId),
UseMultipleWriteLocations = true,
IdleTcpConnectionTimeout = this.IdleTcpConnectionTimeout,
IdleTcpConnectionTimeout = this.IdleTcpConnectionTimeout,
SessionRetryOptions = this.SessionRetryOptions,
OpenTcpConnectionTimeout = this.OpenTcpConnectionTimeout,
MaxRequestsPerTcpConnection = this.MaxRequestsPerTcpConnection,
MaxTcpConnectionsPerEndpoint = this.MaxTcpConnectionsPerEndpoint,
Expand All @@ -940,7 +950,7 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
EnableAdvancedReplicaSelectionForTcp = this.EnableAdvancedReplicaSelectionForTcp,
HttpClientFactory = this.httpClientFactory,
ServerCertificateCustomValidationCallback = this.ServerCertificateCustomValidationCallback,
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions()
CosmosClientTelemetryOptions = new CosmosClientTelemetryOptions()
};

if (this.CosmosClientTelemetryOptions != null)
Expand Down Expand Up @@ -1105,7 +1115,8 @@ private void ValidateAvailabilityStrategy()
{
throw new ArgumentException($"{nameof(this.ApplicationPreferredRegions)} or {nameof(this.ApplicationRegion)} must be set to use {nameof(this.AvailabilityStrategy)}");
}
}
}


private void ValidateDirectTCPSettings()
{
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6723,7 +6723,8 @@ private void CreateStoreModel(bool subscribeRntbdStatus)
!this.enableRntbdChannel,
this.UseMultipleWriteLocations && (this.accountServiceConfiguration.DefaultConsistencyLevel != Documents.ConsistencyLevel.Strong),
true,
enableReplicaValidation: this.isReplicaAddressValidationEnabled);
enableReplicaValidation: this.isReplicaAddressValidationEnabled,
this.ConnectionPolicy.SessionRetryOptions);

if (subscribeRntbdStatus)
{
Expand Down
17 changes: 16 additions & 1 deletion Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,22 @@ public CosmosClientBuilder WithSerializerOptions(CosmosSerializationOptions cosm
{
this.clientOptions.SerializerOptions = cosmosSerializerOptions;
return this;
}
}

/// <summary>
/// Set a custom serializer option.
/// </summary>
/// <param name="sessionRetryOptions">The custom class that implements <see cref="CosmosSerializer"/> </param>
/// <returns>The <see cref="CosmosClientBuilder"/> object</returns>
/// <seealso cref="CosmosSerializer"/>
/// <seealso cref="CosmosClientOptions.SerializerOptions"/>
public CosmosClientBuilder WithSessionRetryOptions(SessionRetryOptions sessionRetryOptions)
{
this.clientOptions.SessionRetryOptions = sessionRetryOptions;
return this;
}



/// <summary>
/// Set a custom JSON serializer.
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/RequestOptions/RequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class RequestOptions
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }

SessionRetryOptions SessionRetryOptions { get; set; }

/// <summary>
/// Gets or sets the boolean to use effective partition key routing in the cosmos db request.
Expand Down
36 changes: 36 additions & 0 deletions Microsoft.Azure.Cosmos/src/SessionRetryOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Text;
/// <summary>
/// Telemetry Options for Cosmos Client to enable/disable telemetry and distributed tracing along with corresponding threshold values.
/// </summary>
public class SessionRetryOptions
{
/// <summary>
/// Disable sending telemetry data to Microsoft, <see cref="Microsoft.Azure.Cosmos.CosmosThresholdOptions"/> is not applicable for this.
/// </summary>
/// <remarks>This feature has to be enabled at 2 places:
/// <list type="bullet">
/// <item>Opt-in from portal to subscribe for this feature.</item>
/// <item>Setting this property to false, to enable it for a particular client instance.</item>
/// </list>
/// </remarks>
/// <value>true</value>
public int MinInRegionRetryTime { get; set; }

/// <summary>
/// Disable sending telemetry data to Microsoft, <see cref="Microsoft.Azure.Cosmos.CosmosThresholdOptions"/> is not applicable for this.
/// </summary>
/// <remarks>This feature has to be enabled at 2 places:
/// <list type="bullet">
/// <item>Opt-in from portal to subscribe for this feature.</item>
/// <item>Setting this property to false, to enable it for a particular client instance.</item>
/// </list>
/// </remarks>
/// <value>true</value>
public int MaxInRegionRetryCount { get; set; }

}
}
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Documents
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.Collections;

Expand Down Expand Up @@ -138,18 +139,21 @@ internal sealed class ConsistencyReader
private readonly IAuthorizationTokenProvider authorizationTokenProvider;
private readonly StoreReader storeReader;
private readonly QuorumReader quorumReader;
private readonly SessionRetryOptions sessionRetryOptions;

public ConsistencyReader(
AddressSelector addressSelector,
ISessionContainer sessionContainer,
TransportClient transportClient,
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool enableReplicaValidation)
bool enableReplicaValidation,
SessionRetryOptions sessionRetryOptions = null)
{
this.addressSelector = addressSelector;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.sessionRetryOptions = sessionRetryOptions;
this.storeReader = new StoreReader(transportClient, addressSelector, new AddressEnumerator(), sessionContainer, enableReplicaValidation);
this.quorumReader = new QuorumReader(transportClient, addressSelector, this.storeReader, serviceConfigReader, authorizationTokenProvider);
}
Expand Down Expand Up @@ -233,7 +237,8 @@ public Task<StoreResponse> ReadAsync(
{
return BackoffRetryUtility<StoreResponse>.ExecuteAsync(
callbackMethod: () => this.ReadSessionAsync(entity, desiredReadMode),
retryPolicy: new SessionTokenMismatchRetryPolicy(),
retryPolicy: new SessionTokenMismatchRetryPolicy(
sessionRetryOptions: this.sessionRetryOptions),
cancellationToken: cancellationToken);
}
else
Expand Down
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/ConsistencyWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Documents
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Core.Trace;

/*
Expand Down Expand Up @@ -57,6 +58,7 @@ internal sealed class ConsistencyWriter
private readonly IServiceConfigurationReader serviceConfigReader;
private readonly IAuthorizationTokenProvider authorizationTokenProvider;
private readonly bool useMultipleWriteLocations;
private readonly SessionRetryOptions sessionRetryOptions;

public ConsistencyWriter(
AddressSelector addressSelector,
Expand All @@ -65,14 +67,16 @@ public ConsistencyWriter(
IServiceConfigurationReader serviceConfigReader,
IAuthorizationTokenProvider authorizationTokenProvider,
bool useMultipleWriteLocations,
bool enableReplicaValidation)
bool enableReplicaValidation,
SessionRetryOptions sessionRetryOptions = null)
{
this.transportClient = transportClient;
this.addressSelector = addressSelector;
this.sessionContainer = sessionContainer;
this.serviceConfigReader = serviceConfigReader;
this.authorizationTokenProvider = authorizationTokenProvider;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.sessionRetryOptions = sessionRetryOptions;
this.storeReader = new StoreReader(
transportClient,
addressSelector,
Expand Down Expand Up @@ -130,7 +134,8 @@ public async Task<StoreResponse> WriteAsync(
{
return await BackoffRetryUtility<StoreResponse>.ExecuteAsync(
callbackMethod: () => this.WritePrivateAsync(entity, timeout, forceRefresh),
retryPolicy: new SessionTokenMismatchRetryPolicy(),
retryPolicy: new SessionTokenMismatchRetryPolicy(
sessionRetryOptions: this.sessionRetryOptions),
cancellationToken: cancellationToken);
}
finally
Expand Down
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/IStoreClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Documents
{
using System;
using Microsoft.Azure.Cosmos;

internal interface IStoreClientFactory: IDisposable
{
Expand All @@ -18,6 +19,8 @@ StoreClient CreateStoreClient(
bool useFallbackClient = true,
bool useMultipleWriteLocations = false,
bool detectClientConnectivityIssues = false,
bool enableReplicaValidation = false);
bool enableReplicaValidation = false,
SessionRetryOptions sessionRetryOptions = null
);
}
}
10 changes: 7 additions & 3 deletions Microsoft.Azure.Cosmos/src/direct/ReplicatedResourceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Documents
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.Client;

Expand Down Expand Up @@ -70,7 +71,8 @@ public ReplicatedResourceClient(
bool detectClientConnectivityIssues,
bool disableRetryWithRetryPolicy,
bool enableReplicaValidation,
RetryWithConfiguration retryWithConfiguration = null)
RetryWithConfiguration retryWithConfiguration = null,
SessionRetryOptions sessionRetryOptions = null)
{
this.addressResolver = addressResolver;
this.addressSelector = new AddressSelector(addressResolver, protocol);
Expand All @@ -90,15 +92,17 @@ public ReplicatedResourceClient(
transportClient,
serviceConfigReader,
authorizationTokenProvider,
enableReplicaValidation);
enableReplicaValidation,
sessionRetryOptions);
this.consistencyWriter = new ConsistencyWriter(
this.addressSelector,
sessionContainer,
transportClient,
serviceConfigReader,
authorizationTokenProvider,
useMultipleWriteLocations,
enableReplicaValidation);
enableReplicaValidation,
sessionRetryOptions);
this.enableReadRequestsFallback = enableReadRequestsFallback;
this.useMultipleWriteLocations = useMultipleWriteLocations;
this.detectClientConnectivityIssues = detectClientConnectivityIssues;
Expand Down
Loading