Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
e18aa95
Initial code changes to throw 503 on 429/3092.
kundadebdatta Sep 11, 2024
e44f080
Updated client retry policy. Added more tests to cover 429/3092.
kundadebdatta Sep 11, 2024
9006b5f
Code changes to update direct package version. Updating the tests.
kundadebdatta Sep 12, 2024
e7710c1
Code changes to refactor client retry policy.
kundadebdatta Sep 12, 2024
5a1b505
Minor code cleanup.
kundadebdatta Sep 12, 2024
ebf1a11
Merge branch 'master' into users/kundadebdatta/4390_cross_regional_re…
kundadebdatta Sep 12, 2024
91cc3ea
Reverting the direct version bump up change.
kundadebdatta Sep 13, 2024
b479714
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 13, 2024
6ebb9e6
Code changes to address some of the review comments.
kundadebdatta Sep 13, 2024
47cacdf
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 13, 2024
e6e2766
Code changes to move failover logic in client retry policy.
kundadebdatta Sep 14, 2024
9575986
Minor code clean up.
kundadebdatta Sep 14, 2024
0c5304d
Code changes to clean up some cosmetic items.
kundadebdatta Sep 15, 2024
ce3c62f
Further clean up.
kundadebdatta Sep 15, 2024
bf5e649
Merge branch 'master' into users/kundadebdatta/4656_cross_regional_re…
kundadebdatta Sep 16, 2024
973f1f1
Code changes to address review comments.
kundadebdatta Sep 17, 2024
40560cc
Minor refactor to address cosmetic update.
kundadebdatta Sep 17, 2024
a198a04
Code changes to address cosmetic review comment.
kundadebdatta Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Initial code changes to throw 503 on 429/3092.
  • Loading branch information
kundadebdatta committed Sep 11, 2024
commit e18aa95c462cd718d3cf77dd8f70e28e74f05af8
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
BatchAsyncContainerExecutor.GetRetryPolicy(
this.cosmosContainer,
this.cosmosClientContext?.DocumentClient?.GlobalEndpointManager,
operation.OperationType,
this.retryOptions));

if (itemRequestOptions != null && itemRequestOptions.AddRequestHeaders != null)
{
Expand Down Expand Up @@ -159,6 +163,7 @@ internal virtual async Task ValidateOperationAsync(

private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
GlobalEndpointManager endpointManager,
OperationType operationType,
RetryOptions retryOptions)
{
Expand All @@ -167,6 +172,7 @@ private static IDocumentClientRetryPolicy GetRetryPolicy(
operationType,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager,
retryOptions.MaxRetryWaitTimeInSeconds));
}

Expand Down
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public ClientRetryPolicy(
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: globalEndpointManager,
retryOptions.MaxRetryWaitTimeInSeconds);

this.globalEndpointManager = globalEndpointManager;
Expand Down
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -988,8 +988,9 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible<bool>(
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));
maxAttemptCount: this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: this.GlobalEndpointManager,
maxWaitTimeInSeconds: this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));

// Create the task to start the initialize task
// Task will be awaited on in the EnsureValidClientAsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public MetadataRequestThrottleRetryPolicy(

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
this.globalEndpointManager,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
Expand Down
38 changes: 35 additions & 3 deletions Microsoft.Azure.Cosmos/src/ResourceThrottleRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

// Retry when we receive the throttling from server.
Expand All @@ -19,12 +20,15 @@ internal sealed class ResourceThrottleRetryPolicy : IDocumentClientRetryPolicy
private readonly uint backoffDelayFactor;
private readonly int maxAttemptCount;
private readonly TimeSpan maxWaitTimeInMilliseconds;
private readonly IGlobalEndpointManager globalEndpointManager;

private int currentAttemptCount;
private TimeSpan cumulativeRetryDelay;
private bool? isMultiMasterWriteRegion;

