Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing.TraceData;

/// <summary>
/// This is a thread safe AsyncCache that allows refreshing values in the background.
Expand Down Expand Up @@ -179,21 +178,26 @@ public bool TryRemove(TKey key)

/// <summary>
/// Refreshes the async non blocking cache on-demand for the given <paramref name="key"/>
/// and caches the result for later usage.
/// and caches the result for later usage. Note that this method doesn't control the number
/// of tasks created in parallel, and the concurrency needed to be controlled at the caller.
/// </summary>
/// <param name="key">The requested key to be refreshed.</param>
/// <param name="singleValueInitFunc">A func delegate to be invoked at a later point of time.</param>
public async Task RefreshAsync(
public void Refresh(
TKey key,
Func<TValue, Task<TValue>> singleValueInitFunc)
{
if (this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> initialLazyValue))
{
await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(RefreshAsync));
Task backgroundRefreshTask = this.GetAsync(
key: key,
singleValueInitFunc: singleValueInitFunc,
forceRefresh: (_) => true);

Task continuationTask = backgroundRefreshTask
.ContinueWith(
task => DefaultTrace.TraceVerbose("Failed to refresh addresses in the background with exception: {0}", task.Exception),
TaskContinuationOptions.OnlyOnFaulted);
}
}

Expand Down Expand Up @@ -250,8 +254,8 @@ private sealed class AsyncLazyWithRefreshTask<T>
{
private readonly CancellationToken cancellationToken;
private readonly Func<T, Task<T>> createValueFunc;
private readonly object valueLock = new object();
private readonly object removedFromCacheLock = new object();
private readonly object valueLock = new ();
private readonly object removedFromCacheLock = new ();

private bool removedFromCache = false;
private Task<T> value;
Expand Down
50 changes: 39 additions & 11 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private readonly ICosmosAuthorizationTokenProvider tokenProvider;
private readonly bool enableTcpConnectionEndpointRediscovery;

private readonly SemaphoreSlim semaphore;
private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
private bool disposedValue;
private bool validateUnknownReplicas;
private IOpenConnectionsHandler openConnectionsHandler;

public GatewayAddressCache(
Expand Down Expand Up @@ -90,8 +92,10 @@ public GatewayAddressCache(
Constants.Properties.Protocol,
GatewayAddressCache.ProtocolString(this.protocol));

this.semaphore = new SemaphoreSlim(1, 1);
this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
this.validateUnknownReplicas = false;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand Down Expand Up @@ -120,6 +124,14 @@ public async Task OpenConnectionsAsync(
List<Task> tasks = new ();
int batchSize = GatewayAddressCache.DefaultBatchSize;

// By design, the Unknown replicas are validated only when the following two conditions meet:
// 1) The CosmosClient is initiated using the CreateAndInitializaAsync() flow.
// 2) The advanced replica selection feature enabled.
if (shouldOpenRntbdChannels)
{
this.validateUnknownReplicas = true;
}

#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
Expand Down Expand Up @@ -302,11 +314,12 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
.ReplicaTransportAddressUris
.Any(x => x.ShouldRefreshHealthStatus()))
{
Task refreshAddressesInBackgroundTask = Task.Run(async () =>
bool slimAcquired = await this.semaphore.WaitAsync(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarification: What will happen if this check not exists?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the below condition doesn't exist, then there is no gate to detect if other threads has acquired the semaphore in the past and already scheduled a background refresh task. So, without the check, the try block will get executed.

        if (slimAcquired)
        {
            this.serverPartitionAddressCache.Refresh(
                key: partitionKeyRangeIdentity,
                singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
                    request,
                    cachedAddresses: currentCachedValue,
                    partitionKeyRangeIdentity.CollectionRid,
                    partitionKeyRangeIdentity.PartitionKeyRangeId,
                    forceRefresh: true));
        }

Also, the code snippet bool slimAcquired = await this.semaphore.WaitAsync(0); checks the semaphore hook is acquired at that point of time. If it's already acquired, then the thread immediately returns a false and continue it's execution. This guarantees that no duplicate tasks were created.

try
{
try
if (slimAcquired)
{
await this.serverPartitionAddressCache.RefreshAsync(
this.serverPartitionAddressCache.Refresh(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
request,
Expand All @@ -315,14 +328,21 @@ await this.serverPartitionAddressCache.RefreshAsync(
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true));
}
catch (Exception ex)
else
{
DefaultTrace.TraceWarning("Failed to refresh addresses in the background for the collection rid: {0} with exception: {1}. '{2}'",
DefaultTrace.TraceVerbose("Failed to refresh addresses in the background for the collection rid: {0}, partition key range id: {1}, because the semaphore is already acquired. '{2}'",
partitionKeyRangeIdentity.CollectionRid,
ex,
partitionKeyRangeIdentity.PartitionKeyRangeId,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
});
}
finally
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: try-finally be moved inside IF (code refractoring and fainally clause then don't need explicit check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I will refactor this with the next PR to master. Good catch!

{
if (slimAcquired)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not this semaphore be released only when the refresh completed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline. The below code should cover this:

if (addresses
    .Get(Protocol.Tcp)
    .ReplicaTransportAddressUris
    .Any(x => x.ShouldRefreshHealthStatus()))

The semaphore will be released as soon as the task is successfully scheduled. and the above check is good enough to block other parallel threads to create duplicate tasks.

{
this.semaphore.Release();
}
}
}

return addresses;
Expand Down Expand Up @@ -1008,18 +1028,26 @@ private static PartitionAddressInformation MergeAddresses(
/// Returns a list of <see cref="TransportAddressUri"/> needed to validate their health status. Validating
/// a uri is done by opening Rntbd connection to the backend replica, which is a costly operation by nature. Therefore
/// vaidating both Unhealthy and Unknown replicas at the same time could impose a high CPU utilization. To avoid this
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully/>.
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully.
/// By default, this method only returns the Unhealthy replicas that requires to validate it's connectivity status. The
/// Unknown replicas are validated only when the CosmosClient is initiated using the CreateAndInitializaAsync() flow.
/// </summary>
/// <param name="transportAddresses">A read only list of <see cref="TransportAddressUri"/>s.</param>
/// <returns>A list of <see cref="TransportAddressUri"/> that needs to validate their status.</returns>
private IEnumerable<TransportAddressUri> GetAddressesNeededToValidateStatus(
IReadOnlyList<TransportAddressUri> transportAddresses)
{
return transportAddresses
.Where(address => address
return this.validateUnknownReplicas
? transportAddresses
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() is
TransportAddressHealthState.HealthStatus.UnhealthyPending or
TransportAddressHealthState.HealthStatus.Unknown)
: transportAddresses
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() is
TransportAddressHealthState.HealthStatus.Unknown or
TransportAddressHealthState.HealthStatus.UnhealthyPending);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public async Task TestGoneFromServiceScenarioAsync(
"44444444444444444",
};

HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);

// One replica changed on the refresh
List<string> replicaIds2 = new List<string>()
Expand Down Expand Up @@ -176,6 +176,10 @@ public async Task TestGoneFromServiceScenarioAsync(
mockTransportClient.VerifyAll();
mockHttpHandler.VerifyAll();

mockTransportClient
.Setup(x => x.OpenConnectionAsync(It.IsAny<Uri>()))
.Returns(Task.CompletedTask);

Documents.TransportAddressUri failedReplica = urisVisited.First();

// With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow,
Expand Down
Loading