Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ public void buildClient_withCustomGatewayConnectionConfig() {
safeCloseSyncClient(cosmosClient);
}

@Test(groups = { "emulator" })
public void buildClient_withThinclientGatewayConnectionConfig() {
GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig();
gatewayConnectionConfig.setThinClientEnabled(true);
final List<String> preferredRegions = new ArrayList<>();
preferredRegions.add("West US");
CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.preferredRegions(preferredRegions)
.userAgentSuffix("custom-gateway-client")
.multipleWriteRegionsEnabled(false)
.endpointDiscoveryEnabled(false)
.readRequestsFallbackEnabled(true)
.gatewayMode(gatewayConnectionConfig);

CosmosClient cosmosClient = cosmosClientBuilder.buildClient();

AsyncDocumentClient asyncDocumentClient =
CosmosBridgeInternal.getAsyncDocumentClient(cosmosClient);
ConnectionPolicy connectionPolicy = asyncDocumentClient.getConnectionPolicy();
assertThat(connectionPolicy.getConnectionMode()).isEqualTo(ConnectionMode.GATEWAY);
assertThat(connectionPolicy.getThinclientEnabled()).isEqualTo(true);
validateGatewayConnectionConfig(connectionPolicy, cosmosClientBuilder, gatewayConnectionConfig);
safeCloseSyncClient(cosmosClient);
}

@Test(groups = { "emulator" })
public void buildClient_withDefaultDirectConnectionConfig() {
DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,12 @@ public URI getServiceEndpoint() {
throw new RuntimeException();
}
}

@Override
public URI getThinclientEndpoint() {
// From env var right now. Mock will be updated when thin client endpoint discovery is finalized
return URI.create("testThinclientEndpoint");
}
}

