Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public void afterClass() {

@Test(groups = { "fast" }, timeOut = TIMEOUT)
public void createItem() throws Exception {
// TODO @nehrao/@fabianm REMOVE BEFORE CHECK-IN
if (client.asyncClient().getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT ||
client.asyncClient().getEffectiveConsistencyLevel(OperationType.Read, null) != ConsistencyLevel.STRONG) {

throw new SkipException("Disabled for debugging");
}
InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString());
CosmosItemResponse<InternalObjectNode> itemResponse = container.createItem(properties);
assertThat(itemResponse.getRequestCharge()).isGreaterThan(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void validateApiType() throws Exception {
ResourceType.Document);

try {
storeModel.performRequest(dsr, HttpMethod.POST).block();
storeModel.performRequest(dsr).block();
fail("Request should fail");
} catch (Exception e) {
//no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.http.HttpTransportSerializer;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.routing.Range;
Expand All @@ -29,6 +30,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* This is core Transport/Connection agnostic request to the Azure Cosmos DB database service.
Expand Down Expand Up @@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable {

private volatile boolean hasFeedRangeFilteringBeenApplied = false;

private final AtomicReference<HttpTransportSerializer> httpTransportSerializer = new AtomicReference<>(null);

public boolean isReadOnlyRequest() {
return this.operationType.isReadOnlyOperation();
}
Expand Down Expand Up @@ -1238,4 +1244,23 @@ public void setThinclientHeaders(String operationType, String resourceType) {
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType);
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType);
}

public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) {
this.httpTransportSerializer.set(transportSerializer);

return this;
}

public HttpTransportSerializer getEffectiveHttpTransportSerializer(
HttpTransportSerializer defaultTransportSerializer) {

checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null.");

HttpTransportSerializer snapshot = this.httpTransportSerializer.get();
if (snapshot != null) {
return snapshot;
}

return defaultTransportSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader;
import com.azure.cosmos.implementation.directconnectivity.HttpUtils;
import com.azure.cosmos.implementation.directconnectivity.RequestHelper;
import com.azure.cosmos.implementation.directconnectivity.ResourceOperation;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
import com.azure.cosmos.implementation.faultinjection.GatewayServerErrorInjector;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.http.HttpTransportSerializer;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
Expand Down Expand Up @@ -47,14 +50,15 @@
import java.util.concurrent.Callable;

import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
*
* Used internally to provide functionality to communicate and process response from GATEWAY in the Azure Cosmos DB database service.
*/
public class RxGatewayStoreModel implements RxStoreModel {
public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerializer {
private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed();

private final DiagnosticsClientContext clientContext;
Expand Down Expand Up @@ -83,29 +87,12 @@ public RxGatewayStoreModel(
ApiType apiType) {

this.clientContext = clientContext;
this.defaultHeaders = new HashMap<>();
this.defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL,
"no-cache");
this.defaultHeaders.put(HttpConstants.HttpHeaders.VERSION,
HttpConstants.Versions.CURRENT_VERSION);
this.defaultHeaders.put(
HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES,
HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES);

if (apiType != null) {
this.defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString());
}

if (userAgentContainer == null) {
userAgentContainer = new UserAgentContainer();
}

this.defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());

if (defaultConsistencyLevel != null) {
this.defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL,
defaultConsistencyLevel.toString());
}
this.defaultHeaders = this.getDefaultHeaders(apiType, userAgentContainer, defaultConsistencyLevel);

this.defaultConsistencyLevel = defaultConsistencyLevel;
this.globalEndpointManager = globalEndpointManager;
Expand All @@ -126,6 +113,40 @@ public RxGatewayStoreModel(RxGatewayStoreModel inner) {
this.sessionContainer = inner.sessionContainer;
}

protected Map<String, String> getDefaultHeaders(
ApiType apiType,
UserAgentContainer userAgentContainer,
ConsistencyLevel clientDefaultConsistencyLevel) {

checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null.");

Map<String, String> defaultHeaders = new HashMap<>();
defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL,
"no-cache");
defaultHeaders.put(HttpConstants.HttpHeaders.VERSION,
HttpConstants.Versions.CURRENT_VERSION);
defaultHeaders.put(
HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES,
HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES);

if (apiType != null) {
defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString());
}

if (userAgentContainer == null) {
userAgentContainer = new UserAgentContainer();
}

defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());

if (clientDefaultConsistencyLevel != null) {
defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL,
clientDefaultConsistencyLevel.toString());
}

return defaultHeaders;
}

void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) {
this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader;
}
Expand Down Expand Up @@ -162,40 +183,46 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) {
this.collectionCache = collectionCache;
}

private Mono<RxDocumentServiceResponse> create(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.POST);
}

private Mono<RxDocumentServiceResponse> patch(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.PATCH);
}

private Mono<RxDocumentServiceResponse> upsert(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.POST);
}

private Mono<RxDocumentServiceResponse> read(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.GET);
}

private Mono<RxDocumentServiceResponse> replace(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.PUT);
}

private Mono<RxDocumentServiceResponse> delete(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.DELETE);
}
@Override
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception {
HttpMethod method = getHttpMethod(request);
HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());

private Mono<RxDocumentServiceResponse> deleteByPartitionKey(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.POST);
Flux<byte[]> contentAsByteArray = request.getContentAsByteArrayFlux();
return new HttpRequest(method,
requestUri,
requestUri.getPort(),
httpHeaders,
contentAsByteArray);
}

