Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
e18aa95
Initial code changes to throw 503 on 429/3092.
kundadebdatta Sep 11, 2024
e44f080
Updated client retry policy. Added more tests to cover 429/3092.
kundadebdatta Sep 11, 2024
9006b5f
Code changes to update direct package version. Updating the tests.
kundadebdatta Sep 12, 2024
e7710c1
Code changes to refactor client retry policy.
kundadebdatta Sep 12, 2024
5a1b505
Minor code cleanup.
kundadebdatta Sep 12, 2024
ebf1a11
Merge branch 'master' into users/kundadebdatta/4390_cross_regional_re…
kundadebdatta Sep 12, 2024
91cc3ea
Reverting the direct version bump up change.
kundadebdatta Sep 13, 2024
b479714
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 13, 2024
6ebb9e6
Code changes to address some of the review comments.
kundadebdatta Sep 13, 2024
47cacdf
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 13, 2024
e6e2766
Code changes to move failover logic in client retry policy.
kundadebdatta Sep 14, 2024
9575986
Minor code clean up.
kundadebdatta Sep 14, 2024
0c5304d
Code changes to clean up some cosmetic items.
kundadebdatta Sep 15, 2024
ce3c62f
Further clean up.
kundadebdatta Sep 15, 2024
bf5e649
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 16, 2024
973f1f1
Code changes to address review comments.
kundadebdatta Sep 17, 2024
40560cc
Minor refactor to address cosmetic update.
kundadebdatta Sep 17, 2024
a198a04
Code changes to address cosmetic review comment.
kundadebdatta Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
BatchAsyncContainerExecutor.GetRetryPolicy(
this.cosmosContainer,
this.cosmosClientContext?.DocumentClient?.GlobalEndpointManager,
operation.OperationType,
this.retryOptions));

if (itemRequestOptions != null && itemRequestOptions.AddRequestHeaders != null)
{
Expand Down Expand Up @@ -159,6 +163,7 @@ internal virtual async Task ValidateOperationAsync(

private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
GlobalEndpointManager endpointManager,
OperationType operationType,
RetryOptions retryOptions)
{
Expand All @@ -167,6 +172,7 @@ private static IDocumentClientRetryPolicy GetRetryPolicy(
operationType,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager,
retryOptions.MaxRetryWaitTimeInSeconds));
}

Expand Down
67 changes: 55 additions & 12 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public ClientRetryPolicy(
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: globalEndpointManager,
retryOptions.MaxRetryWaitTimeInSeconds);

this.globalEndpointManager = globalEndpointManager;
Expand Down Expand Up @@ -120,7 +121,18 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}
}

return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
ShouldRetryResult throttleRetryResult = await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);

// Today, the only scenario where we would receive a ServiceUnavailableException from the Throttling Retry Policy
// is when we get 410 (Gone) with sub status code 3092 (System Resource Not Available). Note that this is applicable
// for write requests targeted to a multiple master account. In such case, the 410/3092 will get converted into 503.
if (throttleRetryResult.ExceptionToThrow is ServiceUnavailableException)
{
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return throttleRetryResult;
}

/// <summary>
Expand All @@ -143,7 +155,18 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
return shouldRetryResult;
}

return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
ShouldRetryResult throttleRetryResult = await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);

// Today, the only scenario where we would receive a ServiceUnavailableException from the Throttling Retry Policy
// is when we get 410 (Gone) with sub status code 3092 (System Resource Not Available). Note that this is applicable
// for write requests targeted to a multiple master account. In such case, the 410/3092 will get converted into 503.
if (throttleRetryResult.ExceptionToThrow is ServiceUnavailableException)
{
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return throttleRetryResult;
}

/// <summary>
Expand Down Expand Up @@ -177,6 +200,7 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);
request.RequestContext.RouteToLocation(this.locationEndpoint);
this.throttlingRetry.OnBeforeSendRequest(request);
}

