Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ public System.Text.Json.JsonSerializerOptions UseSystemTextJsonSerializerWithOpt
/// <remarks>
/// <para>This is optimal for latency-sensitive workloads. Does not apply if <see cref="ConnectionMode.Gateway"/> is used.</para>
/// </remarks>
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }
internal bool? EnableAdvancedReplicaSelectionForTcp { get; set; }

/// <summary>
/// Gets or sets stack trace optimization to reduce stack trace proliferation in high-concurrency scenarios where exceptions are frequently thrown.
/// When enabled, critical SDK components optimize exception handling to minimize performance overhead.
/// The default value is 'true'.
/// </summary>
internal bool EnableAsyncCacheExceptionNoSharing { get; set; } = true;

/// <summary>
/// (Direct/TCP) Controls the amount of idle time after which unused connections are closed.
Expand Down
29 changes: 20 additions & 9 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool IsLocalQuorumConsistency = false;
private readonly bool isReplicaAddressValidationEnabled;
private readonly bool enableAsyncCacheExceptionNoSharing;

//Fault Injection
private readonly IChaosInterceptorFactory chaosInterceptorFactory;
Expand Down Expand Up @@ -243,7 +244,9 @@ public DocumentClient(Uri serviceEndpoint,
}

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(
cancellationToken: this.cancellationTokenSource.Token,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);
this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);
}

Expand Down Expand Up @@ -444,6 +447,7 @@ internal DocumentClient(Uri serviceEndpoint,
/// <param name="remoteCertificateValidationCallback">This delegate responsible for validating the third party certificate. </param>
/// <param name="cosmosClientTelemetryOptions">This is distributed tracing flag</param>
/// <param name="chaosInterceptorFactory">This is the chaos interceptor used for fault injection</param>
/// <param name="enableAsyncCacheExceptionNoSharing">A boolean flag indicating if stack trace optimization is enabled.</param>
/// <remarks>
/// The service endpoint can be obtained from the Azure Management Portal.
/// If you are connecting using one of the Master Keys, these can be obtained along with the endpoint from the Azure Management Portal
Expand Down Expand Up @@ -472,7 +476,8 @@ internal DocumentClient(Uri serviceEndpoint,
string cosmosClientId = null,
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null,
CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null,
IChaosInterceptorFactory chaosInterceptorFactory = null)
IChaosInterceptorFactory chaosInterceptorFactory = null,
bool enableAsyncCacheExceptionNoSharing = true)
{
if (sendingRequestEventArgs != null)
{
Expand All @@ -491,10 +496,13 @@ internal DocumentClient(Uri serviceEndpoint,
this.receivedResponse += receivedResponseEventArgs;
}

this.enableAsyncCacheExceptionNoSharing = enableAsyncCacheExceptionNoSharing;
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(cosmosAuthorization));
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(
cancellationToken: this.cancellationTokenSource.Token,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);
this.chaosInterceptorFactory = chaosInterceptorFactory;
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);

Expand Down Expand Up @@ -675,8 +683,9 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
telemetryToServiceHelper: this.telemetryToServiceHelper,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager, this.enableAsyncCacheExceptionNoSharing);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
}
Expand Down Expand Up @@ -938,7 +947,7 @@ internal virtual void Initialize(Uri serviceEndpoint,
servicePoint.ConnectionLimit = this.ConnectionPolicy.MaxConnectionLimit;
#endif

this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy);
this.GlobalEndpointManager = new GlobalEndpointManager(this, this.ConnectionPolicy, this.enableAsyncCacheExceptionNoSharing);
this.PartitionKeyRangeLocation = this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker
? new GlobalPartitionEndpointManagerCore(
this.GlobalEndpointManager,
Expand Down Expand Up @@ -1059,8 +1068,9 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
storeModel: this.GatewayStoreModel,
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
telemetryToServiceHelper: this.telemetryToServiceHelper,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager, this.enableAsyncCacheExceptionNoSharing);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);
Expand Down Expand Up @@ -6722,7 +6732,8 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this.accountServiceConfiguration,
this.ConnectionPolicy,
this.httpClient,
this.storeClientFactory.GetConnectionStateListener());
this.storeClientFactory.GetConnectionStateListener(),
this.enableAsyncCacheExceptionNoSharing);