private Mono<RxDocumentServiceResponse> execute(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.POST);
}
@Override
public StoreResponse unwrapToStoreResponse(
RxDocumentServiceRequest request,
int statusCode,
HttpHeaders headers,
ByteBuf content) {

checkNotNull(headers, "Argument 'headers' must not be null.");
checkNotNull(
content,
"Argument 'content' must not be null - use empty ByteBuf when theres is no payload.");

// If there is any error in the header response this throws exception
validateOrThrow(request, HttpResponseStatus.valueOf(statusCode), headers, content);

int size;
if ((size = content.readableBytes()) > 0) {
return new StoreResponse(statusCode,
HttpUtils.unescape(headers.toMap()),
new ByteBufInputStream(content, true),
size);
}

private Mono<RxDocumentServiceResponse> readFeed(RxDocumentServiceRequest request) {
return this.performRequest(request, HttpMethod.GET);
return new StoreResponse(statusCode,
HttpUtils.unescape(headers.toMap()),
null,
0);
}

private Mono<RxDocumentServiceResponse> query(RxDocumentServiceRequest request) {
Expand All @@ -215,10 +242,10 @@ private Mono<RxDocumentServiceResponse> query(RxDocumentServiceRequest request)
RuntimeConstants.MediaTypes.QUERY_JSON);
break;
}
return this.performRequest(request, HttpMethod.POST);
return this.performRequest(request);
}

public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest request, HttpMethod method) {
public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest request) {
try {
if (request.requestContext.cosmosDiagnostics == null) {
request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics();
Expand All @@ -228,10 +255,10 @@ public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
request.requestContext.resourcePhysicalAddress = uri.toString();

if (this.throughputControlStore != null) {
return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, method, uri)));
return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri)));
}

return this.performRequestInternal(request, method, uri);
return this.performRequestInternal(request, uri);
} catch (Exception e) {
return Mono.error(e);
}
Expand All @@ -241,23 +268,21 @@ public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
* Given the request it creates an flux which upon subscription issues HTTP call and emits one RxDocumentServiceResponse.
*
* @param request
* @param method
* @param requestUri
* @return Flux<RxDocumentServiceResponse>
*/
public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceRequest request, HttpMethod method, URI requestUri) {
public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceRequest request, URI requestUri) {

try {

HttpMethod method = getHttpMethod(request);
HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());

Flux<byte[]> contentAsByteArray = request.getContentAsByteArrayFlux();

HttpRequest httpRequest = new HttpRequest(method,
requestUri,
requestUri.getPort(),
httpHeaders,
contentAsByteArray);
HttpRequest httpRequest = request
.getEffectiveHttpTransportSerializer(this)
.wrapInHttpRequest(request, requestUri);

Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest, request.getResponseTimeout());

Expand Down Expand Up @@ -371,23 +396,9 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
reactorNettyRequestRecord.setTimeCompleted(Instant.now());
}

// If there is any error in the header response this throws exception
validateOrThrow(request, HttpResponseStatus.valueOf(httpResponseStatus), httpResponseHeaders, content);

StoreResponse rsp;

int size;
if ((size = content.readableBytes()) > 0) {
rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap()),
new ByteBufInputStream(content, true),
size);
} else {
rsp = new StoreResponse(httpResponseStatus,
HttpUtils.unescape(httpResponseHeaders.toMap()),
null,
0);
}
StoreResponse rsp = request
.getEffectiveHttpTransportSerializer(this)
.unwrapToStoreResponse(request, httpResponseStatus, httpResponseHeaders, content);

if (reactorNettyRequestRecord != null) {
rsp.setRequestTimeline(reactorNettyRequestRecord.takeTimelineSnapshot());
Expand Down Expand Up @@ -518,28 +529,47 @@ private void validateOrThrow(RxDocumentServiceRequest request,
}
}

private Mono<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceRequest request) {
private static HttpMethod getHttpMethod(RxDocumentServiceRequest request) {
switch (request.getOperationType()) {
case Create:
case Batch:
return this.create(request);
case Patch:
return this.patch(request);
case Upsert:
return this.upsert(request);
case ExecuteJavaScript:
case SqlQuery:
case Query:
case QueryPlan:
return HttpMethod.POST;
case Patch:
return HttpMethod.PATCH;
case Delete:
if (request.getResourceType() == ResourceType.PartitionKey) {
return this.deleteByPartitionKey(request);
return HttpMethod.POST;
}
return this.delete(request);
return HttpMethod.DELETE;
case Read:
case ReadFeed:
return HttpMethod.GET;
case Replace:
return HttpMethod.PUT;
default:
throw new IllegalStateException(
"Operation type " + request.getOperationType() + " cannot be processed in RxGatewayStoreModel.");
}
}

private Mono<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceRequest request) {
switch (request.getOperationType()) {
case Create:
case Batch:
case Patch:
case Upsert:
case Delete:
case ExecuteJavaScript:
return this.execute(request);
case Read:
return this.read(request);
case ReadFeed:
return this.readFeed(request);
case Replace:
return this.replace(request);
return this.performRequest(request);

case SqlQuery:
case Query:
case QueryPlan:
Expand Down
Loading
Loading