Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
540a16d
Initial changes
aayush3011 Apr 24, 2024
6f49c75
Initial changes
aayush3011 Apr 25, 2024
97509eb
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 Apr 25, 2024
528a0eb
[Cosmos][VectorIndex]Adding changes for vectorIndex and vectorEmbeddi…
aayush3011 May 2, 2024
86b36d3
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 May 3, 2024
a979c11
Initial changes
aayush3011 May 3, 2024
e2756a5
Initial changes
aayush3011 May 3, 2024
8be2277
Initial changes
aayush3011 May 3, 2024
e491b9d
Resolving comments
aayush3011 May 7, 2024
151bb50
Fixing build issues
aayush3011 May 7, 2024
73afd5b
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 8, 2024
148cba5
[Cosmos][VectorSearch] Non Streaming Order By Query (#40085)
aayush3011 May 8, 2024
3b0d751
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 9, 2024
87572f7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
df7e838
[Cosmos][VectorSearch] Non Streaming Order By Query (#40096)
aayush3011 May 9, 2024
179f904
Initial changes
aayush3011 May 9, 2024
5602e33
Merge branch 'users/akataria/nonStreamingOrderBy' of github.com:aayus…
aayush3011 May 9, 2024
36ab9b7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
70639b5
Initial changes
aayush3011 May 10, 2024
c45c3a5
Fixes
aayush3011 May 10, 2024
6c255ee
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 10, 2024
1cadb1b
Merge branch 'feature/VectorSearch' into users/akataria/nonStreamingO…
aayush3011 May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Initial changes
  • Loading branch information
aayush3011 committed Apr 25, 2024
commit 6f49c75162b5aab4b0cd5d39a8f05859f908d33d
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class CosmosQueryRequestOptionsImpl extends CosmosQueryRequestOptio
private boolean queryPlanRetrievalDisallowed;
private boolean emptyPageDiagnosticsEnabled;
private String queryName;
private Integer maxSizePerPartition;
private List<CosmosDiagnostics> cancelledRequestDiagnosticsTracker = new ArrayList<>();

/**
Expand Down Expand Up @@ -62,6 +63,7 @@ public CosmosQueryRequestOptionsImpl(CosmosQueryRequestOptionsImpl options) {
this.queryName = options.queryName;
this.feedRange = options.feedRange;
this.cancelledRequestDiagnosticsTracker = options.cancelledRequestDiagnosticsTracker;
this.maxSizePerPartition = options.maxSizePerPartition;
}

/**
Expand Down Expand Up @@ -196,6 +198,26 @@ public CosmosQueryRequestOptionsImpl setMaxItemCount(Integer maxItemCount) {
return this;
}

/**
* Gets the maximum size to fetch from a partition during non-streaming order by queries
*
* @return the max size per partition
*/
public Integer getMaxSizePerPartition() {
return maxSizePerPartition;
}

/**
* Sets the maximum size to fetch from a partition during non-streaming order by queries
*
* @param maxSizePerPartition the max size per partition
* @return the CosmosQueryRequestOptions.
*/
public CosmosQueryRequestOptionsImpl setMaxSizePerPartition(Integer maxSizePerPartition) {
this.maxSizePerPartition = maxSizePerPartition;
return this;
}

/**
* Gets the request continuation token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

public class NonStreamingOrderByDocumentProducer extends DocumentProducer<Document>{
public class NonStreamingOrderByDocumentProducer extends DocumentProducer<Document> {
private final OrderbyRowComparer<Document> consumeComparer;

NonStreamingOrderByDocumentProducer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class NonStreamingOrderByDocumentQueryExecutionContext
private final RequestChargeTracker tracker;
private final ConcurrentMap<String, QueryMetrics> queryMetricMap;
private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;
private Flux<OrderByRowResult<Document>> orderByObservable;

private int maxPageSizePerPartition;

Expand Down Expand Up @@ -112,7 +113,8 @@ public static Flux<IDocumentQueryExecutionComponent<Document>> createAsync(
initParams.getQueryInfo().getOrderBy(),
initParams.getQueryInfo().getOrderByExpressions(),
initParams.getInitialPageSize(),
collection);
collection,
ModelBridgeInternal.getMaxSizePerPartitionFromQueryRequestOptions(initParams.getCosmosQueryRequestOptions()));

return Flux.just(context);
} catch (CosmosException dce) {
Expand All @@ -124,7 +126,8 @@ private void initialize(
List<FeedRangeEpkImpl> feedRanges, List<SortOrder> sortOrders,
Collection<String> orderByExpressions,
int initialPageSize,
DocumentCollection collection) throws CosmosException {
DocumentCollection collection,
int maxSizePerPartition) throws CosmosException {
// Since the continuation token will always be null,
// we don't need to handle any initialization based on continuationToken.
// We can directly initialize without any consideration for continuationToken.
Expand All @@ -135,6 +138,14 @@ private void initialize(
initialPageSize,
new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, True),
querySpec.getParameters()));

orderByObservable = NonStreamingOrderByUtils.nonStreamingOrderedMerge(
consumeComparer,
tracker,
documentProducers,
queryMetricMap,
maxSizePerPartition,
clientSideRequestStatistics);
}

@Override
Expand Down Expand Up @@ -169,60 +180,17 @@ protected NonStreamingOrderByDocumentProducer createDocumentProducer(

@Override
public Flux<FeedResponse<Document>> drainAsync(int maxPageSize) {
int adjustedMaxPageSize = Math.min(maxPageSize, maxPageSizePerPartition);
maxPageSizePerPartition -= adjustedMaxPageSize;
return Flux.defer(() -> {
List<Flux<OrderByRowResult<Document>>> orderedFluxes = new ArrayList<>();
for (DocumentProducer<Document> producer : documentProducers) {
Flux<OrderByRowResult<Document>> orderedFlux = producer.produceAsync()
.transform(new PageToItemTransformer(tracker, queryMetricMap,
consumeComparer.getSortOrders(), adjustedMaxPageSize));
orderedFluxes.add(orderedFlux);
}
return Flux.mergeOrdered(consumeComparer, orderedFluxes.toArray(new Flux[0]))
.transformDeferred(new ItemToPageTransformer(tracker, maxPageSize, queryMetricMap,
this.clientSideRequestStatistics));
});
return this.orderByObservable.transformDeferred(new ItemToPageTransformer(tracker,
maxPageSize,
this.queryMetricMap,
this.clientSideRequestStatistics));
}

@Override
public Flux<FeedResponse<Document>> executeAsync() {
return drainAsync(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions));
}

private static class PageToItemTransformer implements
Function<Flux<DocumentProducer<Document>.DocumentProducerFeedResponse>, Flux<OrderByRowResult<Document>>> {
private final RequestChargeTracker tracker;
private final Map<String, QueryMetrics> queryMetricsMap;
private final List<SortOrder> sortOrders;
private final int maxPageSize;

public PageToItemTransformer(
RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap,
List<SortOrder> sortOrders, int maxPageSize) {
this.tracker = tracker;
this.queryMetricsMap = queryMetricsMap;
this.sortOrders = sortOrders;
this.maxPageSize = maxPageSize;
}

@Override
public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer<Document>.DocumentProducerFeedResponse> source) {
return source.flatMap(documentProducerFeedResponse -> {
QueryMetrics.mergeQueryMetricsMap(queryMetricsMap,
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult));
List<Document> results = documentProducerFeedResponse.pageResult.getResults();
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
int pageSize = Math.min(maxPageSize, results.size());
return Flux.fromIterable(results.subList(0, pageSize))
.map(r -> new OrderByRowResult<Document>(
ModelBridgeInternal.toJsonFromJsonSerializable(r),
documentProducerFeedResponse.sourceFeedRange,
null)); // Continuation token is always null
}, 1);
}
}

private static class ItemToPageTransformer implements
Function<Flux<OrderByRowResult<Document>>, Flux<FeedResponse<Document>>> {
private final static int DEFAULT_PAGE_SIZE = 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Flux;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class NonStreamingOrderByUtils {
private final static
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor =
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();

public static <T extends Resource> Flux<OrderByRowResult<Document>> nonStreamingOrderedMerge(OrderbyRowComparer<Document> consumeComparer,
RequestChargeTracker tracker,
List<DocumentProducer<Document>> documentProducers,
Map<String, QueryMetrics> queryMetricsMap,
int maxSizePerPartition,
Collection<ClientSideRequestStatistics> clientSideRequestStatistics) {
@SuppressWarnings("unchecked")
Flux<OrderByRowResult<Document>>[] fluxes = documentProducers
.subList(0, documentProducers.size())
.stream()
.map(producer ->
toNonStreamingOrderByQueryResultObservable(producer, tracker, queryMetricsMap,
maxSizePerPartition, consumeComparer, clientSideRequestStatistics))
.toArray(Flux[]::new);
return Flux.mergeOrdered(consumeComparer, fluxes);
}

private static Flux<OrderByRowResult<Document>> toNonStreamingOrderByQueryResultObservable(DocumentProducer<Document> producer,
RequestChargeTracker tracker,
Map<String, QueryMetrics> queryMetricsMap,
int maxSizePerPartition,
OrderbyRowComparer<Document> consumeComparer,
Collection<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
return producer
.produceAsync()
.transformDeferred(new NonStreamingOrderByUtils.PageToItemTransformer(tracker, queryMetricsMap,
maxSizePerPartition, consumeComparer, clientSideRequestStatisticsList));
}

private static class PageToItemTransformer implements
Function<Flux<DocumentProducer<Document>.DocumentProducerFeedResponse>, Flux<OrderByRowResult<Document>>> {
private final RequestChargeTracker tracker;
private final Map<String, QueryMetrics> queryMetricsMap;
private final Integer maxSizePerPartition;
private final OrderbyRowComparer<Document> consumeComparer;
private final Collection<ClientSideRequestStatistics> clientSideRequestStatistics;

private PageToItemTransformer(RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap,
Integer maxSizePerPartition, OrderbyRowComparer<Document> consumeComparer,
Collection<ClientSideRequestStatistics> clientSideRequestStatistics) {
this.tracker = tracker;
this.queryMetricsMap = queryMetricsMap;
this.maxSizePerPartition = maxSizePerPartition;
this.consumeComparer = consumeComparer;
this.clientSideRequestStatistics = clientSideRequestStatistics;
}

@Override
public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer<Document>.DocumentProducerFeedResponse> source) {
PriorityQueue<OrderByRowResult<Document>> priorityQueue = new PriorityQueue<>(consumeComparer);
AtomicBoolean emitFlag = new AtomicBoolean(true);

return source.flatMap(documentProducerFeedResponse -> {
// Checks if the max size has been reached, if so, stop processing new pages
if (emitFlag.get()) {
clientSideRequestStatistics.addAll(
diagnosticsAccessor.getClientSideRequestStatisticsForQueryPipelineAggregations(documentProducerFeedResponse
.pageResult.getCosmosDiagnostics()));

QueryMetrics.mergeQueryMetricsMap(queryMetricsMap,
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult));
List<Document> results = documentProducerFeedResponse.pageResult.getResults();
results.forEach(r -> {
OrderByRowResult<Document> orderByRowResult = new OrderByRowResult<Document>(
ModelBridgeInternal.toJsonFromJsonSerializable(r),
documentProducerFeedResponse.sourceFeedRange,
null);
if (priorityQueue.size() < maxSizePerPartition) {
priorityQueue.add(orderByRowResult);
} else {
emitFlag.set(false);
}
});
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
}
// Returning an empty Flux since we are only processing and managing state here
return Flux.empty();
}, 1)
.thenMany(Flux.defer(() -> Flux.fromIterable(priorityQueue)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,26 @@ CosmosQueryRequestOptions setMaxItemCount(Integer maxItemCount) {
return this;
}

/**
* Gets the maximum size to fetch from a partition during non-streaming order by queries
*
* @return the max size per partition
*/
public Integer getMaxSizePerPartition() {
return this.actualRequestOptions.getMaxSizePerPartition();
}

/**
* Sets the maximum size to fetch from a partition during non-streaming order by queries
*
* @param maxSizePerPartition the max size per partition
* @return the CosmosQueryRequestOptions.
*/
public CosmosQueryRequestOptions setMaxSizePerPartition(Integer maxSizePerPartition) {
this.actualRequestOptions.setMaxSizePerPartition(maxSizePerPartition);
return this;
}

/**
* Gets the request continuation token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ public static void setQueryRequestOptionsMaxItemCount(CosmosQueryRequestOptions
cosmosQueryRequestOptions.setMaxItemCount(maxItemCount);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static void setMaxSizePerPartitionFromQueryRequestOptions(CosmosQueryRequestOptions options, Integer maxSizePerPartition) {
options.setMaxSizePerPartition(maxSizePerPartition);
}


@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosChangeFeedRequestOptions getEffectiveChangeFeedRequestOptions(
CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions,
Expand Down Expand Up @@ -703,6 +709,11 @@ public static Integer getMaxItemCountFromQueryRequestOptions(CosmosQueryRequestO
return options.getMaxItemCount();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static Integer getMaxSizePerPartitionFromQueryRequestOptions(CosmosQueryRequestOptions options) {
return options.getMaxSizePerPartition();
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static String getRequestContinuationFromQueryRequestOptions(CosmosQueryRequestOptions options) {
return options.getRequestContinuation();
Expand Down