diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java index bbc7e26a3104..1cc8c9676115 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java @@ -104,6 +104,12 @@ public void afterClass() { @Test(groups = { "fast" }, timeOut = TIMEOUT) public void createItem() throws Exception { + // TODO @nehrao/@fabianm REMOVE BEFORE CHECK-IN + if (client.asyncClient().getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT || + client.asyncClient().getEffectiveConsistencyLevel(OperationType.Read, null) != ConsistencyLevel.STRONG) { + + throw new SkipException("Disabled for debugging"); + } InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString()); CosmosItemResponse itemResponse = container.createItem(properties); assertThat(itemResponse.getRequestCharge()).isGreaterThan(0); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java index 0ea494c8dbc6..bd26bb53a0b5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java @@ -274,7 +274,7 @@ public void validateApiType() throws Exception { ResourceType.Document); try { - storeModel.performRequest(dsr, HttpMethod.POST).block(); + storeModel.performRequest(dsr).block(); fail("Request should fail"); } catch (Exception e) { //no-op diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 3cd7ec512b00..6e19d5770ef5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.directconnectivity.WFConstants; import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.implementation.routing.Range; @@ -29,6 +30,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * This is core Transport/Connection agnostic request to the Azure Cosmos DB database service. @@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable { private volatile boolean hasFeedRangeFilteringBeenApplied = false; + private final AtomicReference httpTransportSerializer = new AtomicReference<>(null); + public boolean isReadOnlyRequest() { return this.operationType.isReadOnlyOperation(); } @@ -1238,4 +1244,23 @@ public void setThinclientHeaders(String operationType, String resourceType) { this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType); this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType); } + + public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) { + this.httpTransportSerializer.set(transportSerializer); + + return this; + } + + public HttpTransportSerializer getEffectiveHttpTransportSerializer( + HttpTransportSerializer defaultTransportSerializer) { + + checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null."); + + HttpTransportSerializer snapshot = this.httpTransportSerializer.get(); + if (snapshot != null) { + return snapshot; + } + + return defaultTransportSerializer; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 4ff506b71165..31d8271276d5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -12,7 +12,9 @@ import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader; import com.azure.cosmos.implementation.directconnectivity.HttpUtils; import com.azure.cosmos.implementation.directconnectivity.RequestHelper; +import com.azure.cosmos.implementation.directconnectivity.ResourceOperation; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; import com.azure.cosmos.implementation.faultinjection.GatewayServerErrorInjector; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; @@ -20,6 +22,7 @@ import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; @@ -47,6 +50,7 @@ import java.util.concurrent.Callable; import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. @@ -54,7 +58,7 @@ * * Used internally to provide functionality to communicate and process response from GATEWAY in the Azure Cosmos DB database service. */ -public class RxGatewayStoreModel implements RxStoreModel { +public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerializer { private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed(); private final DiagnosticsClientContext clientContext; @@ -83,29 +87,12 @@ public RxGatewayStoreModel( ApiType apiType) { this.clientContext = clientContext; - this.defaultHeaders = new HashMap<>(); - this.defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, - "no-cache"); - this.defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, - HttpConstants.Versions.CURRENT_VERSION); - this.defaultHeaders.put( - HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, - HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); - - if (apiType != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); - } if (userAgentContainer == null) { userAgentContainer = new UserAgentContainer(); } - this.defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - - if (defaultConsistencyLevel != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, - defaultConsistencyLevel.toString()); - } + this.defaultHeaders = this.getDefaultHeaders(apiType, userAgentContainer, defaultConsistencyLevel); this.defaultConsistencyLevel = defaultConsistencyLevel; this.globalEndpointManager = globalEndpointManager; @@ -126,6 +113,40 @@ public RxGatewayStoreModel(RxGatewayStoreModel inner) { this.sessionContainer = inner.sessionContainer; } + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, + "no-cache"); + defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, + HttpConstants.Versions.CURRENT_VERSION); + defaultHeaders.put( + HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, + HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); + + if (apiType != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); + } + + if (userAgentContainer == null) { + userAgentContainer = new UserAgentContainer(); + } + + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + if (clientDefaultConsistencyLevel != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, + clientDefaultConsistencyLevel.toString()); + } + + return defaultHeaders; + } + void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) { this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader; } @@ -162,40 +183,46 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) { this.collectionCache = collectionCache; } - private Mono create(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono patch(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PATCH); - } - - private Mono upsert(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono read(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); - } - - private Mono replace(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PUT); - } - - private Mono delete(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.DELETE); - } + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + HttpMethod method = getHttpMethod(request); + HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - private Mono deleteByPartitionKey(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); + Flux contentAsByteArray = request.getContentAsByteArrayFlux(); + return new HttpRequest(method, + requestUri, + requestUri.getPort(), + httpHeaders, + contentAsByteArray); } - private Mono execute(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } + @Override + public StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content) { + + checkNotNull(headers, "Argument 'headers' must not be null."); + checkNotNull( + content, + "Argument 'content' must not be null - use empty ByteBuf when theres is no payload."); + + // If there is any error in the header response this throws exception + validateOrThrow(request, HttpResponseStatus.valueOf(statusCode), headers, content); + + int size; + if ((size = content.readableBytes()) > 0) { + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + new ByteBufInputStream(content, true), + size); + } - private Mono readFeed(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + null, + 0); } private Mono query(RxDocumentServiceRequest request) { @@ -215,10 +242,10 @@ private Mono query(RxDocumentServiceRequest request) RuntimeConstants.MediaTypes.QUERY_JSON); break; } - return this.performRequest(request, HttpMethod.POST); + return this.performRequest(request); } - public Mono performRequest(RxDocumentServiceRequest request, HttpMethod method) { + public Mono performRequest(RxDocumentServiceRequest request) { try { if (request.requestContext.cosmosDiagnostics == null) { request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); @@ -228,10 +255,10 @@ public Mono performRequest(RxDocumentServiceRequest r request.requestContext.resourcePhysicalAddress = uri.toString(); if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, method, uri))); + return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri))); } - return this.performRequestInternal(request, method, uri); + return this.performRequestInternal(request, uri); } catch (Exception e) { return Mono.error(e); } @@ -241,23 +268,21 @@ public Mono performRequest(RxDocumentServiceRequest r * Given the request it creates an flux which upon subscription issues HTTP call and emits one RxDocumentServiceResponse. * * @param request - * @param method * @param requestUri * @return Flux */ - public Mono performRequestInternal(RxDocumentServiceRequest request, HttpMethod method, URI requestUri) { + public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { try { + HttpMethod method = getHttpMethod(request); HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); Flux contentAsByteArray = request.getContentAsByteArrayFlux(); - HttpRequest httpRequest = new HttpRequest(method, - requestUri, - requestUri.getPort(), - httpHeaders, - contentAsByteArray); + HttpRequest httpRequest = request + .getEffectiveHttpTransportSerializer(this) + .wrapInHttpRequest(request, requestUri); Mono httpResponseMono = this.httpClient.send(httpRequest, request.getResponseTimeout()); @@ -371,23 +396,9 @@ private Mono toDocumentServiceResponse(Mono 0) { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - new ByteBufInputStream(content, true), - size); - } else { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - null, - 0); - } + StoreResponse rsp = request + .getEffectiveHttpTransportSerializer(this) + .unwrapToStoreResponse(request, httpResponseStatus, httpResponseHeaders, content); if (reactorNettyRequestRecord != null) { rsp.setRequestTimeline(reactorNettyRequestRecord.takeTimelineSnapshot()); @@ -518,28 +529,47 @@ private void validateOrThrow(RxDocumentServiceRequest request, } } - private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + private static HttpMethod getHttpMethod(RxDocumentServiceRequest request) { switch (request.getOperationType()) { case Create: case Batch: - return this.create(request); - case Patch: - return this.patch(request); case Upsert: - return this.upsert(request); + case ExecuteJavaScript: + case SqlQuery: + case Query: + case QueryPlan: + return HttpMethod.POST; + case Patch: + return HttpMethod.PATCH; case Delete: if (request.getResourceType() == ResourceType.PartitionKey) { - return this.deleteByPartitionKey(request); + return HttpMethod.POST; } - return this.delete(request); + return HttpMethod.DELETE; + case Read: + case ReadFeed: + return HttpMethod.GET; + case Replace: + return HttpMethod.PUT; + default: + throw new IllegalStateException( + "Operation type " + request.getOperationType() + " cannot be processed in RxGatewayStoreModel."); + } + } + + private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + switch (request.getOperationType()) { + case Create: + case Batch: + case Patch: + case Upsert: + case Delete: case ExecuteJavaScript: - return this.execute(request); case Read: - return this.read(request); case ReadFeed: - return this.readFeed(request); case Replace: - return this.replace(request); + return this.performRequest(request); + case SqlQuery: case Query: case QueryPlan: diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 4ab773509542..ad0bf54a5411 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -2,62 +2,78 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; +import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.implementation.directconnectivity.ProxyStoreClient; -import com.azure.cosmos.implementation.directconnectivity.StoreClient; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; +import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; import com.azure.cosmos.models.CosmosContainerIdentity; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. - * This is meant to be internally used only by our sdk. + * This is meant to be internally used only by our sdk. * * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. */ -public class ThinClientStoreModel implements RxStoreModel { - - private final ProxyStoreClient storeClient; +public class ThinClientStoreModel extends RxGatewayStoreModel { + + public ThinClientStoreModel( + DiagnosticsClientContext clientContext, + ISessionContainer sessionContainer, + ConsistencyLevel defaultConsistencyLevel, + UserAgentContainer userAgentContainer, + GlobalEndpointManager globalEndpointManager, + HttpClient httpClient) { + super( + clientContext, + sessionContainer, + defaultConsistencyLevel, + QueryCompatibilityMode.Default, + userAgentContainer, + globalEndpointManager, + httpClient, + ApiType.SQL); + } - public ThinClientStoreModel(ProxyStoreClient storeClient) { - this.storeClient = storeClient; + public ThinClientStoreModel(ThinClientStoreModel inner) { + super(inner); } @Override public Mono processMessage(RxDocumentServiceRequest request) { // direct/gateway mode validations? session token, bad consistency level header - // set headers here? .NET sets in client - request.setThinclientHeaders(request.getOperationType().toString(), request.getResourceType().toString()); - return this.storeClient.processMessageAsync(request); - } - - @Override - public void enableThroughputControl(ThroughputControlStore throughputControlStore) { - - } - - @Override - public Flux submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { - return null; + // TODO @nehrao/@fabianm FIX BEFORE CHECK-IN + // conditionally set RntbdTransportSerializer and physicalAddress here + // RntbdHttpTransportSerializer would need to create rntbdRequestArgs, then RntbdRequest from it and call encode + return super.processMessage(request); } @Override - public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) { - - } - - @Override - public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { - - } - - @Override - public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { - + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + // For ThinClient http/2 used for framing only + // All operation-level headers are only added to the rntbd-encoded message + // the thin client proxy wil parse the rntbd headers (not the content!) and substitute any + // missing headers for routing (like partitionId or replicaId) + // Since the Thin client proxy also needs to set the user-agent header to a different value + // it is not added to the rntbd headers - just http-headers in the SDK + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + return defaultHeaders; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java deleted file mode 100644 index 751d8363b8dd..000000000000 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ProxyStoreClient.java +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.cosmos.implementation.directconnectivity; - -import com.azure.cosmos.BridgeInternal; -import com.azure.cosmos.ConsistencyLevel; -import com.azure.cosmos.CosmosContainerProactiveInitConfig; -import com.azure.cosmos.CosmosException; -import com.azure.cosmos.SessionRetryOptions; -import com.azure.cosmos.implementation.BackoffRetryUtility; -import com.azure.cosmos.implementation.Configs; -import com.azure.cosmos.implementation.DiagnosticsClientContext; -import com.azure.cosmos.implementation.Exceptions; -import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.IAuthorizationTokenProvider; -import com.azure.cosmos.implementation.IRetryPolicy; -import com.azure.cosmos.implementation.ISessionContainer; -import com.azure.cosmos.implementation.ISessionToken; -import com.azure.cosmos.implementation.InternalServerErrorException; -import com.azure.cosmos.implementation.OperationType; -import com.azure.cosmos.implementation.RMResources; -import com.azure.cosmos.implementation.ResourceType; -import com.azure.cosmos.implementation.RxDocumentServiceRequest; -import com.azure.cosmos.implementation.RxDocumentServiceResponse; -import com.azure.cosmos.implementation.SessionTokenHelper; -import com.azure.cosmos.implementation.Strings; -import com.azure.cosmos.implementation.Utils; -import com.azure.cosmos.implementation.apachecommons.lang.math.NumberUtils; -import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; -import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore; -import com.azure.cosmos.models.CosmosContainerIdentity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.function.Function; - -/** - * Instantiated to issue direct connectivity requests to the backend on THINCLIENT for thinclient users. - * StoreClient uses the ReplicatedResourceClient to make requests to the backend. - */ -public class ProxyStoreClient implements IStoreClient { - private final DiagnosticsClientContext diagnosticsClientContext; - private final Logger logger = LoggerFactory.getLogger(ProxyStoreClient.class); - private final GatewayServiceConfigurationReader serviceConfigurationReader; - private final ISessionContainer sessionContainer; - private final ReplicatedResourceClient replicatedResourceClient; - private final TransportClient transportClient; - private final String ZERO_PARTITION_KEY_RANGE = "0"; - - public ProxyStoreClient( - DiagnosticsClientContext diagnosticsClientContext, - Configs configs, - IAddressResolver addressResolver, - ISessionContainer sessionContainer, - GatewayServiceConfigurationReader serviceConfigurationReader, IAuthorizationTokenProvider userTokenProvider, - TransportClient transportClient, - boolean useMultipleWriteLocations, - SessionRetryOptions sessionRetryOptions) { - this.diagnosticsClientContext = diagnosticsClientContext; - this.transportClient = transportClient; - this.sessionContainer = sessionContainer; - this.serviceConfigurationReader = serviceConfigurationReader; - this.replicatedResourceClient = new ReplicatedResourceClient( - diagnosticsClientContext, - configs, - new AddressSelector(addressResolver, configs.getProtocol()), - sessionContainer, - this.transportClient, - serviceConfigurationReader, - userTokenProvider, - useMultipleWriteLocations, - sessionRetryOptions); - - addressResolver.setOpenConnectionsProcessor(this.transportClient.getProactiveOpenConnectionsProcessor()); - } - - public void enableThroughputControl(ThroughputControlStore throughputControlStore) { - this.replicatedResourceClient.enableThroughputControl(throughputControlStore); - } - - private Mono InvokeClientAsync( - DocumentServiceRequest request, - ResourceType resourceType, - Uri physicalAddress, - CancellationToken cancellationToken) { - - } - - @Override - public Mono processMessageAsync(RxDocumentServiceRequest request, IRetryPolicy retryPolicy, Function> prepareRequestAsyncDelegate) { - if (request == null) { - throw new NullPointerException("request"); - } - - // HTTP2 transport - // serialize payload to RNTBD - - /*using (HttpResponseMessage responseMessage = await this.InvokeClientAsync(request, resourceOperation.resourceType, physicalAddress, default)) - { - return await HttpTransportClient.ProcessHttpResponse(request.ResourceAddress, string.Empty, responseMessage, physicalAddress, request); - }*/ - - Callable> storeResponseDelegate = () -> this.replicatedResourceClient.invokeAsync(request, prepareRequestAsyncDelegate); - - Mono storeResponse; - try { - storeResponse = retryPolicy != null - ? BackoffRetryUtility.executeRetry(storeResponseDelegate, retryPolicy) - : storeResponseDelegate.call(); - } catch (Exception e) { - return Mono.error(e); - } - - storeResponse = storeResponse.doOnError(e -> { - try { - Throwable unwrappedException = reactor.core.Exceptions.unwrap(e); - CosmosException exception = Utils.as(unwrappedException, CosmosException.class); - - if (exception == null) { - return; - } - - BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); - exception = BridgeInternal.setCosmosDiagnostics(exception, request.requestContext.cosmosDiagnostics); - - handleUnsuccessfulStoreResponse(request, exception); - } catch (Throwable throwable) { - logger.error("Unexpected failure in handling orig [{}]", e.getMessage(), e); - logger.error("Unexpected failure in handling orig [{}] : new [{}]", e.getMessage(), throwable.getMessage(), throwable); - if (throwable instanceof Error) { - throw (Error) throwable; - } - } - } - ); - - return storeResponse.flatMap(sr -> { - try { - return Mono.just(this.completeResponse(sr, request)); - } catch (Exception e) { - return Mono.error(e); - } - }); - } - - @Override - public Flux submitOpenConnectionTasksAndInitCaches( - CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { - return this.replicatedResourceClient.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig); - } - - public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider) { - this.replicatedResourceClient.configureFaultInjectorProvider(injectorProvider); - } - - public void recordOpenConnectionsAndInitCachesCompleted(List cosmosContainerIdentities) { - this.replicatedResourceClient.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities); - } - - public void recordOpenConnectionsAndInitCachesStarted(List cosmosContainerIdentities) { - this.replicatedResourceClient.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities); - } - - private void handleUnsuccessfulStoreResponse(RxDocumentServiceRequest request, CosmosException exception) { - this.updateResponseHeader(request, exception.getResponseHeaders()); - if ((!ReplicatedResourceClient.isMasterResource(request.getResourceType())) && - (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.PRECONDITION_FAILED) || Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.CONFLICT) || - (Exceptions.isStatusCode(exception, HttpConstants.StatusCodes.NOTFOUND) && - !Exceptions.isSubStatusCode(exception, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)))) { - this.captureSessionToken(request, exception.getResponseHeaders()); - } - } - - private RxDocumentServiceResponse completeResponse( - StoreResponse storeResponse, - RxDocumentServiceRequest request) throws InternalServerErrorException { - - if (storeResponse.getResponseHeaderNames().length != storeResponse.getResponseHeaderValues().length) { - throw new InternalServerErrorException( - Exceptions.getInternalServerErrorMessage(RMResources.InvalidBackendResponse), - HttpConstants.SubStatusCodes.INVALID_BACKEND_RESPONSE); - } - - Map headers = new HashMap<>(storeResponse.getResponseHeaderNames().length); - for (int idx = 0; idx < storeResponse.getResponseHeaderNames().length; idx++) { - String name = storeResponse.getResponseHeaderNames()[idx]; - String value = storeResponse.getResponseHeaderValues()[idx]; - - headers.put(name, value); - } - - this.updateResponseHeader(request, headers); - this.captureSessionToken(request, headers); - BridgeInternal.recordRetryContextEndTime(request.requestContext.cosmosDiagnostics); - RxDocumentServiceResponse rxDocumentServiceResponse = - new RxDocumentServiceResponse(this.diagnosticsClientContext, storeResponse); - rxDocumentServiceResponse.setCosmosDiagnostics(request.requestContext.cosmosDiagnostics); - - return rxDocumentServiceResponse; - } - - private long getLSN(Map headers) { - long defaultValue = -1; - String value = headers.get(WFConstants.BackendHeaders.LSN); - - if (!Strings.isNullOrEmpty(value)) { - return NumberUtils.toLong(value, defaultValue); - - } - - return defaultValue; - } - - private void updateResponseHeader(RxDocumentServiceRequest request, Map headers) { - String requestConsistencyLevel = request.getHeaders().get(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL); - - boolean sessionConsistency = - this.serviceConfigurationReader.getDefaultConsistencyLevel() == ConsistencyLevel.SESSION || - (!Strings.isNullOrEmpty(requestConsistencyLevel) - && Strings.areEqualIgnoreCase(requestConsistencyLevel, ConsistencyLevel.SESSION.toString())); - - long storeLSN = this.getLSN(headers); - if (storeLSN == -1) { - return; - } - - String partitionKeyRangeId = headers.get(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); - - if (Strings.isNullOrEmpty(partitionKeyRangeId)) { - String inputSession = request.getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(inputSession) - && inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR) >= 1) { - partitionKeyRangeId = inputSession.substring(0, - inputSession.indexOf(ISessionToken.PARTITION_KEY_RANGE_SESSION_SEPARATOR)); - } else { - partitionKeyRangeId = ZERO_PARTITION_KEY_RANGE; - } - } - - ISessionToken sessionToken = null; - String sessionTokenResponseHeader = headers.get(HttpConstants.HttpHeaders.SESSION_TOKEN); - if (!Strings.isNullOrEmpty(sessionTokenResponseHeader)) { - sessionToken = SessionTokenHelper.parse(sessionTokenResponseHeader); - } - - if (sessionToken != null) { - headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, - SessionTokenHelper.concatPartitionKeyRangeIdWithSessionToken(partitionKeyRangeId, sessionToken.convertToString())); - } - - headers.remove(WFConstants.BackendHeaders.PARTITION_KEY_RANGE_ID); - } - - private void captureSessionToken(RxDocumentServiceRequest request, Map headers) { - if (request.getResourceType() == ResourceType.DocumentCollection - && request.getOperationType() == OperationType.Delete) { - String resourceId; - if (request.getIsNameBased()) { - resourceId = headers.get(HttpConstants.HttpHeaders.OWNER_ID); - } else { - resourceId = request.getResourceId(); - } - this.sessionContainer.clearTokenByResourceId(resourceId); - } else { - this.sessionContainer.setSessionToken(request, headers); - } - } - - // TODO RNTBD support - // https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/262496 -} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java new file mode 100644 index 000000000000..975ae4d4ca77 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java @@ -0,0 +1,17 @@ +package com.azure.cosmos.implementation.http; + +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import io.netty.buffer.ByteBuf; + +import java.net.URI; + +public interface HttpTransportSerializer { + HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception; + + StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content); +}