Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sdk/cosmos/azure-cosmos-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -663,5 +663,26 @@ Licensed under the MIT License.
</plugins>
</build>
</profile>
<profile>
<!-- thin client integration tests, requires thin client endpoint and key -->
<id>thinclient</id>
<properties>
<test.groups>thinclient</test.groups>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.2</version> <!-- {x-version-update;org.apache.maven.plugins:maven-failsafe-plugin;external_dependency} -->
<configuration>
<suiteXmlFiles>
<suiteXmlFile>src/test/resources/thinclient-testng.xml</suiteXmlFile>
</suiteXmlFiles>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.net.URI;
import java.util.EnumSet;

import static com.azure.cosmos.implementation.Configs.isThinClientEnabled;
import static org.assertj.core.api.Assertions.assertThat;

public class ConfigsTests {
Expand Down Expand Up @@ -165,21 +166,19 @@ public void http2MaxConcurrentStreams() {
}
}

@Test(groups = { "unit" })
@Test(groups = { "emulator" })
public void thinClientEnabledTest() {
Configs config = new Configs();
assertThat(config.getThinclientEnabled()).isFalse();

assertThat(isThinClientEnabled()).isFalse();
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
try {
assertThat(config.getThinclientEnabled()).isTrue();
assertThat(isThinClientEnabled()).isTrue();
} finally {
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
}
}

