Skip to content
Next Next commit
Adding ConsolidatedLocationEndpoints to encapsulate thin proxy location endpoint and gateway location endpoint.
  • Loading branch information
jeet1995 committed Feb 1, 2025
commit fd076faabc6862ff506afa1c03ca1e97d8fc8287

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.models.CosmosContainerIdentity;
Expand Down Expand Up @@ -180,11 +181,13 @@ public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagem

cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();

UnmodifiableList<URI> readEndpoints =
UnmodifiableList<LocationCache.ConsolidatedLocationEndpoints> readEndpoints =
globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream().map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint).collect(Collectors.toList());

Mono<CosmosAsyncContainer> asyncContainerMono = Mono.just(cosmosAsyncContainer);

Expand Down Expand Up @@ -342,10 +345,14 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedLocationEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -488,10 +495,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedLocationEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -656,10 +666,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedLocationEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import io.netty.handler.ssl.SslContext;
import org.testng.annotations.Test;

import java.net.URI;
Expand Down Expand Up @@ -170,12 +168,12 @@ public void http2MaxConcurrentStreams() {
@Test(groups = { "unit" })
public void thinClientEnabledTest() {
Configs config = new Configs();
assertThat(config.getThinclientEnabled()).isFalse();
assertThat(config.isThinClientEnabled()).isFalse();

System.clearProperty("COSMOS.THINCLIENT_ENABLED");
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
try {
assertThat(config.getThinclientEnabled()).isTrue();
assertThat(config.isThinClientEnabled()).isTrue();
} finally {
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SerializationDiagnosticsContext;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.util.Beta;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -330,7 +331,7 @@ String getFirstContactedRegion() {
return this.clientSideRequestStatistics.getFirstContactedRegion();
}

URI getFirstContactedLocationEndpoint() {
LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint() {
return this.clientSideRequestStatistics.getFirstContactedLocationEndpoint();
}

Expand Down Expand Up @@ -478,7 +479,7 @@ public void setDiagnosticsContext(CosmosDiagnostics cosmosDiagnostics, CosmosDia
}

@Override
public URI getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics) {
public LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics) {

if (cosmosDiagnostics == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.net.URI;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER;
Expand Down Expand Up @@ -46,7 +47,8 @@ public class ClientRetryPolicy extends DocumentClientRetryPolicy {
private int staleContainerRetryCount;
private boolean isReadRequest;
private boolean canUseMultipleWriteLocations;
private LocationCache.LocationEndpoints locationEndpoints;
private URI locationEndpoint;
private LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints;
private RetryContext retryContext;
private CosmosDiagnostics cosmosDiagnostics;
private AtomicInteger cnt = new AtomicInteger(0);
Expand Down Expand Up @@ -87,8 +89,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
isReadRequest,
canUseMultipleWriteLocations,
e);
// TODO: verify this logic
if (this.locationEndpoints.gatewayEndpoint == null && this.locationEndpoints.thinClientEndpoint == null) {
if (this.locationEndpoint == null || this.consolidatedLocationEndpoints == null) {
// onBeforeSendRequest was not invoked because creation of the document service request failed.
logger.error("locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, " +
"probably request creation failed due to invalid options, serialization setting, etc.");
Expand Down Expand Up @@ -230,7 +231,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ
return ShouldRetryResult.noRetry();
} else {
if (this.canUseMultipleWriteLocations) {
UnmodifiableList<URI> endpoints =
UnmodifiableList<LocationCache.ConsolidatedLocationEndpoints> endpoints =
this.isReadRequest ?
this.globalEndpointManager.getApplicableReadEndpoints(request) : this.globalEndpointManager.getApplicableWriteEndpoints(request);

Expand Down Expand Up @@ -306,7 +307,7 @@ private Mono<ShouldRetryResult> shouldRetryOnGatewayTimeout() {
boolean canFailoverOnTimeout = canGatewayRequestFailoverOnTimeout(this.request);

if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) {
this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.locationEndpointToRoute);
this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedLocationEndpointsToRoute);
}

// If the operation is a data plane read, a metadata read, or a query plan request, it can be retried on a different endpoint.
Expand Down Expand Up @@ -337,19 +338,13 @@ private Mono<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean isR
private Mono<Void> refreshLocation(boolean isReadRequest, boolean forceRefresh, boolean usePreferredLocations) {
this.failoverRetryCount++;

// Mark the current read endpoints as unavailable
// Mark the current read endpoint as unavailable
if (isReadRequest) {
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoints.gatewayEndpoint);
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoints.thinClientEndpoint);
// adding an extra call here will cause updateLocationCache to be called twice, is this ok or should we consolidate
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoints.gatewayEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.locationEndpoints.thinClientEndpoint);
logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForRead(this.consolidatedLocationEndpoints.getGatewayLocationEndpoint());
} else {
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoints.gatewayEndpoint);
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoints.thinClientEndpoint);
// adding an extra call here will cause updateLocationCache to be called twice, is this ok or should we consolidate
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoints.gatewayEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.locationEndpoints.thinClientEndpoint);
logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint);
this.globalEndpointManager.markEndpointUnavailableForWrite(this.consolidatedLocationEndpoints.getGatewayLocationEndpoint());
}