private static <T> Stream<T> toStream(Iterable<T> iterable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public void beforeSuite() {

logger.info("beforeSuite Started");

// thin client endpoint discovery will not be controlled by env var long term, this is a temp solution
System.setProperty("COSMOS.THINCLIENT_ENDPOINT", "testThinclientEndpoint");
try (CosmosAsyncClient houseKeepingClient = createGatewayHouseKeepingDocumentClient(true).buildAsyncClient()) {
CosmosDatabaseForTest dbForTest = CosmosDatabaseForTest.create(DatabaseManagerImpl.getInstance(houseKeepingClient));
SHARED_DATABASE = dbForTest.createdDatabase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class GatewayConnectionConfig {
private int maxConnectionPoolSize;
private Duration idleConnectionTimeout;
private ProxyOptions proxy;
private boolean thinclientEnabled;

/**
* Constructor.
Expand All @@ -32,6 +33,7 @@ public GatewayConnectionConfig() {
this.idleConnectionTimeout = DEFAULT_IDLE_CONNECTION_TIMEOUT;
this.maxConnectionPoolSize = Configs.getDefaultHttpPoolSize();
this.networkRequestTimeout = DEFAULT_NETWORK_REQUEST_TIMEOUT;
this.thinclientEnabled = false;
}

/**
Expand All @@ -43,6 +45,13 @@ public static GatewayConnectionConfig getDefaultConfig() {
return new GatewayConnectionConfig();
}

public boolean getThinClientEnabled() { return this.thinclientEnabled; }

public GatewayConnectionConfig setThinClientEnabled(final boolean enabled) {
this.thinclientEnabled = enabled;
return this;
}

/**
* Gets the network request timeout interval (time to wait for response from network peer).
* The default is 60 seconds.
Expand Down Expand Up @@ -147,6 +156,7 @@ public String toString() {
", networkRequestTimeout=" + networkRequestTimeout +
", proxyType=" + proxyType +
", inetSocketProxyAddress=" + proxyAddress +
", thinclientEnabled=" + thinclientEnabled +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ public Builder withOperationPolicies(List<CosmosOperationPolicy> operationPolici
*/
boolean isContentResponseOnWriteEnabled();

URI getThinclientEndpoint();

/**
* Gets the connection policy
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;

import static com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull;
import static com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull;
Expand Down Expand Up @@ -44,6 +47,7 @@ public class Configs {

private static final String UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS = "COSMOS.UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS";
private static final String GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS = "COSMOS.GLOBAL_ENDPOINT_MANAGER_MAX_INIT_TIME_IN_SECONDS";
private static final String THINCLIENT_ENDPOINT = "COSMOS.THINCLIENT_ENDPOINT"; // Environment variable for now

private static final String MAX_HTTP_BODY_LENGTH_IN_BYTES = "COSMOS.MAX_HTTP_BODY_LENGTH_IN_BYTES";
private static final String MAX_HTTP_INITIAL_LINE_LENGTH_IN_BYTES = "COSMOS.MAX_HTTP_INITIAL_LINE_LENGTH_IN_BYTES";
Expand Down Expand Up @@ -360,6 +364,12 @@ public int getGlobalEndpointManagerMaxInitializationTimeInSeconds() {
return getJVMConfigAsInt(GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS, DEFAULT_GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS);
}

// Temporary. Thinclient endpoint discovery to be done through GetDatabaseAccount API
public URI getThinclientEndpoint() {
String uriString = System.getProperty("COSMOS.THINCLIENT_ENDPOINT");
return URI.create(Objects.requireNonNullElse(uriString, "testThinClientEndpoint"));
}

public int getUnavailableLocationsExpirationTimeInSeconds() {
return getJVMConfigAsInt(UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS, DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class ConnectionPolicy {
private Duration httpNetworkRequestTimeout;
private ProxyOptions proxy;
private Duration idleHttpConnectionTimeout;
private boolean isThinclientEnabled;

// Direct connection config properties
private Duration connectTimeout;
Expand Down Expand Up @@ -92,6 +93,7 @@ private ConnectionPolicy(
.getDirectConnectionConfigAccessor()
.getIoThreadPriority(directConnectionConfig);
this.idleHttpConnectionTimeout = gatewayConnectionConfig.getIdleConnectionTimeout();
this.isThinclientEnabled = gatewayConnectionConfig.getThinClientEnabled();
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.httpNetworkRequestTimeout = BridgeInternal.getNetworkRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
this.proxy = gatewayConnectionConfig.getProxy();
Expand Down Expand Up @@ -269,6 +271,13 @@ public ConnectionPolicy setIdleHttpConnectionTimeout(Duration idleHttpConnection
return this;
}

/**
* Gets whether thin client has been enabled for gateway.
*
* @return true if enabled, false otherwise
*/
public boolean getThinclientEnabled() { return this.isThinclientEnabled; }

/**
* Gets the idle tcp connection timeout for direct client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public interface DatabaseAccountManagerInternal {
*/
URI getServiceEndpoint();

/**
* Gets the thin client endpoint
*
* @return thin client endpoint
*/
URI getThinclientEndpoint();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class GlobalEndpointManager implements AutoCloseable {
private final AtomicBoolean isRefreshing;
private final AtomicBoolean refreshInBackground;
private final Scheduler scheduler = Schedulers.newSingle(theadFactory);
private final URI thinclientEndpoint;
private final boolean isThinClientEnabled;
private volatile boolean isClosed;
private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);
private volatile DatabaseAccount latestDatabaseAccount;
Expand Down Expand Up @@ -70,6 +72,8 @@ public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPol

this.owner = owner;
this.defaultEndpoint = owner.getServiceEndpoint();
this.thinclientEndpoint = owner.getThinclientEndpoint();
this.isThinClientEnabled = connectionPolicy.getThinclientEnabled();
this.connectionPolicy = connectionPolicy;

this.isRefreshing = new AtomicBoolean(false);
Expand Down Expand Up @@ -328,7 +332,9 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
}

logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now);
Mono<DatabaseAccount> databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.getEffectivePreferredRegions()),
Mono<DatabaseAccount> databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(
this.defaultEndpoint,
new ArrayList<>(this.getEffectivePreferredRegions()),
this::getDatabaseAccountAsync);

return databaseAccountObs.flatMap(dbAccount -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private final static Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
private final String masterKeyOrResourceToken;
private final URI serviceEndpoint;
private final URI thinclientEndpoint;
private final ConnectionPolicy connectionPolicy;
private final ConsistencyLevel consistencyLevel;
private final BaseAuthorizationTokenProvider authorizationTokenProvider;
Expand Down Expand Up @@ -488,6 +489,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
this.configs = configs;
this.masterKeyOrResourceToken = masterKeyOrResourceToken;
this.serviceEndpoint = serviceEndpoint;
this.thinclientEndpoint = configs.getThinclientEndpoint();
this.credential = credential;
this.tokenCredential = tokenCredential;
this.contentResponseOnWriteEnabled = contentResponseOnWriteEnabled;
Expand Down Expand Up @@ -794,6 +796,9 @@ public URI getServiceEndpoint() {
return RxDocumentClientImpl.this.getServiceEndpoint();
}

@Override
public URI getThinclientEndpoint() { return RxDocumentClientImpl.this.getThinclientEndpoint(); }

@Override
public Flux<DatabaseAccount> getDatabaseAccountFromEndpoint(URI endpoint) {
logger.info("Getting database account endpoint from {}", endpoint);
Expand Down Expand Up @@ -863,6 +868,11 @@ public URI getServiceEndpoint() {
return this.serviceEndpoint;
}

@Override
public URI getThinclientEndpoint() {
return this.thinclientEndpoint;
}

@Override
public ConnectionPolicy getConnectionPolicy() {
return this.connectionPolicy;
Expand Down
Loading