this.CreateStoreModel(subscribeRntbdStatus: true);
}
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ internal static CosmosClientContext Create(
cosmosClientId: cosmosClient.Id,
remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()),
cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions,
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory);
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory,
enableAsyncCacheExceptionNoSharing: clientOptions.EnableAsyncCacheExceptionNoSharing);

return ClientContextCore.Create(
cosmosClient,
Expand Down
12 changes: 9 additions & 3 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ namespace Microsoft.Azure.Cosmos.Common
/// <typeparam name="TValue">Type of values.</typeparam>
internal sealed class AsyncCache<TKey, TValue>
{
private readonly bool enableAsyncCacheExceptionNoSharing;
private readonly IEqualityComparer<TValue> valueEqualityComparer;
private readonly IEqualityComparer<TKey> keyEqualityComparer;

private ConcurrentDictionary<TKey, AsyncLazy<TValue>> values;

public AsyncCache(IEqualityComparer<TValue> valueEqualityComparer, IEqualityComparer<TKey> keyEqualityComparer = null)
public AsyncCache(
IEqualityComparer<TValue> valueEqualityComparer,
IEqualityComparer<TKey> keyEqualityComparer = null,
bool enableAsyncCacheExceptionNoSharing = true)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazy<TValue>>(this.keyEqualityComparer);
this.valueEqualityComparer = valueEqualityComparer;
this.enableAsyncCacheExceptionNoSharing = enableAsyncCacheExceptionNoSharing;
}

public AsyncCache()
: this(EqualityComparer<TValue>.Default)
public AsyncCache(bool enableAsyncCacheExceptionNoSharing = true)
: this(valueEqualityComparer: EqualityComparer<TValue>.Default,
enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing)
{
}

Expand Down
13 changes: 9 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
{
private readonly bool enableAsyncCacheExceptionNoSharing;
private readonly CancellationTokenSource cancellationTokenSource;
private readonly ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>> values;
private readonly Func<Exception, bool> removeFromCacheOnBackgroundRefreshException;
Expand All @@ -30,18 +31,22 @@ internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
public AsyncCacheNonBlocking(
Func<Exception, bool> removeFromCacheOnBackgroundRefreshException = null,
IEqualityComparer<TKey> keyEqualityComparer = null,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
bool enableAsyncCacheExceptionNoSharing = true)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>>(this.keyEqualityComparer);
this.removeFromCacheOnBackgroundRefreshException = removeFromCacheOnBackgroundRefreshException ?? AsyncCacheNonBlocking<TKey, TValue>.RemoveNotFoundFromCacheOnException;
this.cancellationTokenSource = cancellationToken == default
? new CancellationTokenSource()
: CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
this.enableAsyncCacheExceptionNoSharing = enableAsyncCacheExceptionNoSharing;
}

public AsyncCacheNonBlocking()
: this(removeFromCacheOnBackgroundRefreshException: null, keyEqualityComparer: null)
public AsyncCacheNonBlocking(bool enableAsyncCacheExceptionNoSharing = true)
: this(removeFromCacheOnBackgroundRefreshException: null,
keyEqualityComparer: null,
enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing)
{
}

Expand Down Expand Up @@ -279,7 +284,7 @@ public AsyncLazyWithRefreshTask(

public bool IsValueCreated => this.value != null;

public Task<T> GetValueAsync(
public Task<T> GetValueAsync(
Func<T, Task<T>> createValueFunc)
{
// The task was already created so just return it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public ClientCollectionCache(
IStoreModel storeModel,
ICosmosAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
TelemetryToServiceHelper telemetryToServiceHelper)
TelemetryToServiceHelper telemetryToServiceHelper,
bool enableAsyncCacheExceptionNoSharing = true)
: base(enableAsyncCacheExceptionNoSharing)
{
this.storeModel = storeModel ?? throw new ArgumentNullException("storeModel");
this.tokenProvider = tokenProvider;
Expand Down
20 changes: 14 additions & 6 deletions Microsoft.Azure.Cosmos/src/Routing/CollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ internal abstract class CollectionCache
/// </summary>
protected class InternalCache
{
internal InternalCache()
internal InternalCache(
bool enableAsyncCacheExceptionNoSharing = true)
{
this.collectionInfoByName = new AsyncCache<string, ContainerProperties>(new CollectionRidComparer());
this.collectionInfoById = new AsyncCache<string, ContainerProperties>(new CollectionRidComparer());
this.collectionInfoByName = new AsyncCache<string, ContainerProperties>(
new CollectionRidComparer(),
enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing);

this.collectionInfoById = new AsyncCache<string, ContainerProperties>(
new CollectionRidComparer(),
enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing);

this.collectionInfoByNameLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
this.collectionInfoByIdLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
}
Expand All @@ -48,11 +55,12 @@ internal InternalCache()
/// </summary>
protected readonly InternalCache[] cacheByApiList;

protected CollectionCache()
protected CollectionCache(
bool enableAsyncCacheExceptionNoSharing = true)
{
this.cacheByApiList = new InternalCache[2];
this.cacheByApiList[0] = new InternalCache(); // for API version < 2018-12-31
this.cacheByApiList[1] = new InternalCache(); // for API version >= 2018-12-31
this.cacheByApiList[0] = new InternalCache(enableAsyncCacheExceptionNoSharing); // for API version < 2018-12-31
this.cacheByApiList[1] = new InternalCache(enableAsyncCacheExceptionNoSharing); // for API version >= 2018-12-31
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ public GatewayAddressCache(
IConnectionStateListener connectionStateListener,
long suboptimalPartitionForceRefreshIntervalInSeconds = 600,
bool enableTcpConnectionEndpointRediscovery = false,
bool replicaAddressValidationEnabled = false)
bool replicaAddressValidationEnabled = false,
bool enableAsyncCacheExceptionNoSharing = true)
{
this.addressEndpoint = new Uri(serviceEndpoint + "/" + Paths.AddressPathSegment);
this.protocol = protocol;
this.tokenProvider = tokenProvider;
this.serviceEndpoint = serviceEndpoint;
this.serviceConfigReader = serviceConfigReader;
this.serverPartitionAddressCache = new AsyncCacheNonBlocking<PartitionKeyRangeIdentity, PartitionAddressInformation>();
this.serverPartitionAddressCache = new AsyncCacheNonBlocking<PartitionKeyRangeIdentity, PartitionAddressInformation>(enableAsyncCacheExceptionNoSharing);
this.suboptimalServerPartitionTimestamps = new ConcurrentDictionary<PartitionKeyRangeIdentity, DateTime>();
this.serverPartitionAddressToPkRangeIdMap = new ConcurrentDictionary<ServerKey, HashSet<PartitionKeyRangeIdentity>>();
this.suboptimalMasterPartitionTimestamp = DateTime.MaxValue;
Expand Down
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal sealed class GlobalAddressResolver : IAddressResolverExtension, IDispos
private readonly ConcurrentDictionary<Uri, EndpointCache> addressCacheByEndpoint;
private readonly bool enableTcpConnectionEndpointRediscovery;
private readonly bool isReplicaAddressValidationEnabled;
private readonly bool enableAsyncCacheExceptionNoSharing;
private readonly IConnectionStateListener connectionStateListener;
private IOpenConnectionsHandler openConnectionsHandler;

Expand All @@ -52,7 +53,8 @@ public GlobalAddressResolver(
IServiceConfigurationReader serviceConfigReader,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient,
IConnectionStateListener connectionStateListener)
IConnectionStateListener connectionStateListener,
bool enableAsyncCacheExceptionNoSharing = true)
{
this.endpointManager = endpointManager;
this.partitionKeyRangeLocationCache = partitionKeyRangeLocationCache;
Expand All @@ -72,6 +74,8 @@ public GlobalAddressResolver(

this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy);

this.enableAsyncCacheExceptionNoSharing = enableAsyncCacheExceptionNoSharing;

this.maxEndpoints = maxBackupReadEndpoints + 2; // for write and alternate write endpoint (during failover)

this.addressCacheByEndpoint = new ConcurrentDictionary<Uri, EndpointCache>();
Expand Down Expand Up @@ -344,7 +348,8 @@ private EndpointCache GetOrAddEndpoint(Uri endpoint)
this.openConnectionsHandler,
this.connectionStateListener,
enableTcpConnectionEndpointRediscovery: this.enableTcpConnectionEndpointRediscovery,
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled);
replicaAddressValidationEnabled: this.isReplicaAddressValidationEnabled,
enableAsyncCacheExceptionNoSharing: this.enableAsyncCacheExceptionNoSharing);

string location = this.endpointManager.GetLocation(endpoint);
AddressResolver addressResolver = new AddressResolver(null, new NullRequestSigner(), location);
Expand Down
Loading
Loading