diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 4ea8a84052bd..8518f5db2855 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,6 +4,10 @@ #### Features Added +* Added support for non-streaming OrderBy query and a query feature `NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/) + +* Added support for non-streaming OrderBy query and a query feature `NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/) + #### Breaking Changes #### Bugs Fixed diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index da368de5360d..941cc81c1b8d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -166,6 +166,13 @@ public class Configs { public static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"; private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + private static final String MAX_ITEM_COUNT_FOR_VECTOR_SEARCH = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH"; + private static final int DEFAULT_MAX_ITEM_COUNT_FOR_VECTOR_SEARCH = 50000; + + private static final String AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = "COSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY"; + + private static final boolean DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY = false; + public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; public static final String TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS = "COSMOS.TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS"; @@ -484,6 +491,20 @@ public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() { MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); } + public static int getMaxItemCountForVectorSearch() { + return Integer.parseInt(System.getProperty(MAX_ITEM_COUNT_FOR_VECTOR_SEARCH, + firstNonNull( + emptyToNull(System.getenv().get(MAX_ITEM_COUNT_FOR_VECTOR_SEARCH)), + String.valueOf(DEFAULT_MAX_ITEM_COUNT_FOR_VECTOR_SEARCH)))); + } + + public static boolean getAzureCosmosNonStreamingOrderByDisabled() { + return Boolean.parseBoolean(System.getProperty(AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY, + firstNonNull( + emptyToNull(System.getenv().get(AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY)), + String.valueOf(DEFAULT_AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY)))); + } + public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() { return Duration.ofMillis(Math.max( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java index 4dc62089acc5..bd30ee28382e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java @@ -25,6 +25,7 @@ public final class CosmosQueryRequestOptionsImpl extends CosmosQueryRequestOptio private boolean queryPlanRetrievalDisallowed; private boolean emptyPageDiagnosticsEnabled; private String queryName; + private Integer maxItemCountForVectorSearch; private List cancelledRequestDiagnosticsTracker = new ArrayList<>(); /** @@ -62,6 +63,7 @@ public CosmosQueryRequestOptionsImpl(CosmosQueryRequestOptionsImpl options) { this.queryName = options.queryName; this.feedRange = options.feedRange; this.cancelledRequestDiagnosticsTracker = options.cancelledRequestDiagnosticsTracker; + this.maxItemCountForVectorSearch = options.maxItemCountForVectorSearch; } /** @@ -196,6 +198,26 @@ public CosmosQueryRequestOptionsImpl setMaxItemCount(Integer maxItemCount) { return this; } + /** + * Gets the maximum item size to fetch during non-streaming order by queries. + * + * @return the max number of items for vector search. + */ + public Integer getMaxItemCountForVectorSearch() { + return this.maxItemCountForVectorSearch; + } + + /** + * Sets the maximum item size to fetch during non-streaming order by queries. + * + * @param maxItemCountForVectorSearch the max number of items for vector search. + * return the CosmosQueryRequestOptions. + */ + public CosmosQueryRequestOptionsImpl setMaxItemCountForVectorSearch(Integer maxItemCountForVectorSearch) { + this.maxItemCountForVectorSearch = maxItemCountForVectorSearch; + return this; + } + /** * Gets the request continuation token. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index db2bfd16f871..16b8f5ef39b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -281,6 +281,8 @@ void setCancelledRequestDiagnosticsTracker( Integer getMaxItemCount(CosmosQueryRequestOptions options); String getRequestContinuation(CosmosQueryRequestOptions options); + + Integer getMaxItemCountForVectorSearch(CosmosQueryRequestOptions options); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index a13cfc65348d..8e5dd07bf5a9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -4,9 +4,11 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.implementation.BadRequestException; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.Constants; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyRange; @@ -48,6 +50,12 @@ */ public class DocumentQueryExecutionContextFactory { + private static final ImplementationBridgeHelpers + .CosmosQueryRequestOptionsHelper + .CosmosQueryRequestOptionsAccessor qryOptAccessor = ImplementationBridgeHelpers + .CosmosQueryRequestOptionsHelper + .getCosmosQueryRequestOptionsAccessor(); + private final static int PageSizeFactorForTop = 5; private static final Logger logger = LoggerFactory.getLogger(DocumentQueryExecutionContextFactory.class); private static Mono> resolveCollection(DiagnosticsClientContext diagnosticsClientContext, @@ -239,7 +247,8 @@ private static boolean canCacheQuery(QueryInfo queryInfo) { && !queryInfo.hasTop() && !queryInfo.hasOffset() && !queryInfo.hasDCount() - && !queryInfo.hasOrderBy(); + && !queryInfo.hasOrderBy() + && !queryInfo.hasNonStreamingOrderBy(); } private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) { @@ -358,6 +367,38 @@ public static Flux> createSpecia boolean getLazyFeedResponse = queryInfo.hasTop(); + // We need to compute the optimal initial age size for non-streaming order-by queries + if (queryInfo.hasNonStreamingOrderBy()) { + // Validate the TOP or LIMIT for non-streaming order-by queries + if (!queryInfo.hasTop() && !queryInfo.hasLimit() && queryInfo.getTop() < 0 && queryInfo.getLimit() < 0) { + throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST, + "Executing a vector search query without TOP or LIMIT can consume a large number of RUs" + + "very fast and have long runtimes. Please ensure you are using one of the above two filters" + + "with you vector search query."); + } + // Validate the size of TOP or LIMIT against MaxItemSizeForVectorSearch + int maxLimit = Math.max(queryInfo.hasTop() ? queryInfo.getTop() : 0, + queryInfo.hasLimit() ? queryInfo.getLimit() : 0); + int maxItemSizeForVectorSearch = Math.max(Configs.getMaxItemCountForVectorSearch(), + qryOptAccessor.getMaxItemCountForVectorSearch(cosmosQueryRequestOptions)); + if (maxLimit > maxItemSizeForVectorSearch) { + throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST, + "Executing a vector search query with TOP or LIMIT larger than the maxItemSizeForVectorSearch " + + "is not allowed"); + } + // Set initialPageSize based on the smallest of TOP or LIMIT + if (queryInfo.hasTop() || queryInfo.hasLimit()) { + int pageSizeWithTopOrLimit = Math.min(queryInfo.hasTop() ? queryInfo.getTop() : Integer.MAX_VALUE, + queryInfo.hasLimit() && queryInfo.hasOffset() ? + queryInfo.getLimit() + queryInfo.getOffset() : Integer.MAX_VALUE); + if (initialPageSize > 0) { + initialPageSize = Math.min(pageSizeWithTopOrLimit, initialPageSize); + } else { + initialPageSize = pageSizeWithTopOrLimit; + } + } + } + // We need to compute the optimal initial page size for order-by queries if (queryInfo.hasOrderBy()) { int top; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java new file mode 100644 index 000000000000..4b3b41fbc675 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.CosmosException; + +public class NonStreamingOrderByBadRequestException extends CosmosException { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of the NonStreamingOrderByBadRequestException class. + * + * @param statusCode the http status code of the response. + * @param errorMessage the error message. + */ + public NonStreamingOrderByBadRequestException(int statusCode, String errorMessage) { + super(statusCode, errorMessage); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java new file mode 100644 index 000000000000..c64ef5d1fbda --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.DocumentClientRetryPolicy; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import reactor.core.publisher.Mono; + +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; + +public class NonStreamingOrderByDocumentProducer extends DocumentProducer { + private final OrderbyRowComparer consumeComparer; + + NonStreamingOrderByDocumentProducer( + OrderbyRowComparer consumeComparer, + IDocumentQueryClient client, + String collectionResourceId, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + TriFunction createRequestFunc, + Function>> executeRequestFunc, + FeedRangeEpkImpl feedRange, + String collectionLink, + Supplier createRetryPolicyFunc, + Class resourceType, + UUID correlatedActivityId, + int initialPageSize, + String initialContinuationToken, + int top, + Supplier operationContextTextProvider) { + super(client, collectionResourceId, cosmosQueryRequestOptions, createRequestFunc, executeRequestFunc, + collectionLink, createRetryPolicyFunc, resourceType, correlatedActivityId, initialPageSize, + initialContinuationToken, top, feedRange, operationContextTextProvider); + this.consumeComparer = consumeComparer; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java new file mode 100644 index 000000000000..2f3abb36e9d1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java @@ -0,0 +1,268 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.DocumentClientRetryPolicy; +import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.QueryMetrics; +import com.azure.cosmos.implementation.RequestChargeTracker; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.models.SqlQuerySpec; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class NonStreamingOrderByDocumentQueryExecutionContext + extends ParallelDocumentQueryExecutionContextBase { + + private final static + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + + private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = + ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor(); + + private final static String FormatPlaceHolder = "{documentdb-formattableorderbyquery-filter}"; + private final static String True = "true"; + + private final OrderbyRowComparer consumeComparer; + private final RequestChargeTracker tracker; + private final ConcurrentMap queryMetricMap; + private final Collection clientSideRequestStatistics; + private Flux> orderByObservable; + + private int maxPageSizePerPartition; + + public NonStreamingOrderByDocumentQueryExecutionContext( + DiagnosticsClientContext diagnosticsClientContext, + IDocumentQueryClient client, + ResourceType resourceTypeEnum, + SqlQuerySpec query, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + String resourceLink, + String rewrittenQuery, + OrderbyRowComparer consumeComparer, + UUID correlatedActivityId, + boolean hasSelectValue, + final AtomicBoolean isQueryCancelledOnTimeout) { + super(diagnosticsClientContext, client, resourceTypeEnum, Document.class, query, cosmosQueryRequestOptions, + resourceLink, rewrittenQuery, correlatedActivityId, hasSelectValue, isQueryCancelledOnTimeout); + this.consumeComparer = consumeComparer; + this.tracker = new RequestChargeTracker(); + this.queryMetricMap = new ConcurrentHashMap<>(); + this.clientSideRequestStatistics = ConcurrentHashMap.newKeySet(); + } + + public static Flux> createAsync( + DiagnosticsClientContext diagnosticsClientContext, + IDocumentQueryClient client, + PipelinedDocumentQueryParams initParams, + DocumentCollection collection) { + + QueryInfo queryInfo = initParams.getQueryInfo(); + + NonStreamingOrderByDocumentQueryExecutionContext context = new NonStreamingOrderByDocumentQueryExecutionContext( + diagnosticsClientContext, + client, + initParams.getResourceTypeEnum(), + initParams.getQuery(), + initParams.getCosmosQueryRequestOptions(), + initParams.getResourceLink(), + initParams.getQueryInfo().getRewrittenQuery(), + new OrderbyRowComparer<>(queryInfo.getOrderBy()), + initParams.getCorrelatedActivityId(), + queryInfo.hasSelectValue(), + initParams.isQueryCancelledOnTimeout()); + + context.setTop(initParams.getTop()); + + try { + context.initialize( + initParams.getFeedRanges(), + initParams.getQueryInfo().getOrderBy(), + initParams.getQueryInfo().getOrderByExpressions(), + initParams.getInitialPageSize(), + collection); + + return Flux.just(context); + } catch (CosmosException dce) { + return Flux.error(dce); + } + } + + private void initialize( + List feedRanges, List sortOrders, + Collection orderByExpressions, + int initialPageSize, + DocumentCollection collection) throws CosmosException { + // Since the continuation token will always be null, + // we don't need to handle any initialization based on continuationToken. + // We can directly initialize without any consideration for continuationToken. + Map partitionKeyRangeToContinuationToken = new HashMap<>(); + for (FeedRangeEpkImpl feedRangeEpk : feedRanges) { + partitionKeyRangeToContinuationToken.put(feedRangeEpk, + null); + } + super.initialize(collection, + partitionKeyRangeToContinuationToken, + initialPageSize, + new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, True), + querySpec.getParameters())); + + orderByObservable = NonStreamingOrderByUtils.nonStreamingOrderedMerge( + consumeComparer, + tracker, + documentProducers, + initialPageSize, + queryMetricMap, + clientSideRequestStatistics); + } + + @Override + protected NonStreamingOrderByDocumentProducer createDocumentProducer( + String collectionRid, + String continuationToken, + int initialPageSize, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + SqlQuerySpec querySpecForInit, + Map commonRequestHeaders, + TriFunction createRequestFunc, + Function>> executeFunc, + Supplier createRetryPolicyFunc, + FeedRangeEpkImpl feedRange) { + return new NonStreamingOrderByDocumentProducer( + consumeComparer, + client, + collectionRid, + cosmosQueryRequestOptions, + createRequestFunc, + executeFunc, + feedRange, + collectionRid, + createRetryPolicyFunc, + Document.class, + correlatedActivityId, + maxPageSizePerPartition, + continuationToken, + top, + this.getOperationContextTextProvider()); + } + + @Override + public Flux> drainAsync(int maxPageSize) { + return this.orderByObservable.transformDeferred(new ItemToPageTransformer(tracker, + maxPageSize, + this.queryMetricMap, + this.clientSideRequestStatistics)); + } + + @Override + public Flux> executeAsync() { + return drainAsync(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions)); + } + + private static class ItemToPageTransformer implements + Function>, Flux>> { + private final static int DEFAULT_PAGE_SIZE = 100; + private final RequestChargeTracker tracker; + private final int maxPageSize; + private final ConcurrentMap queryMetricMap; + private final Collection clientSideRequestStatistics; + + public ItemToPageTransformer(RequestChargeTracker tracker, + int maxPageSize, + ConcurrentMap queryMetricsMap, + Collection clientSideRequestStatistics) { + this.tracker = tracker; + this.maxPageSize = maxPageSize > 0 ? maxPageSize : DEFAULT_PAGE_SIZE; + this.queryMetricMap = queryMetricsMap; + this.clientSideRequestStatistics = clientSideRequestStatistics; + } + + private static Map headerResponse(double requestCharge) { + return Utils.immutableMapOf(HttpConstants.HttpHeaders.REQUEST_CHARGE, String.valueOf(requestCharge)); + } + + @Override + public Flux> apply(Flux> source) { + return source + .window(maxPageSize).map(Flux::collectList) + .flatMap(resultListObs -> resultListObs, 1) + .map(orderByRowResults -> { + // construct a page from result with request charge + FeedResponse> feedResponse = feedResponseAccessor.createFeedResponse( + orderByRowResults, + headerResponse(tracker.getAndResetCharge()), + null); + if (!queryMetricMap.isEmpty()) { + for (Map.Entry entry : queryMetricMap.entrySet()) { + BridgeInternal.putQueryMetricsIntoMap(feedResponse, + entry.getKey(), + entry.getValue()); + } + } + return feedResponse; + }) + .concatWith(Flux.defer(() -> { + return Flux.just(feedResponseAccessor.createFeedResponse(Utils.immutableListOf(), + null, null)); + })) + .map(feedOfOrderByRowResults -> { + List unwrappedResults = new ArrayList<>(); + for (OrderByRowResult orderByRowResult : feedOfOrderByRowResults.getResults()) { + unwrappedResults.add(orderByRowResult.getPayload()); + } + FeedResponse feedResponse = BridgeInternal.createFeedResponseWithQueryMetrics(unwrappedResults, + feedOfOrderByRowResults.getResponseHeaders(), + BridgeInternal.queryMetricsFromFeedResponse(feedOfOrderByRowResults), + ModelBridgeInternal.getQueryPlanDiagnosticsContext(feedOfOrderByRowResults), + false, + false, feedOfOrderByRowResults.getCosmosDiagnostics()); + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + feedResponse.getCosmosDiagnostics(), clientSideRequestStatistics); + return feedResponse; + }).switchIfEmpty(Flux.defer(() -> { + // create an empty page if there is no result + FeedResponse frp = BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(), + headerResponse( + tracker.getAndResetCharge()), + queryMetricMap, + null, + false, + false, + null); + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + frp.getCosmosDiagnostics(), clientSideRequestStatistics); + return Flux.just(frp); + })); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java new file mode 100644 index 000000000000..b4ac33a1dbc2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.QueryMetrics; +import com.azure.cosmos.implementation.RequestChargeTracker; +import com.azure.cosmos.implementation.Resource; +import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import reactor.core.publisher.Flux; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.function.Function; + +public class NonStreamingOrderByUtils { + private final static + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + + public static Flux> nonStreamingOrderedMerge(OrderbyRowComparer consumeComparer, + RequestChargeTracker tracker, + List> documentProducers, + int initialPageSize, + Map queryMetricsMap, + Collection clientSideRequestStatistics) { + @SuppressWarnings("unchecked") + Flux>[] fluxes = documentProducers + .subList(0, documentProducers.size()) + .stream() + .map(producer -> + toNonStreamingOrderByQueryResultObservable(producer, tracker, queryMetricsMap, initialPageSize, + consumeComparer, clientSideRequestStatistics)) + .toArray(Flux[]::new); + return Flux.mergeComparingDelayError(1,consumeComparer, fluxes); + } + + private static Flux> toNonStreamingOrderByQueryResultObservable(DocumentProducer producer, + RequestChargeTracker tracker, + Map queryMetricsMap, + int initialPageSize, + OrderbyRowComparer consumeComparer, + Collection clientSideRequestStatisticsList) { + return producer + .produceAsync() + .transformDeferred(new NonStreamingOrderByUtils.PageToItemTransformer(tracker, queryMetricsMap, initialPageSize, + consumeComparer, clientSideRequestStatisticsList)); + } + + private static class PageToItemTransformer implements + Function.DocumentProducerFeedResponse>, Flux>> { + private final RequestChargeTracker tracker; + private final Map queryMetricsMap; + private final Integer initialPageSize; + private final OrderbyRowComparer consumeComparer; + private final Collection clientSideRequestStatistics; + + private PageToItemTransformer(RequestChargeTracker tracker, Map queryMetricsMap, + Integer initialPageSize, OrderbyRowComparer consumeComparer, + Collection clientSideRequestStatistics) { + this.tracker = tracker; + this.queryMetricsMap = queryMetricsMap; + this.initialPageSize = initialPageSize; + this.consumeComparer = consumeComparer; + this.clientSideRequestStatistics = clientSideRequestStatistics; + } + + @Override + public Flux> apply(Flux.DocumentProducerFeedResponse> source) { + // the size of the priority queue is set to size+1, because when the pq reaches the max size we add that + // item and then remove the element. If we don't do this, then when adding this element the size of the pq + // will be increased automatically by 50% and then there would be inconsistent results for later pages. + PriorityBlockingQueue> priorityQueue = new PriorityBlockingQueue<>(initialPageSize + 1, consumeComparer); + + return source.flatMap(documentProducerFeedResponse -> { + clientSideRequestStatistics.addAll( + diagnosticsAccessor.getClientSideRequestStatisticsForQueryPipelineAggregations(documentProducerFeedResponse + .pageResult.getCosmosDiagnostics())); + + QueryMetrics.mergeQueryMetricsMap(queryMetricsMap, + BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult)); + List results = documentProducerFeedResponse.pageResult.getResults(); + results.forEach(r -> { + OrderByRowResult orderByRowResult = new OrderByRowResult( + r.toJson(), + documentProducerFeedResponse.sourceFeedRange, + null); + if (Configs.getMaxItemSizeForVectorSearchEnabled()) { + if (priorityQueue.size() < initialPageSize) { + priorityQueue.add(orderByRowResult); + } else { + priorityQueue.add(orderByRowResult); + priorityQueue.poll(); + } + } else { + priorityQueue.add(orderByRowResult); + } + + }); + tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge()); + // Returning an empty Flux since we are only processing and managing state here + return Flux.empty(); + }, 1) + .thenMany(Flux.defer(() -> Flux.fromIterable(priorityQueue))); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index 020b8633d2be..6ed30d3a7fbe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -106,7 +106,8 @@ public static Flux> createAsync( || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasDCount() - || queryInfo.hasDistinct()), + || queryInfo.hasDistinct() + || queryInfo.hasNonStreamingOrderBy()), initParams.isQueryCancelledOnTimeout()); context.setTop(initParams.getTop()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java index 7b6104271762..82746ad412b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java @@ -5,6 +5,7 @@ import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.ObjectNodeMap; @@ -15,6 +16,8 @@ import java.util.function.BiFunction; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + /** * While this class is public, but it is not part of our published public APIs. * This is meant to be internally used only by our sdk. @@ -53,12 +56,21 @@ private static BiFunction, Flux { CosmosQueryRequestOptions orderByCosmosQueryRequestOptions = qryOptAccessor.clone(requestOptions); - ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken); - qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); - - documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); - - return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + if (queryInfo.hasNonStreamingOrderBy()) { + if (continuationToken != null) { + throw new NonStreamingOrderByBadRequestException( + HttpConstants.StatusCodes.BADREQUEST, + "Can not use a continuation token for a vector search query"); + } + qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); + documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); + return NonStreamingOrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + } else { + ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken); + qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); + documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); + return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + } }; } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java index c7f67711fabd..6c3fa2827216 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java @@ -137,7 +137,7 @@ public Flux> executeAsync() { } private static QueryInfo validateQueryInfo(QueryInfo queryInfo) { - if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy()) { + if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasNonStreamingOrderBy()) { // Any query with order by, aggregates or group by needs to go through the Document query pipeline throw new IllegalStateException("This query must not use the simple query pipeline."); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java index 232f942b8590..39c6c27efad8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java @@ -63,7 +63,8 @@ public static Flux> createAsync( || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasDCount() - || queryInfo.hasDistinct()) { + || queryInfo.hasDistinct() + || queryInfo.hasNonStreamingOrderBy()) { return PipelinedDocumentQueryExecutionContext.createAsyncCore( diagnosticsClientContext, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java index ce2b4865767c..1dd3e5928b92 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java @@ -14,5 +14,6 @@ public enum QueryFeature { OrderBy, Top, NonValueAggregate, - DCount + DCount, + NonStreamingOrderBy } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java index 65efb92c3325..48870a5ae52d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java @@ -36,6 +36,7 @@ public final class QueryInfo extends JsonSerializable { private DistinctQueryType distinctQueryType; private QueryPlanDiagnosticsContext queryPlanDiagnosticsContext; private DCountInfo dCountInfo; + private boolean nonStreamingOrderBy; public QueryInfo() { } @@ -160,6 +161,11 @@ public boolean hasGroupBy() { return groupByExpressions != null && !groupByExpressions.isEmpty(); } + public boolean hasNonStreamingOrderBy() { + this.nonStreamingOrderBy = Boolean.TRUE.equals(super.getBoolean("hasNonStreamingOrderBy")); + return this.nonStreamingOrderBy; + } + public Map getGroupByAliasToAggregateType(){ Map groupByAliasToAggregateMap; groupByAliasToAggregateMap = super.getMap("groupByAliasToAggregateType"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java index ce91ff2029e9..fb79a316c445 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -5,6 +5,7 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; @@ -43,7 +44,20 @@ class QueryPlanRetriever { QueryFeature.GroupBy.name() + ", " + QueryFeature.Top.name() + ", " + QueryFeature.DCount.name() + ", " + - QueryFeature.NonValueAggregate.name(); + QueryFeature.NonValueAggregate.name() + ", " + + QueryFeature.NonStreamingOrderBy.name(); + + private static final String OLD_SUPPORTED_QUERY_FEATURES = QueryFeature.Aggregate.name() + ", " + + QueryFeature.CompositeAggregate.name() + ", " + + QueryFeature.MultipleOrderBy.name() + ", " + + QueryFeature.MultipleAggregates.name() + ", " + + QueryFeature.OrderBy.name() + ", " + + QueryFeature.OffsetAndLimit.name() + ", " + + QueryFeature.Distinct.name() + ", " + + QueryFeature.GroupBy.name() + ", " + + QueryFeature.Top.name() + ", " + + QueryFeature.DCount.name() + ", " + + QueryFeature.NonValueAggregate.name(); static Mono getQueryPlanThroughGatewayAsync(DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient queryClient, @@ -61,7 +75,8 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn final Map requestHeaders = new HashMap<>(); requestHeaders.put(HttpConstants.HttpHeaders.CONTENT_TYPE, RuntimeConstants.MediaTypes.JSON); requestHeaders.put(HttpConstants.HttpHeaders.IS_QUERY_PLAN_REQUEST, TRUE); - requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, SUPPORTED_QUERY_FEATURES); + requestHeaders.put(HttpConstants.HttpHeaders.SUPPORTED_QUERY_FEATURES, + Configs.getAzureCosmosNonStreamingOrderByDisabled() ? OLD_SUPPORTED_QUERY_FEATURES : SUPPORTED_QUERY_FEATURES); requestHeaders.put(HttpConstants.HttpHeaders.QUERY_VERSION, HttpConstants.Versions.QUERY_VERSION); if (partitionKey != null && partitionKey != PartitionKey.NONE) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java index c9bf3909fd5d..856dee68b563 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java @@ -256,6 +256,26 @@ CosmosQueryRequestOptions setMaxItemCount(Integer maxItemCount) { return this; } + /** + * Gets the maximum item size to fetch during non-streaming order by queries. + * + * @return the max number of items for vector search. + */ + Integer getMaxItemCountForVectorSearch() { + return this.actualRequestOptions.getMaxItemCountForVectorSearch(); + } + + /** + * Sets the maximum item size to fetch during non-streaming order by queries. + * + * @param maxItemCountForVectorSearch the max number of items for vector search. + * @return the CosmosQueryRequestOptions. + */ + CosmosQueryRequestOptions setMaxItemCountForVectorSearch(Integer maxItemCountForVectorSearch) { + this.actualRequestOptions.setMaxItemCountForVectorSearch(maxItemCountForVectorSearch); + return this; + } + /** * Gets the request continuation token. * @@ -603,6 +623,11 @@ public Integer getMaxItemCount(CosmosQueryRequestOptions options) { public String getRequestContinuation(CosmosQueryRequestOptions options) { return options.getRequestContinuation(); } + + @Override + public Integer getMaxItemCountForVectorSearch(CosmosQueryRequestOptions options) { + return options.getMaxItemCountForVectorSearch(); + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java index 4ee5153877f8..e8f30d2f508c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java @@ -20,7 +20,6 @@ */ public final class IndexingPolicy { private static final String DEFAULT_PATH = "/*"; - private List includedPaths; private List excludedPaths; private List> compositeIndexes; diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 6d699b6e17bd..34f6139030e7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -7,6 +7,8 @@ ### Breaking Changes ### Bugs Fixed +- Fixes the session message disposition to use management node as fall back. ([#39913](https://github.com/Azure/azure-sdk-for-java/issues/ 39913)) +- Fixes the session processor idle timeout to fall back to RetryOptions::tryTimeout. ([#39993](https://github.com/Azure/azure-sdk-for-java/issues/39993)) ### Other Changes