private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
Expand Down Expand Up @@ -274,16 +298,8 @@ private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
// Received 503 due to client connect timeout or Gateway
if (statusCode == HttpStatusCode.ServiceUnavailable)
{
DefaultTrace.TraceWarning("ClientRetryPolicy: ServiceUnavailable. Refresh cache and retry. Failed Location: {0}; ResourceAddress: {1}",
this.documentServiceRequest?.RequestContext?.LocationEndpointToRoute?.ToString() ?? string.Empty,
this.documentServiceRequest?.ResourceAddress ?? string.Empty);

// Mark the partition as unavailable.
// Let the ClientRetry logic decide if the request should be retried
this.partitionKeyRangeLocationCache.TryMarkEndpointUnavailableForPartitionKeyRange(
this.documentServiceRequest);

return this.ShouldRetryOnServiceUnavailable();
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return null;
Expand Down Expand Up @@ -406,6 +422,33 @@ private ShouldRetryResult ShouldRetryOnSessionNotAvailable(DocumentServiceReques
}
}

/// <summary>
/// Handles a ServiceUnavailable (503) outcome for the in-flight request: traces the failed
/// endpoint, optionally marks the endpoint serving the current partition key range as
/// unavailable (influencing subsequent routing), and then defers the final retry decision
/// to the common 503 handling logic.
/// </summary>
/// <param name="shouldMarkEndpointUnavailableForPkRange">True to record the endpoint for the
/// current partition key range as unavailable before computing the retry decision.</param>
/// <returns>An instance of <see cref="ShouldRetryResult"/> indicating whether the operation should be retried.</returns>
private ShouldRetryResult TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
    bool shouldMarkEndpointUnavailableForPkRange)
{
    string failedLocation = this.documentServiceRequest?.RequestContext?.LocationEndpointToRoute?.ToString() ?? string.Empty;
    string resourceAddress = this.documentServiceRequest?.ResourceAddress ?? string.Empty;

    DefaultTrace.TraceWarning(
        "ClientRetryPolicy: ServiceUnavailable. Refresh cache and retry. Failed Location: {0}; ResourceAddress: {1}",
        failedLocation,
        resourceAddress);

    if (shouldMarkEndpointUnavailableForPkRange)
    {
        // Record the endpoint as unavailable for this partition key range.
        // The retry decision below determines whether another region is attempted.
        this.partitionKeyRangeLocationCache.TryMarkEndpointUnavailableForPartitionKeyRange(
            this.documentServiceRequest);
    }

    return this.ShouldRetryOnServiceUnavailable();
}

/// <summary>
/// For a ServiceUnavailable (503.0) we could be having a timeout from Direct/TCP locally or a request to Gateway request with a similar response due to an endpoint not yet available.
/// We try and retry the request only if there are other regions available. The retry logic is applicable for single master write accounts as well.
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -988,8 +988,9 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible<bool>(
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));
maxAttemptCount: this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: this.GlobalEndpointManager,
maxWaitTimeInSeconds: this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));

// Create the task to start the initialize task
// Task will be awaited on in the EnsureValidClientAsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public MetadataRequestThrottleRetryPolicy(

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
this.globalEndpointManager,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
Expand Down
38 changes: 35 additions & 3 deletions Microsoft.Azure.Cosmos/src/ResourceThrottleRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

// Retry when we receive the throttling from server.
Expand All @@ -19,12 +20,15 @@ internal sealed class ResourceThrottleRetryPolicy : IDocumentClientRetryPolicy
private readonly uint backoffDelayFactor;
private readonly int maxAttemptCount;
private readonly TimeSpan maxWaitTimeInMilliseconds;
private readonly IGlobalEndpointManager globalEndpointManager;

private int currentAttemptCount;
private TimeSpan cumulativeRetryDelay;
private bool? isMultiMasterWriteRegion;

public ResourceThrottleRetryPolicy(
int maxAttemptCount,
IGlobalEndpointManager endpointManager,
int maxWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds,
uint backoffDelayFactor = 1)
{
Expand All @@ -33,6 +37,7 @@ public ResourceThrottleRetryPolicy(
throw new ArgumentException("maxWaitTimeInSeconds", "maxWaitTimeInSeconds must be less than " + (int.MaxValue / 1000));
}

this.globalEndpointManager = endpointManager;
this.maxAttemptCount = maxAttemptCount;
this.backoffDelayFactor = backoffDelayFactor;
this.maxWaitTimeInMilliseconds = TimeSpan.FromSeconds(maxWaitTimeInSeconds);
Expand All @@ -59,7 +64,9 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(dce.RetryAfter);
return this.ShouldRetryInternalAsync(
dce?.GetSubStatus(),
dce?.RetryAfter);
}