this.retryContext = new RetryContext(this.failoverRetryCount, usePreferredLocations);
Expand All @@ -364,7 +359,7 @@ private Mono<ShouldRetryResult> shouldRetryOnBackendServiceUnavailableAsync(

if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) {
this.globalPartitionEndpointManagerForCircuitBreaker
.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.locationEndpointToRoute);
.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedLocationEndpointsToRoute);
}

// The request has failed with 503; the SDK needs to decide whether it is safe to retry write operations
Expand Down Expand Up @@ -409,10 +404,7 @@ private Mono<ShouldRetryResult> shouldRetryOnBackendServiceUnavailableAsync(
return Mono.just(ShouldRetryResult.noRetry());
}

logger.info("shouldRetryOnServiceUnavailable() Retrying. Received on endpoints {}, {}, IsReadRequest = {}",
this.locationEndpoints.gatewayEndpoint,
this.locationEndpoints.thinClientEndpoint,
isReadRequest);
logger.info("shouldRetryOnServiceUnavailable() Retrying. Received on endpoint {}, IsReadRequest = {}", this.locationEndpoint, isReadRequest);

// Retrying on second PreferredLocations
// RetryCount is used as zero-based index
Expand All @@ -426,9 +418,10 @@ private Mono<ShouldRetryResult> shouldRetryOnRequestTimeout(

if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) {
if (!isReadRequest && !nonIdempotentWriteRetriesEnabled) {

this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(
request,
request.requestContext.locationEndpointToRoute);
this.request,
this.request.requestContext.consolidatedLocationEndpointsToRoute);
}
}

Expand All @@ -438,9 +431,10 @@ private Mono<ShouldRetryResult> shouldRetryOnRequestTimeout(
private Mono<ShouldRetryResult> shouldRetryOnInternalServerError() {

if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) {

this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(
request,
request.requestContext.locationEndpointToRoute);
this.request,
this.request.requestContext.consolidatedLocationEndpointsToRoute);
}

return Mono.just(ShouldRetryResult.NO_RETRY);
Expand Down Expand Up @@ -469,11 +463,17 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) {

// Resolve the endpoint for the request and pin the resolution to the resolved endpoint
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoints = this.globalEndpointManager.resolveServiceEndpoint(request);
this.consolidatedLocationEndpoints = this.globalEndpointManager.resolveServiceEndpoint(request);
this.locationEndpoint = request.useThinProxy && this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() != null ?
this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() :
this.consolidatedLocationEndpoints.getGatewayLocationEndpoint();

if (this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() == null) {
request.useThinProxy = false;
}

if (request.requestContext != null) {
// TODO: try both endpoints before we force cross-region retry
request.requestContext.locationEndpoints = this.locationEndpoints;
request.requestContext.routeToLocation(Configs.getThinclientEnabled() ? this.locationEndpoints.thinClientEndpoint : this.locationEndpoints.gatewayEndpoint);
request.requestContext.routeToLocation(this.locationEndpoint, this.consolidatedLocationEndpoints);
}
}

Expand Down Expand Up @@ -519,4 +519,13 @@ public RetryContext(int retryCount,
this.retryRequestOnPreferredLocations = retryRequestOnPreferredLocations;
}
}

/**
 * Returns the gateway location endpoint taken from the consolidated location endpoints
 * recorded on the request's context.
 *
 * @param request the request whose {@code requestContext.consolidatedLocationEndpointsToRoute} is read.
 * @return the result of {@code getGatewayLocationEndpoint()} on the request's consolidated location endpoints.
 * @throws NullPointerException if {@code request}, {@code request.requestContext} or
 * {@code request.requestContext.consolidatedLocationEndpointsToRoute} is {@code null}.
 */
private URI getGatewayLocationEndpoint(RxDocumentServiceRequest request) {

    // Validate each link of the chain separately so the exception message pinpoints which part was missing.
    Objects.requireNonNull(request, "Argument 'request' must not be null");
    Objects.requireNonNull(request.requestContext, "Argument 'request.requestContext' must not be null");
    Objects.requireNonNull(
        request.requestContext.consolidatedLocationEndpointsToRoute,
        "Argument 'request.requestContext.consolidatedLocationEndpointsToRoute' must not be null");

    return request.requestContext.consolidatedLocationEndpointsToRoute.getGatewayLocationEndpoint();
}
}
Loading