@Test(groups = { "unit" })
@Test(groups = { "emulator" })
public void thinClientEndpointTest() {
Configs config = new Configs();
assertThat(config.getThinclientEndpoint()).isEqualTo(URI.create(""));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import java.util.UUID;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class ThinClientE2ETest extends com.azure.cosmos.rx.TestSuiteBase {
@Test(groups = {"thinclient"})
public void testThinClientDocumentPointOperations() {
CosmosAsyncClient client = null;
try {
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
System.setProperty("COSMOS.HTTP2_ENABLED", "true");

String thinclientTestEndpoint = System.getProperty("COSMOS.THINCLIENT_ENDPOINT");
String thinclientTestKey = System.getProperty("COSMOS.THINCLIENT_KEY");

client = new CosmosClientBuilder()
.endpoint(thinclientTestEndpoint)
.key(thinclientTestKey)
.gatewayMode()
.consistencyLevel(ConsistencyLevel.SESSION)
.buildAsyncClient();

CosmosAsyncContainer container = client.getDatabase("updatedd-thin-client-test-db").getContainer("thin-client-test-container-1");
ObjectMapper mapper = new ObjectMapper();
ObjectNode doc = mapper.createObjectNode();
String idValue = UUID.randomUUID().toString();
doc.put("id", idValue);
doc.put("pk", idValue);

// create
// todo: test other overloads for create
CosmosItemResponse<ObjectNode> createResponse = container.createItem(doc).block();
assertThat(createResponse.getStatusCode()).isEqualTo(201);
assertThat(createResponse.getRequestCharge()).isGreaterThan(0.0);

// read
// todo: test other overloads for read
CosmosItemResponse<ObjectNode> readResponse = container.readItem(idValue, new PartitionKey(idValue), ObjectNode.class).block();
assertThat(readResponse.getStatusCode()).isEqualTo(200);
assertThat(readResponse.getRequestCharge()).isGreaterThan(0.0);

ObjectNode doc2 = mapper.createObjectNode();
String idValue2 = UUID.randomUUID().toString();
doc2.put("id", idValue2);
doc2.put("pk", idValue);

// replace
// todo: test other overloads for replace
CosmosItemResponse<ObjectNode> replaceResponse = container.replaceItem(doc2, idValue, new PartitionKey(idValue)).block();
assertThat(replaceResponse.getStatusCode()).isEqualTo(200);
assertThat(replaceResponse.getRequestCharge()).isGreaterThan(0.0);
CosmosItemResponse<ObjectNode> readAfterReplaceResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block();
assertThat(readAfterReplaceResponse.getStatusCode()).isEqualTo(200);
ObjectNode replacedItemFromRead = readAfterReplaceResponse.getItem();
assertThat(replacedItemFromRead.get("id").asText()).isEqualTo(idValue2);
assertThat(replacedItemFromRead.get("pk").asText()).isEqualTo(idValue);

ObjectNode doc3 = mapper.createObjectNode();
doc3.put("id", idValue2);
doc3.put("pk", idValue);
doc3.put("newField", "newValue");

// upsert
// todo: test other overloads for upsert
CosmosItemResponse<ObjectNode> upsertResponse = container.upsertItem(doc3, new PartitionKey(idValue), new CosmosItemRequestOptions()).block();
assertThat(upsertResponse.getStatusCode()).isEqualTo(200);
assertThat(upsertResponse.getRequestCharge()).isGreaterThan(0.0);
CosmosItemResponse<ObjectNode> readAfterUpsertResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block();
ObjectNode upsertedItemFromRead = readAfterUpsertResponse.getItem();
assertThat(upsertedItemFromRead.get("id").asText()).isEqualTo(idValue2);
assertThat(upsertedItemFromRead.get("pk").asText()).isEqualTo(idValue);
assertThat(upsertedItemFromRead.get("newField").asText()).isEqualTo("newValue");

// patch
// todo: test other overloads for patch
CosmosPatchOperations patchOperations = CosmosPatchOperations.create();
patchOperations.add("/anotherNewField", "anotherNewValue");
patchOperations.replace("/newField", "patchedNewField");
CosmosItemResponse<ObjectNode> patchResponse = container.patchItem(idValue2, new PartitionKey(idValue), patchOperations, ObjectNode.class).block();
assertThat(patchResponse.getStatusCode()).isEqualTo(200);
assertThat(patchResponse.getRequestCharge()).isGreaterThan(0.0);
CosmosItemResponse<ObjectNode> readAfterPatchResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block();
ObjectNode patchedItemFromRead = readAfterPatchResponse.getItem();
assertThat(patchedItemFromRead.get("id").asText()).isEqualTo(idValue2);
assertThat(patchedItemFromRead.get("pk").asText()).isEqualTo(idValue);
assertThat(patchedItemFromRead.get("newField").asText()).isEqualTo("patchedNewField");
assertThat(patchedItemFromRead.get("anotherNewField").asText()).isEqualTo("anotherNewValue");

// delete
// todo: test other overloads for delete
CosmosItemResponse<Object> deleteResponse = container.deleteItem(idValue2, new PartitionKey(idValue)).block();
assertThat(deleteResponse.getStatusCode()).isEqualTo(204);
assertThat(deleteResponse.getRequestCharge()).isGreaterThan(0.0);
} finally {
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
System.clearProperty("COSMOS.HTTP2_ENABLED");
client.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<!--
~ The MIT License (MIT)
~ Copyright (c) 2018 Microsoft Corporation
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="thinclient">
<test name="thinclient" group-by-instances="true">
<groups>
<run>
<include name="thinclient"/>
</run>
</groups>
<packages>
<package name="com.azure.cosmos.*"/>
</packages>
</test>
</suite>
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public URI getThinclientEndpoint() {
return URI.create(DEFAULT_THINCLIENT_ENDPOINT);
}

public static boolean getThinclientEnabled() {
public static boolean isThinClientEnabled() {
String valueFromSystemProperty = System.getProperty(THINCLIENT_ENABLED);
if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
return Boolean.parseBoolean(valueFromSystemProperty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public static final class Properties {
public static final String Name = "name";
public static final String WRITABLE_LOCATIONS = "writableLocations";
public static final String READABLE_LOCATIONS = "readableLocations";
public static final String THINCLIENT_WRITABLE_LOCATIONS = "thinClientWritableLocations";
public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations";
public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint";

//Authorization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,24 @@ public void setReadableLocations(Iterable<DatabaseAccountLocation> locations) {
this.set(Constants.Properties.READABLE_LOCATIONS, locations);
}

/**
* Gets the list of thin client readable locations for this database account.
*
* @return the list of thin client readable locations.
*/
public Iterable<DatabaseAccountLocation> getThinClientReadableLocations() {
return super.getCollection(Constants.Properties.THINCLIENT_READABLE_LOCATIONS, DatabaseAccountLocation.class);
}

/**
* Gets the list of thin client writable locations for this database account.
*
* @return the list of thin client writable locations.
*/
public Iterable<DatabaseAccountLocation> getThinClientWritableLocations() {
return super.getCollection(Constants.Properties.THINCLIENT_WRITABLE_LOCATIONS, DatabaseAccountLocation.class);
}

/**
* Gets if enable multiple write locations is set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public static class HttpHeaders {
// Thinclient headers
public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type";
public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type";

public static final String THINCLIENT_OPT_IN = "x-ms-cosmos-use-thinclient";
public static final String GLOBAL_DATABASE_ACCOUNT_NAME = "GlobalDatabaseAccountName";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private String firstResourceTokenFromPermissionFeed = StringUtils.EMPTY;
private RxClientCollectionCache collectionCache;
private RxGatewayStoreModel gatewayProxy;
private RxGatewayStoreModel thinProxy;
private RxStoreModel storeModel;
private GlobalAddressResolver addressResolver;
private RxPartitionKeyRangeCache partitionKeyRangeCache;
Expand Down Expand Up @@ -664,6 +665,14 @@ private void updateGatewayProxy() {
(this.gatewayProxy).setSessionContainer(this.sessionContainer);
}

private void updateThinProxy() {
(this.thinProxy).setGatewayServiceConfigurationReader(this.gatewayConfigurationReader);
(this.thinProxy).setCollectionCache(this.collectionCache);
(this.thinProxy).setPartitionKeyRangeCache(this.partitionKeyRangeCache);
(this.thinProxy).setUseMultipleWriteLocations(this.useMultipleWriteLocations);
(this.thinProxy).setSessionContainer(this.sessionContainer);
}

public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Function<HttpClient, HttpClient> httpClientInterceptor) {
try {

Expand All @@ -680,6 +689,12 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
this.reactorHttpClient,
this.apiType);

this.thinProxy = createThinProxy(this.sessionContainer,
this.consistencyLevel,
this.userAgentContainer,
this.globalEndpointManager,
this.reactorHttpClient);

this.globalEndpointManager.init();

DatabaseAccount databaseAccountSnapshot = this.initializeGatewayConfigurationReader();
Expand Down Expand Up @@ -707,6 +722,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
collectionCache);

updateGatewayProxy();
updateThinProxy();
clientTelemetry = new ClientTelemetry(
this,
null,
Expand Down Expand Up @@ -821,6 +837,20 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer,
apiType);
}

ThinClientStoreModel createThinProxy(ISessionContainer sessionContainer,
ConsistencyLevel consistencyLevel,
UserAgentContainer userAgentContainer,
GlobalEndpointManager globalEndpointManager,
HttpClient httpClient) {
return new ThinClientStoreModel(
this,
sessionContainer,
consistencyLevel,
userAgentContainer,
globalEndpointManager,
httpClient);
}

private HttpClient httpClient() {
HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs)
.withMaxIdleConnectionTimeout(this.connectionPolicy.getIdleHttpConnectionTimeout())
Expand Down Expand Up @@ -5690,6 +5720,10 @@ public Flux<DatabaseAccount> getDatabaseAccountFromEndpoint(URI endpoint) {
return Flux.defer(() -> {
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
OperationType.Read, ResourceType.DatabaseAccount, "", null, (Object) null);
// if thin client enabled, populate thin client header so we can get thin client read and writeable locations
if (useThinClient(request)) {
request.getHeaders().put(HttpConstants.HttpHeaders.THINCLIENT_OPT_IN, "true");
}
return this.populateHeadersAsync(request, RequestVerb.GET)
.flatMap(requestPopulated -> {

Expand Down Expand Up @@ -5721,6 +5755,10 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) {
return this.gatewayProxy;
}

if (useThinClientStoreModel(request)) {
return this.thinProxy;
}

ResourceType resourceType = request.getResourceType();
OperationType operationType = request.getOperationType();

Expand Down Expand Up @@ -6746,6 +6784,16 @@ private void handleLocationCancellationExceptionForPartitionKeyRange(RxDocumentS
}
}

private boolean useThinClient(RxDocumentServiceRequest request) {
return Configs.isThinClientEnabled() && this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY;
}

private boolean useThinClientStoreModel(RxDocumentServiceRequest request) {
return useThinClient(request)
&& request.getResourceType() == ResourceType.Document
&& request.getOperationType().isPointOperation();
}

@FunctionalInterface
private interface DocumentPointOperation {
Mono<ResourceResponse<Document>> apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,9 +1185,11 @@ public void setEffectivePartitionKey(String effectivePartitionKey) {
this.effectivePartitionKey = effectivePartitionKey;
}

public void setThinclientHeaders(String operationType, String resourceType) {
public void setThinclientHeaders(String operationType, String resourceType, String globalDatabaseAccountName, String resourceId) {
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType);
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType);
this.headers.put(HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME, globalDatabaseAccountName);
this.headers.put(WFConstants.BackendHeaders.COLLECTION_RID, resourceId);
}

public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) {
Expand Down
Loading
Loading