diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index 44e3349275d0..d51d15a6a6c9 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -663,5 +663,26 @@ Licensed under the MIT License. + + + thinclient + + thinclient + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.2 + + + src/test/resources/thinclient-testng.xml + + + + + + diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java index 8245f322aa5e..4191c9423ba1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ConfigsTests.java @@ -11,6 +11,7 @@ import java.net.URI; import java.util.EnumSet; +import static com.azure.cosmos.implementation.Configs.isThinClientEnabled; import static org.assertj.core.api.Assertions.assertThat; public class ConfigsTests { @@ -165,21 +166,19 @@ public void http2MaxConcurrentStreams() { } } - @Test(groups = { "unit" }) + @Test(groups = { "emulator" }) public void thinClientEnabledTest() { - Configs config = new Configs(); - assertThat(config.getThinclientEnabled()).isFalse(); - + assertThat(isThinClientEnabled()).isFalse(); System.clearProperty("COSMOS.THINCLIENT_ENABLED"); System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); try { - assertThat(config.getThinclientEnabled()).isTrue(); + assertThat(isThinClientEnabled()).isTrue(); } finally { System.clearProperty("COSMOS.THINCLIENT_ENABLED"); } } - @Test(groups = { "unit" }) + @Test(groups = { "emulator" }) public void thinClientEndpointTest() { Configs config = new Configs(); assertThat(config.getThinclientEndpoint()).isEqualTo(URI.create("")); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java new file mode 100644 index 000000000000..b1657bc6efff --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientE2ETest.java @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +public class ThinClientE2ETest extends com.azure.cosmos.rx.TestSuiteBase { + @Test(groups = {"thinclient"}) + public void testThinClientDocumentPointOperations() { + CosmosAsyncClient client = null; + try { + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); + System.setProperty("COSMOS.HTTP2_ENABLED", "true"); + + String thinclientTestEndpoint = System.getProperty("COSMOS.THINCLIENT_ENDPOINT"); + String thinclientTestKey = System.getProperty("COSMOS.THINCLIENT_KEY"); + + client = new CosmosClientBuilder() + .endpoint(thinclientTestEndpoint) + .key(thinclientTestKey) + .gatewayMode() + .consistencyLevel(ConsistencyLevel.SESSION) + .buildAsyncClient(); + + CosmosAsyncContainer container = client.getDatabase("updatedd-thin-client-test-db").getContainer("thin-client-test-container-1"); + ObjectMapper mapper = new ObjectMapper(); + ObjectNode doc = mapper.createObjectNode(); + String idValue = UUID.randomUUID().toString(); + doc.put("id", idValue); + doc.put("pk", idValue); + + // create + CosmosItemResponse createResponse = container.createItem(doc).block(); + assertThat(createResponse.getStatusCode()).isEqualTo(201); + assertThat(createResponse.getRequestCharge()).isGreaterThan(0.0); + + // read + CosmosItemResponse readResponse = container.readItem(idValue, new PartitionKey(idValue), ObjectNode.class).block(); + assertThat(readResponse.getStatusCode()).isEqualTo(200); + assertThat(readResponse.getRequestCharge()).isGreaterThan(0.0); + + ObjectNode doc2 = mapper.createObjectNode(); + String idValue2 = UUID.randomUUID().toString(); + doc2.put("id", idValue2); + doc2.put("pk", idValue); + + // replace + CosmosItemResponse replaceResponse = container.replaceItem(doc2, idValue, new PartitionKey(idValue)).block(); + assertThat(replaceResponse.getStatusCode()).isEqualTo(200); + assertThat(replaceResponse.getRequestCharge()).isGreaterThan(0.0); + CosmosItemResponse readAfterReplaceResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); + assertThat(readAfterReplaceResponse.getStatusCode()).isEqualTo(200); + ObjectNode replacedItemFromRead = readAfterReplaceResponse.getItem(); + assertThat(replacedItemFromRead.get("id").asText()).isEqualTo(idValue2); + assertThat(replacedItemFromRead.get("pk").asText()).isEqualTo(idValue); + + ObjectNode doc3 = mapper.createObjectNode(); + doc3.put("id", idValue2); + doc3.put("pk", idValue); + doc3.put("newField", "newValue"); + + // upsert + CosmosItemResponse upsertResponse = container.upsertItem(doc3, new PartitionKey(idValue), new CosmosItemRequestOptions()).block(); + assertThat(upsertResponse.getStatusCode()).isEqualTo(200); + assertThat(upsertResponse.getRequestCharge()).isGreaterThan(0.0); + CosmosItemResponse readAfterUpsertResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); + ObjectNode upsertedItemFromRead = readAfterUpsertResponse.getItem(); + assertThat(upsertedItemFromRead.get("id").asText()).isEqualTo(idValue2); + assertThat(upsertedItemFromRead.get("pk").asText()).isEqualTo(idValue); + assertThat(upsertedItemFromRead.get("newField").asText()).isEqualTo("newValue"); + + // patch + CosmosPatchOperations patchOperations = CosmosPatchOperations.create(); + patchOperations.add("/anotherNewField", "anotherNewValue"); + patchOperations.replace("/newField", "patchedNewField"); + CosmosItemResponse patchResponse = container.patchItem(idValue2, new PartitionKey(idValue), patchOperations, ObjectNode.class).block(); + assertThat(patchResponse.getStatusCode()).isEqualTo(200); + assertThat(patchResponse.getRequestCharge()).isGreaterThan(0.0); + CosmosItemResponse readAfterPatchResponse = container.readItem(idValue2, new PartitionKey(idValue), ObjectNode.class).block(); + ObjectNode patchedItemFromRead = readAfterPatchResponse.getItem(); + assertThat(patchedItemFromRead.get("id").asText()).isEqualTo(idValue2); + assertThat(patchedItemFromRead.get("pk").asText()).isEqualTo(idValue); + assertThat(patchedItemFromRead.get("newField").asText()).isEqualTo("patchedNewField"); + assertThat(patchedItemFromRead.get("anotherNewField").asText()).isEqualTo("anotherNewValue"); + + // delete + CosmosItemResponse deleteResponse = container.deleteItem(idValue2, new PartitionKey(idValue)).block(); + assertThat(deleteResponse.getStatusCode()).isEqualTo(204); + assertThat(deleteResponse.getRequestCharge()).isGreaterThan(0.0); + } finally { + System.clearProperty("COSMOS.THINCLIENT_ENABLED"); + System.clearProperty("COSMOS.HTTP2_ENABLED"); + client.close(); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/thinclient-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/thinclient-testng.xml new file mode 100644 index 000000000000..ce233b954013 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/thinclient-testng.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 7bb2d3099e82..a933b63155f6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -434,7 +434,7 @@ public URI getThinclientEndpoint() { return URI.create(DEFAULT_THINCLIENT_ENDPOINT); } - public static boolean getThinclientEnabled() { + public static boolean isThinClientEnabled() { String valueFromSystemProperty = System.getProperty(THINCLIENT_ENABLED); if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) { return Boolean.parseBoolean(valueFromSystemProperty); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java index 2bb480e88dda..d4406f9b2ec8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java @@ -199,6 +199,8 @@ public static final class Properties { public static final String Name = "name"; public static final String WRITABLE_LOCATIONS = "writableLocations"; public static final String READABLE_LOCATIONS = "readableLocations"; + public static final String THINCLIENT_WRITABLE_LOCATIONS = "thinClientWritableLocations"; + public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations"; public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint"; //Authorization diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java index 5141c1ed0864..dc906c0c1004 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DatabaseAccount.java @@ -251,6 +251,24 @@ public void setReadableLocations(Iterable locations) { this.set(Constants.Properties.READABLE_LOCATIONS, locations); } + /** + * Gets the list of thin client readable locations for this database account. + * + * @return the list of thin client readable locations. + */ + public Iterable getThinClientReadableLocations() { + return super.getCollection(Constants.Properties.THINCLIENT_READABLE_LOCATIONS, DatabaseAccountLocation.class); + } + + /** + * Gets the list of thin client writable locations for this database account. + * + * @return the list of thin client writable locations. + */ + public Iterable getThinClientWritableLocations() { + return super.getCollection(Constants.Properties.THINCLIENT_WRITABLE_LOCATIONS, DatabaseAccountLocation.class); + } + /** * Gets if enable multiple write locations is set. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 6ce603cb983c..d8c4075c9b23 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -286,7 +286,7 @@ public static class HttpHeaders { // Thinclient headers public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type"; public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type"; - + public static final String THINCLIENT_OPT_IN = "x-ms-cosmos-use-thinclient"; public static final String GLOBAL_DATABASE_ACCOUNT_NAME = "GlobalDatabaseAccountName"; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index b59c27dc12fa..e980fb1e3388 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -211,6 +211,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private String firstResourceTokenFromPermissionFeed = StringUtils.EMPTY; private RxClientCollectionCache collectionCache; private RxGatewayStoreModel gatewayProxy; + private RxGatewayStoreModel thinProxy; private RxStoreModel storeModel; private GlobalAddressResolver addressResolver; private RxPartitionKeyRangeCache partitionKeyRangeCache; @@ -664,6 +665,14 @@ private void updateGatewayProxy() { (this.gatewayProxy).setSessionContainer(this.sessionContainer); } + private void updateThinProxy() { + (this.thinProxy).setGatewayServiceConfigurationReader(this.gatewayConfigurationReader); + (this.thinProxy).setCollectionCache(this.collectionCache); + (this.thinProxy).setPartitionKeyRangeCache(this.partitionKeyRangeCache); + (this.thinProxy).setUseMultipleWriteLocations(this.useMultipleWriteLocations); + (this.thinProxy).setSessionContainer(this.sessionContainer); + } + public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Function httpClientInterceptor) { try { @@ -680,6 +689,12 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func this.reactorHttpClient, this.apiType); + this.thinProxy = createThinProxy(this.sessionContainer, + this.consistencyLevel, + this.userAgentContainer, + this.globalEndpointManager, + this.reactorHttpClient); + this.globalEndpointManager.init(); DatabaseAccount databaseAccountSnapshot = this.initializeGatewayConfigurationReader(); @@ -707,6 +722,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func collectionCache); updateGatewayProxy(); + updateThinProxy(); clientTelemetry = new ClientTelemetry( this, null, @@ -821,6 +837,20 @@ RxGatewayStoreModel createRxGatewayProxy(ISessionContainer sessionContainer, apiType); } + ThinClientStoreModel createThinProxy(ISessionContainer sessionContainer, + ConsistencyLevel consistencyLevel, + UserAgentContainer userAgentContainer, + GlobalEndpointManager globalEndpointManager, + HttpClient httpClient) { + return new ThinClientStoreModel( + this, + sessionContainer, + consistencyLevel, + userAgentContainer, + globalEndpointManager, + httpClient); + } + private HttpClient httpClient() { HttpClientConfig httpClientConfig = new HttpClientConfig(this.configs) .withMaxIdleConnectionTimeout(this.connectionPolicy.getIdleHttpConnectionTimeout()) @@ -5690,6 +5720,10 @@ public Flux getDatabaseAccountFromEndpoint(URI endpoint) { return Flux.defer(() -> { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this, OperationType.Read, ResourceType.DatabaseAccount, "", null, (Object) null); + // if thin client enabled, populate thin client header so we can get thin client read and writeable locations + if (useThinClient()) { + request.getHeaders().put(HttpConstants.HttpHeaders.THINCLIENT_OPT_IN, "true"); + } return this.populateHeadersAsync(request, RequestVerb.GET) .flatMap(requestPopulated -> { @@ -5721,6 +5755,10 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) { return this.gatewayProxy; } + if (useThinClientStoreModel(request)) { + return this.thinProxy; + } + ResourceType resourceType = request.getResourceType(); OperationType operationType = request.getOperationType(); @@ -6746,6 +6784,16 @@ private void handleLocationCancellationExceptionForPartitionKeyRange(RxDocumentS } } + private boolean useThinClient() { + return Configs.isThinClientEnabled() && this.connectionPolicy.getConnectionMode() == ConnectionMode.GATEWAY; + } + + private boolean useThinClientStoreModel(RxDocumentServiceRequest request) { + return useThinClient() + && request.getResourceType() == ResourceType.Document + && request.getOperationType().isPointOperation(); + } + @FunctionalInterface private interface DocumentPointOperation { Mono> apply( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 1f9f2cf2aaa3..01b9ae92a731 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1185,9 +1185,11 @@ public void setEffectivePartitionKey(String effectivePartitionKey) { this.effectivePartitionKey = effectivePartitionKey; } - public void setThinclientHeaders(String operationType, String resourceType) { + public void setThinclientHeaders(String operationType, String resourceType, String globalDatabaseAccountName, String resourceId) { this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType); this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType); + this.headers.put(HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME, globalDatabaseAccountName); + this.headers.put(WFConstants.BackendHeaders.COLLECTION_RID, resourceId); } public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 94c6d4a1a224..f60c215888b8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -64,7 +64,7 @@ public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerialize private final Map defaultHeaders; private final HttpClient httpClient; private final QueryCompatibilityMode queryCompatibilityMode; - private final GlobalEndpointManager globalEndpointManager; + protected final GlobalEndpointManager globalEndpointManager; private ConsistencyLevel defaultConsistencyLevel; private ISessionContainer sessionContainer; private ThroughputControlStore throughputControlStore; @@ -315,6 +315,10 @@ private HttpHeaders getHttpRequestHeaders(Map headers) { return httpHeaders; } + public URI getRootUri(RxDocumentServiceRequest request) { + return this.globalEndpointManager.resolveServiceEndpoint(request).getGatewayRegionalEndpoint(); + } + private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException { URI rootUri = request.getEndpointOverride(); if (rootUri == null) { @@ -322,7 +326,7 @@ private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException { // For media read request, always use the write endpoint. rootUri = this.globalEndpointManager.getWriteEndpoints().get(0).getGatewayRegionalEndpoint(); } else { - rootUri = this.globalEndpointManager.resolveServiceEndpoint(request).getGatewayRegionalEndpoint(); + rootUri = getRootUri(request); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 6edfd95eab77..2e626c71d2d1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -3,8 +3,12 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdFramer; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponse; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; @@ -28,6 +32,8 @@ */ public class ThinClientStoreModel extends RxGatewayStoreModel { + private String globalDatabaseAccountName = null; + public ThinClientStoreModel( DiagnosticsClientContext clientContext, ISessionContainer sessionContainer, @@ -46,10 +52,6 @@ public ThinClientStoreModel( ApiType.SQL); } - public ThinClientStoreModel(ThinClientStoreModel inner) { - super(inner); - } - @Override public Mono processMessage(RxDocumentServiceRequest request) { return super.processMessage(request); @@ -76,36 +78,74 @@ protected Map getDefaultHeaders( } @Override - public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + public URI getRootUri(RxDocumentServiceRequest request) { + // need to have thin client endpoint here + return this.globalEndpointManager.resolveServiceEndpoint(request).getThinclientRegionalEndpoint(); + } + + @Override + public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) { + if (content == null || content.readableBytes() == 0) { + return super.unwrapToStoreResponse(request, statusCode, headers, Unpooled.EMPTY_BUFFER); + } + if (RntbdFramer.canDecodeHead(content)) { + + final RntbdResponse response = RntbdResponse.decode(content); + + if (response != null) { + return super.unwrapToStoreResponse( + request, + response.getStatus().code(), + new HttpHeaders(response.getHeaders().asMap(request.getActivityId())), + response.getContent() + ); + } + return super.unwrapToStoreResponse(request, statusCode, headers, null); + } + + throw new IllegalStateException("Invalid rntbd response"); + } + + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + if (this.globalDatabaseAccountName == null) { + this.globalDatabaseAccountName = this.globalEndpointManager.getLatestDatabaseAccount().getId(); + } // todo - neharao1 - validate b/w name() v/s toString() - request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name()); + request.setThinclientHeaders( + request.getOperationType().name(), + request.getResourceType().name(), + this.globalDatabaseAccountName, + request.getResourceId()); + + byte[] epk = request.getPartitionKeyInternal().getEffectivePartitionKeyBytes(request.getPartitionKeyInternal(), request.getPartitionKeyDefinition()); + if (request.properties == null) { + request.properties = new HashMap<>(); + } - // todo - neharao1: no concept of a replica / service endpoint that can be passed RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request); - // todo - neharao1: validate what HTTP headers are needed - for now have put default ThinClient HTTP headers - // todo - based on fabianm comment - thinClient also takes op type and resource type headers as HTTP headers HttpHeaders headers = this.getHttpHeaders(); + headers.set(HttpConstants.HttpHeaders.ACTIVITY_ID, request.getActivityId().toString()); RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs); + rntbdRequest.setHeaderValue(RntbdConstants.RntbdRequestHeader.EffectivePartitionKey, epk); - // todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer // todo: eventually need to use pooled buffer ByteBuf byteBuf = Unpooled.buffer(); - // todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept) - // todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format) - // todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy - // todo: need to conditionally add some headers (userAgent, replicaId/endpoint, etc) rntbdRequest.encode(byteBuf, true); + byte[] contentAsByteArray = new byte[byteBuf.writerIndex()]; + byteBuf.getBytes(0, contentAsByteArray, 0, byteBuf.writerIndex()); + return new HttpRequest( HttpMethod.POST, requestUri, requestUri.getPort(), headers, - Flux.just(byteBuf.array())); + Flux.just(contentAsByteArray)); } private HttpHeaders getHttpHeaders() { @@ -117,7 +157,6 @@ private HttpHeaders getHttpHeaders() { httpHeaders.set(header.getKey(), header.getValue()); } - // todo: add thin client resourcetype/operationtype headers return httpHeaders; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdFramer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdFramer.java index 79ef876c13ab..bb4f27ee6f1b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdFramer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdFramer.java @@ -9,12 +9,12 @@ import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; -final class RntbdFramer { +public final class RntbdFramer { private RntbdFramer() { } - static boolean canDecodeHead(final ByteBuf in) throws CorruptedFrameException { + public static boolean canDecodeHead(final ByteBuf in) throws CorruptedFrameException { checkNotNull(in, "in"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java index cd615736aebf..362a126c0f2f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java @@ -318,7 +318,7 @@ public RntbdResponse touch(final Object hint) { return this; } - static RntbdResponse decode(final ByteBuf in) { + public static RntbdResponse decode(final ByteBuf in) { final int start = in.markReaderIndex().readerIndex(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java index 641df31d8c4b..eb3658026ab9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseHeaders.java @@ -30,7 +30,7 @@ @SuppressWarnings("UnstableApiUsage") @JsonFilter("RntbdToken") -class RntbdResponseHeaders extends RntbdTokenStream { +public class RntbdResponseHeaders extends RntbdTokenStream { // region Fields @@ -225,6 +225,20 @@ public Map asMap(final String serverVersion, final UUID activity return builder.build(); } + public Map asMap(final UUID activityId) { + + final ImmutableMap.Builder builder = ImmutableMap.builderWithExpectedSize(this.computeCount(false) + 2); + builder.put(new Entry(HttpHeaders.ACTIVITY_ID, activityId.toString())); + + this.collectEntries((token, toEntry) -> { + if (token.isPresent()) { + builder.put(toEntry.apply(token)); + } + }); + + return builder.build(); + } + static RntbdResponseHeaders decode(final ByteBuf in) { final RntbdResponseHeaders headers = new RntbdResponseHeaders(in); RntbdTokenStream.decode(headers); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index ecab529cbd74..c912be751a7f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -167,13 +167,15 @@ public void onDatabaseAccountRead(DatabaseAccount databaseAccount) { this.updateLocationCache( databaseAccount.getWritableLocations(), databaseAccount.getReadableLocations(), + databaseAccount.getThinClientWritableLocations(), + databaseAccount.getThinClientReadableLocations(), null, BridgeInternal.isEnableMultipleWriteLocations(databaseAccount)); } void onLocationPreferenceChanged(UnmodifiableList preferredLocations) { this.updateLocationCache( - null, null , preferredLocations, null); + null, null, null, null, preferredLocations, null); } /** @@ -547,12 +549,14 @@ public LocationUnavailabilityInfo apply(RegionalRoutingContext url, LocationUnav } private void updateLocationCache(){ - updateLocationCache(null, null, null, null); + updateLocationCache(null, null, null, null, null, null); } private void updateLocationCache( Iterable gatewayWriteLocations, Iterable gatewayReadLocations, + Iterable thinClientWriteLocations, + Iterable thinClientReadLocations, UnmodifiableList preferenceList, Boolean enableMultipleWriteLocations) { synchronized (this.lockObject) { @@ -573,7 +577,7 @@ private void updateLocationCache( if (gatewayReadLocations != null) { Utils.ValueHolder> readValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.availableReadLocations); Utils.ValueHolder> readRegionMapValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByReadEndpoint); - nextLocationInfo.availableReadEndpointsByLocation = this.getEndpointsByLocation(gatewayReadLocations, readValueHolder, readRegionMapValueHolder); + nextLocationInfo.availableReadEndpointsByLocation = this.getEndpointsByLocation(gatewayReadLocations, thinClientReadLocations, readValueHolder, readRegionMapValueHolder); nextLocationInfo.availableReadLocations = readValueHolder.v; nextLocationInfo.regionNameByReadEndpoint = readRegionMapValueHolder.v; } @@ -581,7 +585,7 @@ private void updateLocationCache( if (gatewayWriteLocations != null) { Utils.ValueHolder> writeValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.availableWriteLocations); Utils.ValueHolder> outWriteRegionMap = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByWriteEndpoint); - nextLocationInfo.availableWriteEndpointsByLocation = this.getEndpointsByLocation(gatewayWriteLocations, writeValueHolder, outWriteRegionMap); + nextLocationInfo.availableWriteEndpointsByLocation = this.getEndpointsByLocation(gatewayWriteLocations, thinClientWriteLocations, writeValueHolder, outWriteRegionMap); nextLocationInfo.availableWriteLocations = writeValueHolder.v; nextLocationInfo.regionNameByWriteEndpoint = outWriteRegionMap.v; } @@ -679,6 +683,7 @@ private UnmodifiableList getPreferredAvailableEndpoints( private void addEndpoints( Iterable gatewayDbAccountLocations, + Iterable thinclientDbAccountLocations, Map endpointsByLocation, Map regionByEndpoint, List parsedLocations) { @@ -712,16 +717,36 @@ private void addEndpoints( } } } + + if (thinclientDbAccountLocations != null) { + for (DatabaseAccountLocation thinclientDbAccountLocation : thinclientDbAccountLocations) { + if (!Strings.isNullOrEmpty(thinclientDbAccountLocation.getName())) { + try { + String location = thinclientDbAccountLocation.getName().toLowerCase(Locale.ROOT); + URI endpoint = new URI(thinclientDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT)); + + RegionalRoutingContext regionalRoutingContext = endpointsByLocation.get(location); + regionalRoutingContext.setThinclientRegionalEndpoint(endpoint); + } catch (Exception e) { + logger.warn("Skipping add for location = [{}] and endpoint = [{}] due to exception [{}]", + thinclientDbAccountLocation.getName(), + thinclientDbAccountLocation.getEndpoint(), + e.getMessage()); + } + } + } + } } private UnmodifiableMap getEndpointsByLocation(Iterable gatewayLocations, + Iterable thinclientLocations, Utils.ValueHolder> orderedLocations, Utils.ValueHolder> regionMap) { Map endpointsByLocation = new CaseInsensitiveMap<>(); Map regionByEndpoint = new CaseInsensitiveMap<>(); List parsedLocations = new ArrayList<>(); - addEndpoints(gatewayLocations, endpointsByLocation, regionByEndpoint, parsedLocations); + addEndpoints(gatewayLocations, thinclientLocations, endpointsByLocation, regionByEndpoint, parsedLocations); orderedLocations.v = new UnmodifiableList<>(parsedLocations); regionMap.v = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(regionByEndpoint); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java index 6a796bb9866a..641637d5c81b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java @@ -13,26 +13,44 @@ public class RegionalRoutingContext { // when adding additional properties to this class private final URI gatewayRegionalEndpoint; private final String gatewayRegionalEndpointAsString; + private URI thinclientRegionalEndpoint; + private String thinclientRegionalEndpointAsString; public RegionalRoutingContext(URI gatewayRegionalEndpoint) { this.gatewayRegionalEndpoint = gatewayRegionalEndpoint; this.gatewayRegionalEndpointAsString = gatewayRegionalEndpoint.toString(); + this.thinclientRegionalEndpoint = null; + thinclientRegionalEndpointAsString = null; } public URI getGatewayRegionalEndpoint() { return this.gatewayRegionalEndpoint; } + public void setThinclientRegionalEndpoint(URI thinclientRegionalEndpoint) { + this.thinclientRegionalEndpoint = thinclientRegionalEndpoint; + this.thinclientRegionalEndpointAsString = thinclientRegionalEndpoint.toString(); + } + + public URI getThinclientRegionalEndpoint() { + return this.thinclientRegionalEndpoint; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RegionalRoutingContext that = (RegionalRoutingContext) o; - return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint); + if (this.thinclientRegionalEndpoint != null) { + return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint) && + this.thinclientRegionalEndpoint.equals(that.thinclientRegionalEndpoint); + } else { + return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint); + } } @Override public int hashCode() { - return Objects.hash(this.gatewayRegionalEndpointAsString); + return Objects.hash(this.gatewayRegionalEndpointAsString, this.thinclientRegionalEndpointAsString); } } diff --git a/sdk/cosmos/live-thinclient-platform-matrix.json b/sdk/cosmos/live-thinclient-platform-matrix.json new file mode 100644 index 000000000000..9edda8429f8d --- /dev/null +++ b/sdk/cosmos/live-thinclient-platform-matrix.json @@ -0,0 +1,24 @@ +{ + "displayNames": { + "-Pthinclient": "ThinClient", + "Session": "", + "ubuntu": "", + "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }": ""}, + "include": [ + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ArmConfig": { + "MultiMaster_MultiRegion": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session'; enableMultipleRegions = $true }", + "PREFERRED_LOCATIONS": "[\"East US 2\"]" + } + }, + "PROTOCOLS": "[\"Tcp\"]", + "ProfileFlag": [ "-Pthinclient" ], + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + } + ] +} diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index 66f760673e51..e810eb0a63e7 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -71,6 +71,40 @@ extends: - name: AdditionalArgs value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DCOSMOS.HTTP2_ENABLED=true' + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml + parameters: + TestName: 'Cosmos_Live_Test_ThinClient' + CloudConfig: + Public: + ServiceConnection: azure-sdk-tests-cosmos + MatrixConfigs: + - Name: Cosmos_live_test_thinclient + Path: sdk/cosmos/live-thinclient-platform-matrix.json + Selection: all + GenerateVMJobs: true + MatrixReplace: + - .*Version=1.21/1.17 + ServiceDirectory: cosmos + Artifacts: + - name: azure-cosmos + groupId: com.azure + safeName: azurecosmos + AdditionalModules: + - name: azure-cosmos-tests + groupId: com.azure + - name: azure-cosmos-benchmark + groupId: com.azure + TimeoutInMinutes: 210 + MaxParallel: 20 + PreSteps: + - template: /eng/pipelines/templates/steps/install-reporting-tools.yml + TestGoals: 'verify' + TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false' + TestResultsFiles: '**/junitreports/TEST-*.xml' + AdditionalVariables: + - name: AdditionalArgs + value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account) -DCOSMOS.THINCLIENT_ENDPOINT=$(thinclient-test-endpoint) -DCOSMOS.THINCLIENT_KEY=$(thinclient-test-key)' + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: TestName: 'Spring_Data_Cosmos_Integration'