Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.test.faultinjection.FaultInjectionCondition;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionErrorResult;
Expand Down Expand Up @@ -447,6 +448,7 @@ private Mono<List<URI>> resolvePhysicalAddresses(
null);

faultInjectionAddressRequest.requestContext.locationEndpointToRoute = regionEndpoint;
faultInjectionAddressRequest.requestContext.consolidatedRegionalEndpointToRoute = new LocationCache.ConsolidatedRegionalEndpoint(regionEndpoint, null);
faultInjectionAddressRequest.setPartitionKeyRangeIdentity(new PartitionKeyRangeIdentity(pkRangeId));

if (isWriteOnly) {
Expand Down
12 changes: 0 additions & 12 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,6 @@ Licensed under the MIT License.
<scope>test</scope>
<version>2.17.2</version> <!-- {x-version-update;com.fasterxml.jackson.module:jackson-module-blackbird;external_dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.67.0-beta.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.67.0-beta.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyImpl;
import com.azure.cosmos.implementation.guava25.base.Function;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
Expand Down Expand Up @@ -60,7 +61,6 @@
import reactor.core.publisher.Mono;

import java.lang.reflect.Field;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -5144,8 +5144,8 @@ private static double getEstimatedFailureCountSeenPerRegionPerPartitionKeyRange(
return 0d;
}

ConcurrentHashMap<URI, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
= (ConcurrentHashMap<URI, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);
ConcurrentHashMap<LocationCache.ConsolidatedRegionalEndpoint, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
= (ConcurrentHashMap<LocationCache.ConsolidatedRegionalEndpoint, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);

int count = 0;
boolean failuresExist = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.models.CosmosContainerIdentity;
Expand Down Expand Up @@ -180,11 +181,13 @@ public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagem

cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();

UnmodifiableList<URI> readEndpoints =
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints =
globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream().map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint).collect(Collectors.toList());

Mono<CosmosAsyncContainer> asyncContainerMono = Mono.just(cosmosAsyncContainer);

Expand Down Expand Up @@ -342,10 +345,14 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
.collect(Collectors.toList());

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -488,10 +495,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -656,10 +666,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.ConsolidatedRegionalEndpoint> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
import com.azure.cosmos.implementation.routing.LocationCache;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void networkFailureOnRead() throws Exception {
ThrottlingRetryOptions throttlingRetryOptions = new ThrottlingRetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -106,7 +107,7 @@ public void shouldRetryOnGatewayTimeout(
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true));
ClientRetryPolicy clientRetryPolicy =
new ClientRetryPolicy(
Expand Down Expand Up @@ -149,7 +150,7 @@ public void tcpNetworkFailureOnRead() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -197,7 +198,7 @@ public void networkFailureOnWrite() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -232,7 +233,7 @@ public void tcpNetworkFailureOnWrite(
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -292,7 +293,7 @@ public void networkFailureOnUpsert() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -325,7 +326,7 @@ public void tcpNetworkFailureOnUpsert() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -361,7 +362,7 @@ public void networkFailureOnDelete() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -395,7 +396,7 @@ public void tcpNetworkFailureOnDelete() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://localhost"), null)).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import io.netty.handler.ssl.SslContext;
import org.testng.annotations.Test;

import java.net.URI;
Expand Down Expand Up @@ -170,12 +168,12 @@ public void http2MaxConcurrentStreams() {
@Test(groups = { "unit" })
public void thinClientEnabledTest() {
Configs config = new Configs();
assertThat(config.getThinclientEnabled()).isFalse();
assertThat(config.isThinClientEnabled()).isFalse();

System.clearProperty("COSMOS.THINCLIENT_ENABLED");
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
try {
assertThat(config.getThinclientEnabled()).isTrue();
assertThat(config.isThinClientEnabled()).isTrue();
} finally {
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
}
Expand Down
Loading