Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceR

try {

// todo: neharao1 - see if the below three statements can be removed since these are part of wrapInHttpRequest
HttpMethod method = getHttpMethod(request);
HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());

Flux<byte[]> contentAsByteArray = request.getContentAsByteArrayFlux();

HttpRequest httpRequest = request
Expand Down Expand Up @@ -654,6 +654,10 @@ public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdenti
//no-op
}

public Map<String, String> getDefaultHeaders() {
return this.defaultHeaders;
}

private void captureSessionToken(RxDocumentServiceRequest request, Map<String, String> responseHeaders) {
if (request.getResourceType() == ResourceType.DocumentCollection &&
request.getOperationType() == OperationType.Delete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,26 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosContainerProactiveInitConfig;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import com.azure.cosmos.models.CosmosContainerIdentity;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* While this class is public, but it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
* This is meant to be internally used only by our sdk.
*
* Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service.
*/
Expand Down Expand Up @@ -76,4 +79,53 @@ protected Map<String, String> getDefaultHeaders(

return defaultHeaders;
}

@Override
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception {

// todo - neharao1 - validate b/w name() v/s toString()
request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name());

// todo - neharao1: no concept of a replica / service endpoint that can be passed
RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request);

// todo - neharao1: validate what HTTP headers are needed - for now have put default ThinClient HTTP headers
// todo - based on fabianm comment - thinClient also takes op type and resource type headers as HTTP headers
HttpHeaders headers = this.getHttpHeaders();

RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs);

// todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer
ByteBuf byteBuf = Unpooled.buffer();

// todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept)
// todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format)
// todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy
rntbdRequest.encode(byteBuf);

return new HttpRequest(
// todo: HttpMethod when using ThinClient is presumably always an HttpMethod.POST - validate this
HttpMethod.POST,
requestUri,
requestUri.getPort(),
headers,
Flux.just(byteBuf.array()));
}

// todo: neharao1 - validate if RxGatewayStoreModel#unwrapToStoreResponse can be reused
// @Override
// public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) {
// return null;
// }

private HttpHeaders getHttpHeaders() {
HttpHeaders httpHeaders = new HttpHeaders();
Map<String, String> defaultHeaders = this.getDefaultHeaders();

for (Map.Entry<String, String> header : defaultHeaders.entrySet()) {
httpHeaders.set(header.getKey(), header.getValue());
}

return httpHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static RntbdRequest decode(final ByteBuf in) {
return new RntbdRequest(header, metadata, payload);
}

void encode(final ByteBuf out) {
public void encode(final ByteBuf out) {

final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength();
final int start = out.writerIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest, final Uri
this.transportRequestId = instanceCount.incrementAndGet();
}

public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest) {
this(serviceRequest, null);
}

// region Accessors

@JsonProperty
Expand Down
Loading