diff --git a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs index 01d37d3378..0c1a78ce79 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs @@ -90,20 +90,7 @@ public override async Task SendAsync( await request.AssertPartitioningDetailsAsync(this.client, cancellationToken, request.Trace); this.FillMultiMasterContext(request); - AvailabilityStrategyInternal strategy = this.AvailabilityStrategy(request); - - ResponseMessage response = strategy != null && strategy.Enabled() - ? await strategy.ExecuteAvailabilityStrategyAsync( - this.BaseSendAsync, - this.client, - request, - cancellationToken) - : await this.BaseSendAsync(request, cancellationToken); - - if (request.RequestOptions?.ExcludeRegions != null) - { - ((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions); - } + ResponseMessage response = await base.SendAsync(request, cancellationToken); if (ConfigurationManager.IsBinaryEncodingEnabled() && RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request) @@ -137,13 +124,6 @@ public AvailabilityStrategyInternal AvailabilityStrategy(RequestMessage request) return strategy as AvailabilityStrategyInternal; } - public virtual async Task BaseSendAsync( - RequestMessage request, - CancellationToken cancellationToken) - { - return await base.SendAsync(request, cancellationToken); - } - public virtual async Task SendAsync( string resourceUri, ResourceType resourceType, @@ -177,6 +157,166 @@ public virtual async Task SendAsync( return responseCreator(responseMessage); } + internal virtual async Task SendInternalAsync( + RequestMessage request, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, + CancellationToken cancellationToken) + { + if (feedRange != null) + { + if (!request.OperationType.IsPointOperation()) + { + feedRange = await RequestInvokerHandler.ResolveFeedRangeBasedOnPrefixContainerAsync( + feedRange: feedRange, + cosmosContainerCore: cosmosContainerCore, + cancellationToken: cancellationToken); + } + + if (feedRange is FeedRangePartitionKey feedRangePartitionKey) + { + if (cosmosContainerCore == null && object.ReferenceEquals(feedRangePartitionKey.PartitionKey, Cosmos.PartitionKey.None)) + { + throw new ArgumentException($"{nameof(cosmosContainerCore)} can not be null with partition key as PartitionKey.None"); + } + else if (feedRangePartitionKey.PartitionKey.IsNone) + { + try + { + PartitionKeyInternal partitionKeyInternal = await cosmosContainerCore.GetNonePartitionKeyValueAsync( + childTrace, + cancellationToken); + request.Headers.PartitionKey = partitionKeyInternal.ToJsonString(); + } + catch (DocumentClientException dce) + { + return dce.ToCosmosResponseMessage(request); + } + catch (CosmosException ce) + { + return ce.ToCosmosResponseMessage(request); + } + } + else + { + request.Headers.PartitionKey = feedRangePartitionKey.PartitionKey.ToJsonString(); + } + } + else if (feedRange is FeedRangeEpk feedRangeEpk) + { + ContainerProperties collectionFromCache; + try + { + if (cosmosContainerCore == null) + { + throw new ArgumentException($"The container core can not be null for FeedRangeEpk"); + } + + collectionFromCache = await cosmosContainerCore.GetCachedContainerPropertiesAsync( + forceRefresh: false, + childTrace, + cancellationToken); + } + catch (CosmosException ex) + { + return ex.ToCosmosResponseMessage(request); + } + + PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace); + IReadOnlyList overlappingRanges = await routingMapProvider.TryGetOverlappingRangesAsync( + collectionFromCache.ResourceId, + feedRangeEpk.Range, + childTrace, + forceRefresh: false); + if (overlappingRanges == null) + { + CosmosException notFound = new CosmosException( + $"Stale cache for rid '{collectionFromCache.ResourceId}'", + statusCode: System.Net.HttpStatusCode.NotFound, + subStatusCode: default, + activityId: Guid.Empty.ToString(), + requestCharge: default); + return notFound.ToCosmosResponseMessage(request); + } + + // For epk range filtering we can end up in one of 3 cases: + if (overlappingRanges.Count > 1) + { + // 1) The EpkRange spans more than one physical partition + // In this case it means we have encountered a split and + // we need to bubble that up to the higher layers to update their datastructures + CosmosException goneException = new CosmosException( + message: $"Epk Range: {feedRangeEpk.Range} is gone.", + statusCode: System.Net.HttpStatusCode.Gone, + subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone, + activityId: Guid.NewGuid().ToString(), + requestCharge: default); + + return goneException.ToCosmosResponseMessage(request); + } + // overlappingRanges.Count == 1 + else + { + Range singleRange = overlappingRanges[0].ToRange(); + if ((singleRange.Min == feedRangeEpk.Range.Min) && (singleRange.Max == feedRangeEpk.Range.Max)) + { + // 2) The EpkRange spans exactly one physical partition + // In this case we can route to the physical pkrange id + request.PartitionKeyRangeId = new Documents.PartitionKeyRangeIdentity(overlappingRanges[0].Id); + } + else + { + // 3) The EpkRange spans less than single physical partition + // In this case we route to the physical partition and + // pass the epk range headers to filter within partition + request.PartitionKeyRangeId = new Documents.PartitionKeyRangeIdentity(overlappingRanges[0].Id); + request.Headers.ReadFeedKeyType = RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString(); + request.Headers.StartEpk = feedRangeEpk.Range.Min; + request.Headers.EndEpk = feedRangeEpk.Range.Max; + } + } + } + else + { + request.PartitionKeyRangeId = feedRange is FeedRangePartitionKeyRange feedRangePartitionKeyRange + ? new Documents.PartitionKeyRangeIdentity(feedRangePartitionKeyRange.PartitionKeyRangeId) + : throw new InvalidOperationException($"Unknown feed range type: '{feedRange.GetType()}'."); + } + } + + if (operationType == OperationType.Upsert) + { + request.Headers.IsUpsert = bool.TrueString; + } + else if (operationType == OperationType.Patch) + { + request.Headers.ContentType = RuntimeConstants.MediaTypes.JsonPatch; + } + + if (ChangeFeedHelper.IsChangeFeedWithQueryRequest(operationType, streamPayload != null)) + { + request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString); + request.Headers.Add(HttpConstants.HttpHeaders.ContentType, RuntimeConstants.MediaTypes.QueryJson); + } + + if (cosmosContainerCore != null) + { + request.ContainerId = cosmosContainerCore?.Id; + request.DatabaseId = cosmosContainerCore?.Database.Id; + } + requestEnricher?.Invoke(request); + + return await this.SendAsync(request, cancellationToken); + } + public virtual async Task SendAsync( string resourceUriString, ResourceType resourceType, @@ -226,150 +366,44 @@ public virtual async Task SendAsync( request.Headers.SDKSupportedCapabilities = Headers.SDKSUPPORTEDCAPABILITIES; - if (feedRange != null) - { - if (!request.OperationType.IsPointOperation()) - { - feedRange = await RequestInvokerHandler.ResolveFeedRangeBasedOnPrefixContainerAsync( - feedRange: feedRange, - cosmosContainerCore: cosmosContainerCore, - cancellationToken: cancellationToken); - } - - if (feedRange is FeedRangePartitionKey feedRangePartitionKey) - { - if (cosmosContainerCore == null && object.ReferenceEquals(feedRangePartitionKey.PartitionKey, Cosmos.PartitionKey.None)) - { - throw new ArgumentException($"{nameof(cosmosContainerCore)} can not be null with partition key as PartitionKey.None"); - } - else if (feedRangePartitionKey.PartitionKey.IsNone) - { - try - { - PartitionKeyInternal partitionKeyInternal = await cosmosContainerCore.GetNonePartitionKeyValueAsync( - childTrace, - cancellationToken); - request.Headers.PartitionKey = partitionKeyInternal.ToJsonString(); - } - catch (DocumentClientException dce) - { - return dce.ToCosmosResponseMessage(request); - } - catch (CosmosException ce) - { - return ce.ToCosmosResponseMessage(request); - } - } - else - { - request.Headers.PartitionKey = feedRangePartitionKey.PartitionKey.ToJsonString(); - } - } - else if (feedRange is FeedRangeEpk feedRangeEpk) - { - ContainerProperties collectionFromCache; - try - { - if (cosmosContainerCore == null) - { - throw new ArgumentException($"The container core can not be null for FeedRangeEpk"); - } - - collectionFromCache = await cosmosContainerCore.GetCachedContainerPropertiesAsync( - forceRefresh: false, - childTrace, - cancellationToken); - } - catch (CosmosException ex) - { - return ex.ToCosmosResponseMessage(request); - } - - PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace); - IReadOnlyList overlappingRanges = await routingMapProvider.TryGetOverlappingRangesAsync( - collectionFromCache.ResourceId, - feedRangeEpk.Range, - childTrace, - forceRefresh: false); - if (overlappingRanges == null) - { - CosmosException notFound = new CosmosException( - $"Stale cache for rid '{collectionFromCache.ResourceId}'", - statusCode: System.Net.HttpStatusCode.NotFound, - subStatusCode: default, - activityId: Guid.Empty.ToString(), - requestCharge: default); - return notFound.ToCosmosResponseMessage(request); - } - - // For epk range filtering we can end up in one of 3 cases: - if (overlappingRanges.Count > 1) - { - // 1) The EpkRange spans more than one physical partition - // In this case it means we have encountered a split and - // we need to bubble that up to the higher layers to update their datastructures - CosmosException goneException = new CosmosException( - message: $"Epk Range: {feedRangeEpk.Range} is gone.", - statusCode: System.Net.HttpStatusCode.Gone, - subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone, - activityId: Guid.NewGuid().ToString(), - requestCharge: default); - - return goneException.ToCosmosResponseMessage(request); - } - // overlappingRanges.Count == 1 - else - { - Range singleRange = overlappingRanges[0].ToRange(); - if ((singleRange.Min == feedRangeEpk.Range.Min) && (singleRange.Max == feedRangeEpk.Range.Max)) - { - // 2) The EpkRange spans exactly one physical partition - // In this case we can route to the physical pkrange id - request.PartitionKeyRangeId = new Documents.PartitionKeyRangeIdentity(overlappingRanges[0].Id); - } - else - { - // 3) The EpkRange spans less than single physical partition - // In this case we route to the physical partition and - // pass the epk range headers to filter within partition - request.PartitionKeyRangeId = new Documents.PartitionKeyRangeIdentity(overlappingRanges[0].Id); - request.Headers.ReadFeedKeyType = RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString(); - request.Headers.StartEpk = feedRangeEpk.Range.Min; - request.Headers.EndEpk = feedRangeEpk.Range.Max; - } - } - } - else - { - request.PartitionKeyRangeId = feedRange is FeedRangePartitionKeyRange feedRangePartitionKeyRange - ? new Documents.PartitionKeyRangeIdentity(feedRangePartitionKeyRange.PartitionKeyRangeId) - : throw new InvalidOperationException($"Unknown feed range type: '{feedRange.GetType()}'."); - } - } - - if (operationType == OperationType.Upsert) - { - request.Headers.IsUpsert = bool.TrueString; - } - else if (operationType == OperationType.Patch) - { - request.Headers.ContentType = RuntimeConstants.MediaTypes.JsonPatch; - } - - if (ChangeFeedHelper.IsChangeFeedWithQueryRequest(operationType, streamPayload != null)) - { - request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString); - request.Headers.Add(HttpConstants.HttpHeaders.ContentType, RuntimeConstants.MediaTypes.QueryJson); - } - - if (cosmosContainerCore != null) + AvailabilityStrategyInternal strategy = this.AvailabilityStrategy(request); + + ResponseMessage response = strategy != null && strategy.Enabled() + ? await strategy.ExecuteAvailabilityStrategyAsync( + this.SendInternalAsync, + this.client, + request, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, + cancellationToken) + : await this.SendInternalAsync( + request, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, + cancellationToken); + + if (request.RequestOptions?.ExcludeRegions != null) { - request.ContainerId = cosmosContainerCore?.Id; - request.DatabaseId = cosmosContainerCore?.Database.Id; + ((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions); } - requestEnricher?.Invoke(request); - return await this.SendAsync(request, cancellationToken); + return response; } finally { diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategyInternal.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategyInternal.cs index 500a638de2..40beb0c360 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategyInternal.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/AvailabilityStrategyInternal.cs @@ -5,8 +5,11 @@ namespace Microsoft.Azure.Cosmos { using System; + using System.IO; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; internal abstract class AvailabilityStrategyInternal : AvailabilityStrategy { @@ -16,12 +19,43 @@ internal abstract class AvailabilityStrategyInternal : AvailabilityStrategy /// /// /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// /// /// The response from the service after the availability strategy is executed internal abstract Task ExecuteAvailabilityStrategyAsync( - Func> sender, + Func, + ITrace, CancellationToken, + Task> sender, CosmosClient client, RequestMessage requestMessage, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, CancellationToken cancellationToken); /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs index 01d7a513fa..09ca191df8 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs @@ -4,8 +4,10 @@ namespace Microsoft.Azure.Cosmos { using System; + using System.ClientModel.Primitives; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; using System.Net; using System.Threading; @@ -24,7 +26,6 @@ namespace Microsoft.Azure.Cosmos internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInternal { private const string HedgeContext = "Hedge Context"; - private const string ResponseRegion = "Response Region"; /// /// Latency threshold which activates the first region hedging @@ -111,21 +112,65 @@ internal bool ShouldHedge(RequestMessage request, CosmosClient client) /// /// /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// /// /// The response after executing cross region hedging internal override async Task ExecuteAvailabilityStrategyAsync( - Func> sender, + Func, + ITrace, + CancellationToken, + Task> sender, CosmosClient client, RequestMessage request, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, CancellationToken cancellationToken) { if (!this.ShouldHedge(request, client) || client.DocumentClient.GlobalEndpointManager.ReadEndpoints.Count == 1) { - return await sender(request, cancellationToken); + return await sender( + request, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, + cancellationToken); } - - ITrace trace = request.Trace; + + ITrace hedgingTrace = request.Trace; using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { @@ -133,139 +178,179 @@ internal override async Task ExecuteAvailabilityStrategyAsync( ? null : await StreamExtension.AsClonableStreamAsync(request.Content))) { - IReadOnlyCollection hedgeRegions = client.DocumentClient.GlobalEndpointManager - .GetApplicableRegions( - request.RequestOptions?.ExcludeRegions, - OperationTypeExtensions.IsReadOperation(request.OperationType)); - - List requestTasks = new List(hedgeRegions.Count + 1); - - Task primaryRequest = null; - HedgingResponse hedgeResponse = null; - - //Send out hedged requests - for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++) + using (RequestMessage nonModifiedRequestClone = request.Clone(hedgingTrace, clonedBody)) { - TimeSpan awaitTime = requestNumber == 0 ? this.Threshold : this.ThresholdStep; + IReadOnlyCollection hedgeRegions = client.DocumentClient.GlobalEndpointManager + .GetApplicableRegions( + request.RequestOptions?.ExcludeRegions, + OperationTypeExtensions.IsReadOperation(request.OperationType)); - using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) - { - CancellationToken timerToken = timerTokenSource.Token; - using (Task hedgeTimer = Task.Delay(awaitTime, timerToken)) - { - if (requestNumber == 0) - { - primaryRequest = this.RequestSenderAndResultCheckAsync( - sender, - request, - hedgeRegions.ElementAt(requestNumber), - cancellationToken, - cancellationTokenSource, - trace); - - requestTasks.Add(primaryRequest); - } - else - { - Task requestTask = this.CloneAndSendAsync( - sender: sender, - request: request, - clonedBody: clonedBody, - hedgeRegions: hedgeRegions, - requestNumber: requestNumber, - trace: trace, - cancellationToken: cancellationToken, - cancellationTokenSource: cancellationTokenSource); - - requestTasks.Add(requestTask); - } - - requestTasks.Add(hedgeTimer); - - Task completedTask = await Task.WhenAny(requestTasks); - requestTasks.Remove(completedTask); + List requestTasks = new List(hedgeRegions.Count + 1); - if (completedTask == hedgeTimer) - { - continue; - } + Task primaryRequest = null; + HedgingResponse hedgeResponse = null; - timerTokenSource.Cancel(); - requestTasks.Remove(hedgeTimer); - - if (completedTask.IsFaulted) - { - AggregateException innerExceptions = completedTask.Exception.Flatten(); - } + //Send out hedged requests + for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++) + { + TimeSpan awaitTime = requestNumber == 0 ? this.Threshold : this.ThresholdStep; - hedgeResponse = await (Task)completedTask; - if (hedgeResponse.IsNonTransient) + using (CancellationTokenSource timerTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + CancellationToken timerToken = timerTokenSource.Token; + using (Task hedgeTimer = Task.Delay(awaitTime, timerToken)) { - cancellationTokenSource.Cancel(); - //Take is not inclusive, so we need to add 1 to the request number which starts at 0 - ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( - HedgeContext, - hedgeRegions.Take(requestNumber + 1)); - ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( - ResponseRegion, - hedgeResponse.ResponseRegion); - return hedgeResponse.ResponseMessage; + if (requestNumber == 0) + { + primaryRequest = this.RequestSenderAndResultCheckAsync( + sender, + request, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, + cancellationToken, + cancellationTokenSource, + hedgingTrace); + + requestTasks.Add(primaryRequest); + } + else + { + Task requestTask = this.CloneAndSendAsync( + sender: sender, + request: nonModifiedRequestClone, + childTrace: childTrace, + resourceUriString: resourceUriString, + resourceType: resourceType, + operationType: operationType, + requestOptions: requestOptions, + cosmosContainerCore: cosmosContainerCore, + feedRange: feedRange, + streamPayload: streamPayload, + requestEnricher: requestEnricher, + trace: trace, + clonedBody: clonedBody, + hedgeRegions: hedgeRegions, + requestNumber: requestNumber, + hedgingTrace: hedgingTrace, + cancellationToken: cancellationToken, + cancellationTokenSource: cancellationTokenSource); + + requestTasks.Add(requestTask); + } + + requestTasks.Add(hedgeTimer); + + Task completedTask = await Task.WhenAny(requestTasks); + requestTasks.Remove(completedTask); + + if (completedTask == hedgeTimer) + { + continue; + } + + timerTokenSource.Cancel(); + requestTasks.Remove(hedgeTimer); + + if (completedTask.IsFaulted) + { + AggregateException innerExceptions = completedTask.Exception.Flatten(); + } + + hedgeResponse = await (Task)completedTask; + if (hedgeResponse.IsNonTransient) + { + cancellationTokenSource.Cancel(); + //Take is not inclusive, so we need to add 1 to the request number which starts at 0 + ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeContext, + hedgeRegions.Take(requestNumber + 1)); + return hedgeResponse.ResponseMessage; + } } } } - } - //Wait for a good response from the hedged requests/primary request - Exception lastException = null; - while (requestTasks.Any()) - { - Task completedTask = await Task.WhenAny(requestTasks); - requestTasks.Remove(completedTask); - if (completedTask.IsFaulted) + //Wait for a good response from the hedged requests/primary request + Exception lastException = null; + while (requestTasks.Any()) { - AggregateException innerExceptions = completedTask.Exception.Flatten(); - lastException = innerExceptions.InnerExceptions.FirstOrDefault(); + Task completedTask = await Task.WhenAny(requestTasks); + requestTasks.Remove(completedTask); + if (completedTask.IsFaulted) + { + AggregateException innerExceptions = completedTask.Exception.Flatten(); + lastException = innerExceptions.InnerExceptions.FirstOrDefault(); + } + + hedgeResponse = await (Task)completedTask; + if (hedgeResponse.IsNonTransient || requestTasks.Count == 0) + { + cancellationTokenSource.Cancel(); + ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeContext, + hedgeRegions); + return hedgeResponse.ResponseMessage; + } } - hedgeResponse = await (Task)completedTask; - if (hedgeResponse.IsNonTransient || requestTasks.Count == 0) + if (lastException != null) { - cancellationTokenSource.Cancel(); - ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( - HedgeContext, - hedgeRegions); - ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( - ResponseRegion, - hedgeResponse.ResponseRegion); - return hedgeResponse.ResponseMessage; + throw lastException; } - } - if (lastException != null) - { - throw lastException; + Debug.Assert(hedgeResponse != null); + ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum( + HedgeContext, + hedgeRegions); + return hedgeResponse.ResponseMessage; } - - Debug.Assert(hedgeResponse != null); - return hedgeResponse.ResponseMessage; } } } private async Task CloneAndSendAsync( - Func> sender, + Func, + ITrace, CancellationToken, + Task> sender, RequestMessage request, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, CloneableStream clonedBody, IReadOnlyCollection hedgeRegions, int requestNumber, - ITrace trace, + ITrace hedgingTrace, CancellationToken cancellationToken, CancellationTokenSource cancellationTokenSource) { RequestMessage clonedRequest; using (clonedRequest = request.Clone( - trace, + hedgingTrace, clonedBody)) { clonedRequest.RequestOptions ??= new RequestOptions(); @@ -278,39 +363,79 @@ private async Task CloneAndSendAsync( return await this.RequestSenderAndResultCheckAsync( sender, clonedRequest, - region, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, cancellationToken, - cancellationTokenSource, - trace); + cancellationTokenSource, + hedgingTrace); } } private async Task RequestSenderAndResultCheckAsync( - Func> sender, + Func, + ITrace, CancellationToken, + Task> sender, RequestMessage request, - string hedgedRegion, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, CancellationToken cancellationToken, CancellationTokenSource cancellationTokenSource, - ITrace trace) + ITrace hedgingTrace) { try { - ResponseMessage response = await sender.Invoke(request, cancellationToken); + ResponseMessage response = await sender.Invoke( + request, + childTrace, + resourceUriString, + resourceType, + operationType, + requestOptions, + cosmosContainerCore, + feedRange, + streamPayload, + requestEnricher, + trace, + cancellationToken); if (IsFinalResult((int)response.StatusCode, (int)response.Headers.SubStatusCode)) { if (!cancellationToken.IsCancellationRequested) { cancellationTokenSource.Cancel(); } - - return new HedgingResponse(true, response, hedgedRegion); + return new HedgingResponse(true, response); } - return new HedgingResponse(false, response, hedgedRegion); + return new HedgingResponse(false, response); } catch (OperationCanceledException oce ) when (cancellationTokenSource.IsCancellationRequested) { - throw new CosmosOperationCanceledException(oce, trace); + throw new CosmosOperationCanceledException(oce, hedgingTrace); } catch (Exception ex) { @@ -348,13 +473,11 @@ private sealed class HedgingResponse { public readonly bool IsNonTransient; public readonly ResponseMessage ResponseMessage; - public readonly string ResponseRegion; - public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion) + public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage) { this.IsNonTransient = isNonTransient; this.ResponseMessage = responseMessage; - this.ResponseRegion = responseRegion; } } } diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs index eef9705363..85696ccf26 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/DisabledAvailabilityStrategy.cs @@ -4,8 +4,11 @@ namespace Microsoft.Azure.Cosmos { using System; + using System.IO; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; /// /// A Disabled availability strategy that does not do anything. Used for overriding the default global availability strategy. @@ -24,14 +27,43 @@ internal override bool Enabled() /// /// /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// /// /// nothing, this will throw. internal override Task ExecuteAvailabilityStrategyAsync( Func> sender, + ITrace, + string, + ResourceType, + OperationType, + RequestOptions, + ContainerInternal, + FeedRange, + Stream, + Action, + ITrace, CancellationToken, + Task> sender, CosmosClient client, RequestMessage requestMessage, + ITrace childTrace, + string resourceUriString, + ResourceType resourceType, + OperationType operationType, + RequestOptions requestOptions, + ContainerInternal cosmosContainerCore, + FeedRange feedRange, + Stream streamPayload, + Action requestEnricher, + ITrace trace, CancellationToken cancellationToken) { throw new NotImplementedException(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs index c95073aa69..14cfd2c55b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs @@ -255,14 +255,14 @@ public async Task AvailabilityStrategyNoTriggerTest(bool isPreferredLocationsEmp Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); + //warm up connections read + ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk")); + responseDelay.Enable(); ItemResponse ir = await container.ReadItemAsync("testId", new PartitionKey("pk")); CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object responseRegion); - Assert.IsNotNull(responseRegion); - Assert.AreEqual(region1, (string)responseRegion); //Should send out hedge request but original should be returned traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); @@ -325,6 +325,9 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); + //warm up connections read + ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk")); + responseDelay.Enable(); ItemRequestOptions requestOptions = new ItemRequestOptions @@ -340,9 +343,9 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); } } @@ -523,7 +526,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy( threshold: TimeSpan.FromMilliseconds(100), thresholdStep: TimeSpan.FromMilliseconds(50)), - Serializer = this.cosmosSystemTextJsonSerializer + Serializer = this.cosmosSystemTextJsonSerializer, + ConsistencyLevel = ConsistencyLevel.Session, }; using (CosmosClient faultInjectionClient = new CosmosClient( @@ -533,6 +537,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); + //warm up connections read + ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk")); + CosmosTraceDiagnostics traceDiagnostic; object hedgeContext; @@ -556,9 +563,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); break; @@ -588,9 +595,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); } break; @@ -619,9 +626,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); } break; @@ -649,9 +656,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co Assert.IsTrue(rule.GetHitCount() > 0); traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); break; @@ -750,6 +757,9 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); + //warm up connections read + ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk")); + CosmosTraceDiagnostics traceDiagnostic; object hedgeContext; @@ -765,9 +775,9 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); break; @@ -792,9 +802,9 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); } break; @@ -813,9 +823,9 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); } break; @@ -835,9 +845,9 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); break; @@ -944,9 +954,9 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeTest() CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); } } @@ -1016,9 +1026,9 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest() CosmosTraceDiagnostics traceDiagnostic = ex.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); } finally { @@ -1119,9 +1129,9 @@ await this.container.DeleteItemAsync( CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); } } @@ -1218,9 +1228,9 @@ await this.container.DeleteItemAsync( CosmosTraceDiagnostics traceDiagnostic = ex.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreEqual(region3, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); } finally { @@ -1284,6 +1294,105 @@ public async Task AvailabilityStrategyWithCancellationTokenThrowsExceptionTest() } + [TestMethod] + [TestCategory("MultiMaster")] + public async Task HedgingCancellationTokenHandling() + { + List feedRanges = (List)await this.container.GetFeedRangesAsync(); + Assert.IsTrue(feedRanges.Any()); + + try + { + await this.container.DeleteItemAsync("deleteMe", new PartitionKey("MMWrite")); + } + catch (Exception) { } + + + FaultInjectionRule sendDelay = new FaultInjectionRuleBuilder( + id: "sendDelay", + condition: + new FaultInjectionConditionBuilder() + .WithRegion(region1) + .WithConnectionType(FaultInjectionConnectionType.Gateway) + .WithEndpoint( + new FaultInjectionEndpointBuilder( + MultiRegionSetupHelpers.dbName, + MultiRegionSetupHelpers.containerName, + feedRanges[0]) + .WithIncludePrimary(true) + .WithReplicaCount(4) + .Build()) + .Build(), + result: + FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.SendDelay) + .WithDelay(TimeSpan.FromMilliseconds(8000)) + .Build()) + .WithDuration(TimeSpan.FromMinutes(90)) + .Build(); + + List rules = new List() { sendDelay }; + FaultInjector faultInjector = new FaultInjector(rules); + + sendDelay.Disable(); + + CosmosClientOptions clientOptions = new CosmosClientOptions() + { + ConnectionMode = ConnectionMode.Direct, + ApplicationPreferredRegions = new List() { region1, region2 }, + Serializer = this.cosmosSystemTextJsonSerializer, + RequestTimeout = TimeSpan.FromMilliseconds(5000) + }; + + using (CosmosClient faultInjectionClient = new CosmosClient( + connectionString: this.connectionString, + clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions))) + { + Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName); + Container container = database.GetContainer(MultiRegionSetupHelpers.containerName); + + sendDelay.Enable(); + + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(5)); // Cancellation token expiry time is 5 seconds. + + ItemRequestOptions requestOptions = new ItemRequestOptions + { + AvailabilityStrategy = new CrossRegionHedgingAvailabilityStrategy( + threshold: TimeSpan.FromMilliseconds(100), + thresholdStep: TimeSpan.FromMilliseconds(50), + enableMultiWriteRegionHedge: true) + }; + + CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject + { + Id = "deleteMe", + Pk = "MMWrite", + Other = "test" + }; + + try + { + ItemResponse ir = await container.CreateItemAsync( + CosmosIntegrationTestObject, + requestOptions: requestOptions, + cancellationToken: cts.Token); + + CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics; + Assert.IsNotNull(traceDiagnostic); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); + Assert.IsNotNull(hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region2)); + } + catch (CosmosException ex) + { + Assert.Fail(ex.Message); + } + + + sendDelay.Disable(); + } + } + private static async Task HandleChangesAsync( ChangeFeedProcessorContext context, IReadOnlyCollection changes, @@ -1296,9 +1405,9 @@ private static async Task HandleChangesAsync( CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreNotEqual(region1, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region1)); await Task.Delay(1); } @@ -1314,10 +1423,9 @@ private static async Task HandleChangesStepAsync( CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics; Assert.IsNotNull(traceDiagnostic); - traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext); + traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext); Assert.IsNotNull(hedgeContext); - Assert.AreNotEqual(region1, (string)hedgeContext); - Assert.AreNotEqual(region2, (string)hedgeContext); + Assert.IsTrue(((IReadOnlyCollection)hedgeContext).Contains(region3)); await Task.Delay(1); } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs index af411016ca..b3a12061fe 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AvailabilityStrategyUnitTests.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -89,10 +90,38 @@ public async Task CancellationTokenThrowsExceptionTest() using CosmosClient mockCosmosClient = MockCosmosUtil.CreateMockCosmosClient(); mockCosmosClient.DocumentClient.GlobalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(databaseAccount); - Func> sender = (request, token) => throw new OperationCanceledException("operation cancellation requested"); + Func, + ITrace, + CancellationToken, + Task> sender = (request, _, _, _, _, _, _, _, _, _, _, token) => throw new OperationCanceledException("operation cancellation requested"); + + Action mockAction = delegate(RequestMessage message) { }; CosmosOperationCanceledException cancelledException = await Assert.ThrowsExceptionAsync(() => - availabilityStrategy.ExecuteAvailabilityStrategyAsync(sender, mockCosmosClient, request, cts.Token)); + availabilityStrategy.ExecuteAvailabilityStrategyAsync( + sender, + mockCosmosClient, + request, + request.Trace, + string.Empty, + ResourceType.Document, + OperationType.Read, + null, + null, + null, + Stream.Null, + mockAction, + request.Trace, + cts.Token)); }