DefaultTrace.TraceError(
Expand Down Expand Up @@ -88,11 +95,34 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(cosmosResponseMessage?.Headers.RetryAfter);
return this.ShouldRetryInternalAsync(
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.Headers.RetryAfter,
cosmosResponseMessage?.CosmosException);
}

private Task<ShouldRetryResult> ShouldRetryInternalAsync(TimeSpan? retryAfter)
private Task<ShouldRetryResult> ShouldRetryInternalAsync(
SubStatusCodes? subStatusCode,
TimeSpan? retryAfter,
Exception exception = null)
{
if (this.isMultiMasterWriteRegion.HasValue
&& this.isMultiMasterWriteRegion.Value
&& subStatusCode != null
&& subStatusCode == SubStatusCodes.SystemResourceUnavailable)
{
DefaultTrace.TraceError(
"Operation will NOT be retried. Converting SystemResourceUnavailable (429/3092) to ServiceUnavailable (503). Current attempt {0} sub status code: {1}.",
this.currentAttemptCount, SubStatusCodes.SystemResourceUnavailable);

ServiceUnavailableException exceptionToThrow = ServiceUnavailableException.Create(
SubStatusCodes.SystemResourceUnavailable,
innerException: exception);

return Task.FromResult(
ShouldRetryResult.NoRetry(exceptionToThrow));
}

TimeSpan retryDelay = TimeSpan.Zero;
if (this.currentAttemptCount < this.maxAttemptCount &&
this.CheckIfRetryNeeded(retryAfter, out retryDelay))
Expand Down Expand Up @@ -133,6 +163,8 @@ private object GetExceptionMessage(Exception exception)
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.isMultiMasterWriteRegion = !request.IsReadOnlyRequest
&& (this.globalEndpointManager?.CanSupportMultipleWriteLocations(request) ?? false);
}

/// <summary>
Expand Down
12 changes: 12 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,18 @@ public virtual async Task RefreshLocationAsync(bool forceRefresh = false)

await this.RefreshDatabaseAccountInternalAsync(forceRefresh: forceRefresh);
}

/// <summary>
/// Determines whether the current configuration and state of the service allow for supporting multiple write locations.
/// This method returns True if the AvailableWriteLocations in LocationCache is more than 1. Otherwise, it returns False.
/// </summary>
/// <param name="request">The document service request for which the write location support is being evaluated.</param>
/// <returns>A boolean flag indicating if the available write locations are more than one.</returns>
public bool CanSupportMultipleWriteLocations(DocumentServiceRequest request)
{
    return this.CanUseMultipleWriteLocations(request)
        && this.locationCache.GetAvailableWriteLocations()?.Count > 1;
}

#pragma warning disable VSTHRD100 // Avoid async void methods
private async void StartLocationBackgroundRefreshLoop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ internal interface IGlobalEndpointManager : IDisposable
ReadOnlyDictionary<string, Uri> GetAvailableWriteEndpointsByLocation();

ReadOnlyDictionary<string, Uri> GetAvailableReadEndpointsByLocation();

bool CanSupportMultipleWriteLocations(DocumentServiceRequest request);
}
}
13 changes: 9 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ public ReadOnlyCollection<string> GetAvailableReadLocations()
{
return this.locationInfo.AvailableReadLocations;
}

/// <summary>
/// Returns the collection of write locations currently tracked by this cache's location info.
/// </summary>
/// <returns>The available write locations.</returns>
public ReadOnlyCollection<string> GetAvailableWriteLocations() => this.locationInfo.AvailableWriteLocations;

/// <summary>
/// Resolves request to service endpoint.
Expand Down Expand Up @@ -529,11 +534,11 @@ public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)

/// <summary>
/// Returns true when multiple write locations can be used for the given request: the
/// parameterless account/client-level check passes, and the resource type is Document or a
/// stored procedure being executed (ExecuteJavaScript).
/// </summary>
/// <param name="request">The request being evaluated.</param>
/// <returns>True if the request is eligible for multiple write locations; otherwise false.</returns>
public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
{
    if (!this.CanUseMultipleWriteLocations())
    {
        return false;
    }

    return request.ResourceType == ResourceType.Document
        || (request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript);
}

}
private void ClearStaleEndpointUnavailabilityInfo()
{
if (this.locationUnavailablityInfoByEndpoint.Any())
Expand Down
Loading