Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
540a16d
Initial changes
aayush3011 Apr 24, 2024
6f49c75
Initial changes
aayush3011 Apr 25, 2024
97509eb
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 Apr 25, 2024
528a0eb
[Cosmos][VectorIndex]Adding changes for vectorIndex and vectorEmbeddi…
aayush3011 May 2, 2024
86b36d3
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 May 3, 2024
a979c11
Initial changes
aayush3011 May 3, 2024
e2756a5
Initial changes
aayush3011 May 3, 2024
8be2277
Initial changes
aayush3011 May 3, 2024
e491b9d
Resolving comments
aayush3011 May 7, 2024
151bb50
Fixing build issues
aayush3011 May 7, 2024
73afd5b
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 8, 2024
148cba5
[Cosmos][VectorSearch] Non Streaming Order By Query (#40085)
aayush3011 May 8, 2024
3b0d751
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 9, 2024
87572f7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
df7e838
[Cosmos][VectorSearch] Non Streaming Order By Query (#40096)
aayush3011 May 9, 2024
179f904
Initial changes
aayush3011 May 9, 2024
5602e33
Merge branch 'users/akataria/nonStreamingOrderBy' of github.com:aayus…
aayush3011 May 9, 2024
36ab9b7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
70639b5
Initial changes
aayush3011 May 10, 2024
c45c3a5
Fixes
aayush3011 May 10, 2024
6c255ee
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 10, 2024
9d427e6
Users/akataria/vectorindexing (#40117)
aayush3011 May 10, 2024
1cadb1b
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 10, 2024
0f1be0c
Users/akataria/non streaming order by (#40118)
aayush3011 May 10, 2024
d4dcad2
Fixing some merge issues
aayush3011 May 10, 2024
cdaa5bc
Fixing some merge issues
aayush3011 May 10, 2024
dfa8b64
Fixing some merge issues
aayush3011 May 10, 2024
7549cbe
Resolving comments
aayush3011 May 14, 2024
c6e2376
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Initial changes
  • Loading branch information
aayush3011 committed Apr 24, 2024
commit 540a16de033a41c6b43a44adbc2555598d3edfa7
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

public class NonStreamingOrderByDocumentProducer extends DocumentProducer<Document>{
private final OrderbyRowComparer<Document> consumeComparer;

NonStreamingOrderByDocumentProducer(
OrderbyRowComparer<Document> consumeComparer,
IDocumentQueryClient client,
String collectionResourceId,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<Document>>> executeRequestFunc,
FeedRangeEpkImpl feedRange,
String collectionLink,
Supplier<DocumentClientRetryPolicy> createRetryPolicyFunc,
Class<Document> resourceType,
UUID correlatedActivityId,
int initialPageSize,
String initialContinuationToken,
int top,
Supplier<String> operationContextTextProvider) {
super(client, collectionResourceId, cosmosQueryRequestOptions, createRequestFunc, executeRequestFunc,
collectionLink, createRetryPolicyFunc, resourceType, correlatedActivityId, initialPageSize,
initialContinuationToken, top, feedRange, operationContextTextProvider);
this.consumeComparer = consumeComparer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class NonStreamingOrderByDocumentQueryExecutionContext
extends ParallelDocumentQueryExecutionContextBase<Document> {

private final static
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor =
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();

private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor =
ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor();

private final static String FormatPlaceHolder = "{documentdb-formattableorderbyquery-filter}";
private final static String True = "true";

private final OrderbyRowComparer<Document> consumeComparer;
private final RequestChargeTracker tracker;
private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;

private int maxPageSizePerPartition;

public NonStreamingOrderByDocumentQueryExecutionContext(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
ResourceType resourceTypeEnum,
SqlQuerySpec query,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
String resourceLink,
String rewrittenQuery,
OrderbyRowComparer<Document> consumeComparer,
UUID correlatedActivityId,
boolean hasSelectValue,
final AtomicBoolean isQueryCancelledOnTimeout,
int maxPageSizePerPartition) {
super(diagnosticsClientContext, client, resourceTypeEnum, Document.class, query, cosmosQueryRequestOptions,
resourceLink, rewrittenQuery, correlatedActivityId, hasSelectValue, isQueryCancelledOnTimeout);
this.consumeComparer = consumeComparer;
this.tracker = new RequestChargeTracker();
this.queryMetricMap = new ConcurrentHashMap<>();
this.clientSideRequestStatistics = ConcurrentHashMap.newKeySet();
this.maxPageSizePerPartition = maxPageSizePerPartition;
}

public static Flux<IDocumentQueryExecutionComponent<Document>> createAsync(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
PipelinedDocumentQueryParams<Document> initParams,
DocumentCollection collection,
int maxPageSizePerPartition) {

QueryInfo queryInfo = initParams.getQueryInfo();

NonStreamingOrderByDocumentQueryExecutionContext context = new NonStreamingOrderByDocumentQueryExecutionContext(
diagnosticsClientContext,
client,
initParams.getResourceTypeEnum(),
initParams.getQuery(),
initParams.getCosmosQueryRequestOptions(),
initParams.getResourceLink(),
initParams.getQueryInfo().getRewrittenQuery(),
new OrderbyRowComparer<>(queryInfo.getOrderBy()),
initParams.getCorrelatedActivityId(),
queryInfo.hasSelectValue(),
initParams.isQueryCancelledOnTimeout(),
maxPageSizePerPartition);

context.setTop(initParams.getTop());

try {
context.initialize(
initParams.getFeedRanges(),
initParams.getQueryInfo().getOrderBy(),
initParams.getQueryInfo().getOrderByExpressions(),
initParams.getInitialPageSize(),
collection);

return Flux.just(context);
} catch (CosmosException dce) {
return Flux.error(dce);
}
}

private void initialize(
List<FeedRangeEpkImpl> feedRanges, List<SortOrder> sortOrders,
Collection<String> orderByExpressions,
int initialPageSize,
DocumentCollection collection) throws CosmosException {
// Since the continuation token will always be null,
// we don't need to handle any initialization based on continuationToken.
// We can directly initialize without any consideration for continuationToken.
super.initialize(collection,
feedRanges.stream().collect(Collectors.toMap(
feedRangeEpk -> feedRangeEpk,
feedRangeEpk -> null)),
initialPageSize,
new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, True),
querySpec.getParameters()));
}

@Override
protected NonStreamingOrderByDocumentProducer createDocumentProducer(
String collectionRid,
String continuationToken,
int initialPageSize,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
SqlQuerySpec querySpecForInit,
Map<String, String> commonRequestHeaders,
TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc,
Function<RxDocumentServiceRequest, Mono<FeedResponse<Document>>> executeFunc,
Supplier<DocumentClientRetryPolicy> createRetryPolicyFunc,
FeedRangeEpkImpl feedRange) {
return new NonStreamingOrderByDocumentProducer(
consumeComparer,
client,
collectionRid,
cosmosQueryRequestOptions,
createRequestFunc,
executeFunc,
feedRange,
collectionRid,
createRetryPolicyFunc,
Document.class,
correlatedActivityId,
maxPageSizePerPartition,
continuationToken,
top,
this.getOperationContextTextProvider());
}

@Override
public Flux<FeedResponse<Document>> drainAsync(int maxPageSize) {
int adjustedMaxPageSize = Math.min(maxPageSize, maxPageSizePerPartition);
maxPageSizePerPartition -= adjustedMaxPageSize;
return Flux.defer(() -> {
List<Flux<OrderByRowResult<Document>>> orderedFluxes = new ArrayList<>();
for (DocumentProducer<Document> producer : documentProducers) {
Flux<OrderByRowResult<Document>> orderedFlux = producer.produceAsync()
.transform(new PageToItemTransformer(tracker, queryMetricMap,
consumeComparer.getSortOrders(), adjustedMaxPageSize));
orderedFluxes.add(orderedFlux);
}
return Flux.mergeOrdered(consumeComparer, orderedFluxes.toArray(new Flux[0]))
.transformDeferred(new ItemToPageTransformer(tracker, maxPageSize, queryMetricMap,
this.clientSideRequestStatistics));
});
}

@Override
public Flux<FeedResponse<Document>> executeAsync() {
return drainAsync(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions));
}

private static class PageToItemTransformer implements
Function<Flux<DocumentProducer<Document>.DocumentProducerFeedResponse>, Flux<OrderByRowResult<Document>>> {
private final RequestChargeTracker tracker;
private final Map<String, QueryMetrics> queryMetricsMap;
private final List<SortOrder> sortOrders;
private final int maxPageSize;

public PageToItemTransformer(
RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap,
List<SortOrder> sortOrders, int maxPageSize) {
this.tracker = tracker;
this.queryMetricsMap = queryMetricsMap;
this.sortOrders = sortOrders;
this.maxPageSize = maxPageSize;
}

@Override
public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer<Document>.DocumentProducerFeedResponse> source) {
return source.flatMap(documentProducerFeedResponse -> {
QueryMetrics.mergeQueryMetricsMap(queryMetricsMap,
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult));
List<Document> results = documentProducerFeedResponse.pageResult.getResults();
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
int pageSize = Math.min(maxPageSize, results.size());
return Flux.fromIterable(results.subList(0, pageSize))
.map(r -> new OrderByRowResult<Document>(
ModelBridgeInternal.toJsonFromJsonSerializable(r),
documentProducerFeedResponse.sourceFeedRange,
null)); // Continuation token is always null
}, 1);
}
}

private static class ItemToPageTransformer implements
Function<Flux<OrderByRowResult<Document>>, Flux<FeedResponse<Document>>> {
private final static int DEFAULT_PAGE_SIZE = 100;
private final RequestChargeTracker tracker;
private final int maxPageSize;
private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;

public ItemToPageTransformer(RequestChargeTracker tracker,
int maxPageSize,
ConcurrentMap<String, QueryMetrics> queryMetricsMap,
Collection<ClientSideRequestStatistics> clientSideRequestStatistics) {
this.tracker = tracker;
this.maxPageSize = maxPageSize > 0 ? maxPageSize : DEFAULT_PAGE_SIZE;
this.queryMetricMap = queryMetricsMap;
this.clientSideRequestStatistics = clientSideRequestStatistics;
}

private static Map<String, String> headerResponse(double requestCharge) {
return Utils.immutableMapOf(HttpConstants.HttpHeaders.REQUEST_CHARGE, String.valueOf(requestCharge));
}

@Override
public Flux<FeedResponse<Document>> apply(Flux<OrderByRowResult<Document>> source) {
return source
.window(maxPageSize).map(Flux::collectList)
.flatMap(resultListObs -> resultListObs, 1)
.map(orderByRowResults -> {
// construct a page from result with request charge
FeedResponse<OrderByRowResult<Document>> feedResponse = feedResponseAccessor.createFeedResponse(
orderByRowResults,
headerResponse(tracker.getAndResetCharge()),
null);
if (!queryMetricMap.isEmpty()) {
for (Map.Entry<String, QueryMetrics> entry : queryMetricMap.entrySet()) {
BridgeInternal.putQueryMetricsIntoMap(feedResponse,
entry.getKey(),
entry.getValue());
}
}
return feedResponse;
})
.concatWith(Flux.defer(() -> {
return Flux.just(feedResponseAccessor.createFeedResponse(Utils.immutableListOf(),
null, null));
}))
.map(feedOfOrderByRowResults -> {
List<Document> unwrappedResults = new ArrayList<>();
for (OrderByRowResult<Document> orderByRowResult : feedOfOrderByRowResults.getResults()) {
unwrappedResults.add(orderByRowResult.getPayload());
}
FeedResponse<Document> feedResponse = BridgeInternal.createFeedResponseWithQueryMetrics(unwrappedResults,
feedOfOrderByRowResults.getResponseHeaders(),
BridgeInternal.queryMetricsFromFeedResponse(feedOfOrderByRowResults),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(feedOfOrderByRowResults),
false,
false, feedOfOrderByRowResults.getCosmosDiagnostics());
diagnosticsAccessor.addClientSideDiagnosticsToFeed(
feedResponse.getCosmosDiagnostics(), clientSideRequestStatistics);
return feedResponse;
}).switchIfEmpty(Flux.defer(() -> {
// create an empty page if there is no result
FeedResponse<Document> frp = BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(),
headerResponse(
tracker.getAndResetCharge()),
queryMetricMap,
null,
false,
false,
null);
diagnosticsAccessor.addClientSideDiagnosticsToFeed(
frp.getCosmosDiagnostics(), clientSideRequestStatistics);
return Flux.just(frp);
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public static <T> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
|| queryInfo.hasAggregates()
|| queryInfo.hasGroupBy()
|| queryInfo.hasDCount()
|| queryInfo.hasDistinct()),
|| queryInfo.hasDistinct()
|| queryInfo.hasNonStreamingOrderBy()),
initParams.isQueryCancelledOnTimeout());
context.setTop(initParams.getTop());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ private static BiFunction<String, PipelinedDocumentQueryParams<Document>, Flux<I
createBaseComponentFunction = (continuationToken, documentQueryParams) -> {
CosmosQueryRequestOptions orderByCosmosQueryRequestOptions =
qryOptAccessor.clone(requestOptions);
ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken);
qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setItemFactoryMethod(null);

documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions);

return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection);
if (queryInfo.hasNonStreamingOrderBy()) {
qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setItemFactoryMethod(null);
documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions);
return NonStreamingOrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection, 1000);
} else {
ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken);
qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setItemFactoryMethod(null);
documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions);
return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection);
}
};
} else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Flux<FeedResponse<T>> executeAsync() {
}

private static QueryInfo validateQueryInfo(QueryInfo queryInfo) {
if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy()) {
if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasNonStreamingOrderBy()) {
// Any query with order by, aggregates or group by needs to go through the Document query pipeline
throw new IllegalStateException("This query must not use the simple query pipeline.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static <T> Flux<PipelinedQueryExecutionContextBase<T>> createAsync(
|| queryInfo.hasAggregates()
|| queryInfo.hasGroupBy()
|| queryInfo.hasDCount()
|| queryInfo.hasDistinct()) {
|| queryInfo.hasDistinct()
|| queryInfo.hasNonStreamingOrderBy()) {

return PipelinedDocumentQueryExecutionContext.createAsyncCore(
diagnosticsClientContext,
Expand Down
Loading