public ResourceThrottleRetryPolicy(
int maxAttemptCount,
IGlobalEndpointManager endpointManager,
int maxWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds,
uint backoffDelayFactor = 1)
{
Expand All @@ -33,6 +37,7 @@ public ResourceThrottleRetryPolicy(
throw new ArgumentException("maxWaitTimeInSeconds", "maxWaitTimeInSeconds must be less than " + (int.MaxValue / 1000));
}

this.globalEndpointManager = endpointManager;
this.maxAttemptCount = maxAttemptCount;
this.backoffDelayFactor = backoffDelayFactor;
this.maxWaitTimeInMilliseconds = TimeSpan.FromSeconds(maxWaitTimeInSeconds);
Expand All @@ -59,7 +64,9 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(dce.RetryAfter);
return this.ShouldRetryInternalAsync(
dce?.GetSubStatus(),
dce?.RetryAfter);
}

DefaultTrace.TraceError(
Expand Down Expand Up @@ -88,11 +95,34 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(cosmosResponseMessage?.Headers.RetryAfter);
return this.ShouldRetryInternalAsync(
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.Headers.RetryAfter,
cosmosResponseMessage?.CosmosException);
}

private Task<ShouldRetryResult> ShouldRetryInternalAsync(TimeSpan? retryAfter)
private Task<ShouldRetryResult> ShouldRetryInternalAsync(
SubStatusCodes? subStatusCode,
TimeSpan? retryAfter,
Exception exception = null)
{
if (this.isMultiMasterWriteRegion.HasValue
&& this.isMultiMasterWriteRegion.Value
&& subStatusCode != null
&& subStatusCode == SubStatusCodes.AadTokenExpired)
{
DefaultTrace.TraceError(
"Operation will NOT be retried. Converting 429/3092 to 503. Current attempt {0} sub status code: {1}.",
this.currentAttemptCount, SubStatusCodes.AadTokenExpired);

ServiceUnavailableException exceptionToThrow = ServiceUnavailableException.Create(
SubStatusCodes.AadTokenExpired,
innerException: exception);

return Task.FromResult(
ShouldRetryResult.NoRetry(exceptionToThrow));
}

TimeSpan retryDelay = TimeSpan.Zero;
if (this.currentAttemptCount < this.maxAttemptCount &&
this.CheckIfRetryNeeded(retryAfter, out retryDelay))
Expand Down Expand Up @@ -133,6 +163,8 @@ private object GetExceptionMessage(Exception exception)
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.isMultiMasterWriteRegion = !request.IsReadOnlyRequest
&& (this.globalEndpointManager?.CanUseMultipleWriteLocations(request) ?? false);
}

/// <summary>
Expand Down
9 changes: 5 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -529,11 +529,12 @@ public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)

public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
{
return this.CanUseMultipleWriteLocations() &&
(request.ResourceType == ResourceType.Document ||
return this.CanUseMultipleWriteLocations()
&& this.locationInfo.AvailableWriteLocations.Count > 1
&& (request.ResourceType == ResourceType.Document ||
(request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript));
}

}
private void ClearStaleEndpointUnavailabilityInfo()
{
if (this.locationUnavailablityInfoByEndpoint.Any())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ public class BatchAsyncBatcherTests
{
private static readonly Exception expectedException = new Exception();
private static readonly BatchPartitionMetric metric = new BatchPartitionMetric();
private GlobalEndpointManager mockedEndpointManager;

[TestInitialize]
public void Initialize()
{
Mock<IDocumentClientInternal> mockedClient = new();

this.mockedEndpointManager = new(
mockedClient.Object,
new ConnectionPolicy());
}

[TestCleanup]
public void Cleanup()
{
this.mockedEndpointManager.Dispose();
}

private ItemBatchOperation CreateItemBatchOperation(bool withContext = false)
{
Expand Down Expand Up @@ -565,12 +582,12 @@ public async Task RetrierGetsCalledOnSplit()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
Expand All @@ -594,12 +611,12 @@ public async Task RetrierGetsCalledOnCompletingSplit()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
Expand All @@ -623,12 +640,12 @@ public async Task RetrierGetsCalledOnCompletingPartitionMigration()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
Expand Down Expand Up @@ -672,17 +689,17 @@ public async Task RetrierGetsCalledOn413_3402()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
Expand Down Expand Up @@ -710,17 +727,17 @@ public async Task RetrierGetsCalledOn413_NoSubstatus()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
Expand Down
Loading