Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
540a16d
Initial changes
aayush3011 Apr 24, 2024
6f49c75
Initial changes
aayush3011 Apr 25, 2024
97509eb
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 Apr 25, 2024
938279a
Increment versions for core releases (#40003)
azure-sdk May 2, 2024
c6c520e
Ensure ServiceBus session idle timeout fall back to retry-options::tr…
anuchandy May 2, 2024
222f2d9
Added Alpha3 Java Media Streaming Events (#40002)
v-durgeshs May 2, 2024
387170a
Update version of github-event-processor to 1.0.0-dev.20240502.2 (#40…
azure-sdk May 2, 2024
b1e8fdc
Prepare May 2024 Identity Release (#40006)
billwert May 2, 2024
c3c4648
Prepare Identity Broker May 2024 Release (#40014)
billwert May 2, 2024
a5127c6
Increment package versions for identity releases (#40015)
azure-sdk May 2, 2024
ba334df
[JobRouter] SDK Review updates (#40011)
williamzhao87 May 2, 2024
5682de3
FixFaultInjectionRuleFailedToApplyPerPartitionInGatewayMode (#40005)
xinlian12 May 3, 2024
39cb79f
azure-cosmos-test_1.0.0.beta.7Release (#40021)
xinlian12 May 3, 2024
86b36d3
Merge branch 'main' into users/akataria/nonStreamingOrderBy
aayush3011 May 3, 2024
9a38357
Fixed existsById API in ReactiveCosmosTemplate (#40022)
kushagraThapar May 3, 2024
a979c11
Initial changes
aayush3011 May 3, 2024
e2756a5
Initial changes
aayush3011 May 3, 2024
d768057
Skip Recorded test and delete Event record until test proxy to work w…
minwoolee-msft May 3, 2024
71b1905
Fix invalid CODEOWNERS (#40032)
alzimmermsft May 3, 2024
8be2277
Initial changes
aayush3011 May 3, 2024
28a2b5a
ServiceBus: fix session tracing (#39962)
May 3, 2024
deff567
[Automation] Generate SDK based on TypeSpec 0.15.15 (#40048)
azure-sdk May 6, 2024
058f8a8
[CODEOWNERS] Updates for org changes (#40049)
jsquire May 6, 2024
9406783
Move from using the docker image to java2docfx for docs validation (#…
JimSuplizio May 6, 2024
6dee85b
owners (#39686)
harsimar May 6, 2024
61b177c
Use ClientLogger in testing output (#40010)
alzimmermsft May 6, 2024
8349880
Fix null pointer exception and context usage (#40053)
alzimmermsft May 6, 2024
c75b369
Rename AML to AzureMachineLearning (#40056)
alzimmermsft May 6, 2024
af7f403
Fixed the Key Vault `test-resources.json` file to properly configure …
vcolin7 May 6, 2024
c9da24c
Close response body in bearer policy (#40052)
May 6, 2024
7055e66
Running Prepare-Release for azure-messaging-servicebus 7.17.0 (#40058)
anuchandy May 7, 2024
4428897
mgmt, TypeSpec code generation pipeline (#39963)
XiaofeiCao May 7, 2024
a755e6a
Add codeowner linter owners (#39997)
weshaggard May 7, 2024
23b7d1e
Update to ESRP task version that supports federated auth (#40059)
hallipr May 7, 2024
94bd0d4
Increment package versions for cosmos releases (#40031)
azure-sdk May 7, 2024
149bd61
Update azure-sdk-build-tools Repository Resource Refs in Yaml files (…
azure-sdk May 7, 2024
847a8ae
Add reduced embeddings sample to azure-search-documents (#40069)
alzimmermsft May 7, 2024
376955f
Search May Preview Regen Updates (#40057)
jairmyree May 7, 2024
f01f7f4
Preparing Search May 2024 Beta Release (#40071)
jairmyree May 7, 2024
e491b9d
Resolving comments
aayush3011 May 7, 2024
151bb50
Fixing build issues
aayush3011 May 7, 2024
c86b87e
eng, update autorest.java, improve error output in sdk automation (#4…
weidongxu-microsoft May 8, 2024
a3d2d26
Merge to main after spring cloud azure 4.18.0 released (#40075)
Netyyyy May 8, 2024
f6909fb
Miscellaneous Core performance improvements (#39552)
alzimmermsft May 8, 2024
73afd5b
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 8, 2024
3271984
Increment package versions for search releases (#40072)
azure-sdk May 9, 2024
0e921d2
Update io.fabric8:kubernetes-client (#40086)
jairmyree May 9, 2024
b2a4bef
Increment package versions for servicebus releases (#40094)
azure-sdk May 9, 2024
6884420
Emit stable auto-instrumented otel metrics (#39960)
heyams May 9, 2024
3b0d751
Merge branch 'Azure:main' into users/akataria/nonStreamingOrderBy
aayush3011 May 9, 2024
87572f7
Merge branch 'feature/vector_search' into users/akataria/nonStreaming…
aayush3011 May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Resolving comments
  • Loading branch information
aayush3011 committed May 7, 2024
commit e491b9d88acb7d37a2ce48384d83679de282e965
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#### Features Added

* Added a new non-streaming OrderBy query pipeline and a query feature`NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/)
* Added support for non-streaming OrderBy queries and a query feature `NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public class Configs {
public static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED";
private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

// Setting name and fallback value for the vector-search item limit that is
// exposed through getMaxItemSizeForVectorSearch().
private static final String MAX_ITEM_SIZE_FOR_VECTOR_SEARCH = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH";
private static final int DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH = 50000;

// Setting name for the on/off switch exposed through getMaxItemSizeForVectorSearchEnabled().
private static final String MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED";

// Fallback value for that switch: enabled.
private static final boolean DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED = true;

public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

public static final String TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS = "COSMOS.TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS";
Expand Down Expand Up @@ -484,6 +491,14 @@ public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() {
MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED);
}

/**
 * Looks up the {@code COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH} setting.
 *
 * @return the value resolved by {@code getJVMConfigAsInt}, which is handed
 * {@code DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH} as its default argument.
 */
public static int getMaxItemSizeForVectorSearch() {
    final int configuredLimit =
        getJVMConfigAsInt(MAX_ITEM_SIZE_FOR_VECTOR_SEARCH, DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH);
    return configuredLimit;
}

/**
 * Looks up the {@code COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED} setting.
 *
 * @return the value resolved by {@code getJVMConfigAsBoolean}, which is handed
 * {@code DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED} as its default argument.
 */
public static boolean getMaxItemSizeForVectorSearchEnabled() {
    final boolean limitEnabled = getJVMConfigAsBoolean(
        MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED,
        DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED);
    return limitEnabled;
}

public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() {
return
Duration.ofMillis(Math.max(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class CosmosQueryRequestOptionsImpl extends CosmosQueryRequestOptio
private boolean queryPlanRetrievalDisallowed;
private boolean emptyPageDiagnosticsEnabled;
private String queryName;
private Integer maxItemSizeForVectorSearch;
private List<CosmosDiagnostics> cancelledRequestDiagnosticsTracker = new ArrayList<>();

/**
Expand Down Expand Up @@ -62,6 +63,7 @@ public CosmosQueryRequestOptionsImpl(CosmosQueryRequestOptionsImpl options) {
this.queryName = options.queryName;
this.feedRange = options.feedRange;
this.cancelledRequestDiagnosticsTracker = options.cancelledRequestDiagnosticsTracker;
this.maxItemSizeForVectorSearch = options.maxItemSizeForVectorSearch;
}

/**
Expand Down Expand Up @@ -196,6 +198,26 @@ public CosmosQueryRequestOptionsImpl setMaxItemCount(Integer maxItemCount) {
return this;
}

/**
 * Returns the maximum item size to fetch during non-streaming order by queries,
 * exactly as it is stored on this options instance.
 *
 * @return the max number of items for vector search.
 * NOTE(review): presumably {@code null} when the option was never set - confirm.
 */
public Integer getMaxItemSizeForVectorSearch() {
    return this.maxItemSizeForVectorSearch;
}

/**
 * Stores the maximum item size to fetch during non-streaming order by queries.
 *
 * @param maxItemSizeForVectorSearch the max number of items for vector search.
 * @return this {@code CosmosQueryRequestOptionsImpl}, so that calls can be chained.
 */
public CosmosQueryRequestOptionsImpl setMaxItemSizeForVectorSearch(Integer maxItemSizeForVectorSearch) {
    this.maxItemSizeForVectorSearch = maxItemSizeForVectorSearch;

    return this;
}

/**
* Gets the request continuation token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentCollection;
Expand Down Expand Up @@ -240,7 +241,8 @@ private static boolean canCacheQuery(QueryInfo queryInfo) {
&& !queryInfo.hasTop()
&& !queryInfo.hasOffset()
&& !queryInfo.hasDCount()
&& !queryInfo.hasOrderBy();
&& !queryInfo.hasOrderBy()
&& !queryInfo.hasNonStreamingOrderBy();
}

private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) {
Expand Down Expand Up @@ -360,19 +362,33 @@ public static <T> Flux<? extends IDocumentQueryExecutionContext<T>> createSpecia
boolean getLazyFeedResponse = queryInfo.hasTop();

// We need to compute the optimal initial page size for non-streaming order-by queries
if (queryInfo.hasNonStreamingOrderBy()) {
if (queryInfo.hasNonStreamingOrderBy() && Configs.getMaxItemSizeForVectorSearchEnabled()) {
// Validate the TOP or LIMIT for non-streaming order-by queries
if (!queryInfo.hasTop() || !queryInfo.hasLimit() || queryInfo.getTop() < 0 || queryInfo.getLimit() < 0) {
if (!queryInfo.hasTop() && !queryInfo.hasLimit() && queryInfo.getTop() < 0 && queryInfo.getLimit() < 0) {
throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST,
"Executing a vector search query without TOP or LIMIT can consume a large number of RUs" +
"very fast and have long runtimes. Please ensure you are using one of the above two filters" +
"with you vector search query.");
}

// Validate the size of TOP or LIMIT against MaxItemSizeForVectorSearch
int maxLimit = Math.max(queryInfo.hasTop() ? queryInfo.getTop() : 0,
queryInfo.hasLimit() ? queryInfo.getLimit() : 0);
int maxItemSizeForVectorSearch = Math.max(Configs.getMaxItemSizeForVectorSearch(),
ModelBridgeInternal.getMaxItemSizeForVectorSearchFromQueryRequestOptions(cosmosQueryRequestOptions));
if (maxLimit > maxItemSizeForVectorSearch) {
throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST,
"Executing a vector search query with TOP or LIMIT larger than the maxItemSizeForVectorSearch " +
"is not allowed");
}
// Set initialPageSize based on the smallest of TOP or LIMIT
if (queryInfo.hasTop() || queryInfo.hasLimit() ) {
initialPageSize = Math.min(queryInfo.hasTop() ? queryInfo.getTop() : Integer.MAX_VALUE,
if (queryInfo.hasTop() || queryInfo.hasLimit()) {
int pageSizeWithTopOrLimit = Math.min(queryInfo.hasTop() ? queryInfo.getTop() : Integer.MAX_VALUE,
queryInfo.hasLimit() ? queryInfo.getLimit() : Integer.MAX_VALUE);
if (initialPageSize > 0) {
initialPageSize = Math.min(pageSizeWithTopOrLimit, initialPageSize);
} else {
initialPageSize = pageSizeWithTopOrLimit;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,20 @@ public NonStreamingOrderByDocumentQueryExecutionContext(
OrderbyRowComparer<Document> consumeComparer,
UUID correlatedActivityId,
boolean hasSelectValue,
final AtomicBoolean isQueryCancelledOnTimeout,
int maxPageSizePerPartition) {
final AtomicBoolean isQueryCancelledOnTimeout) {
super(diagnosticsClientContext, client, resourceTypeEnum, Document.class, query, cosmosQueryRequestOptions,
resourceLink, rewrittenQuery, correlatedActivityId, hasSelectValue, isQueryCancelledOnTimeout);
this.consumeComparer = consumeComparer;
this.tracker = new RequestChargeTracker();
this.queryMetricMap = new ConcurrentHashMap<>();
this.clientSideRequestStatistics = ConcurrentHashMap.newKeySet();
this.maxPageSizePerPartition = maxPageSizePerPartition;
}

public static Flux<IDocumentQueryExecutionComponent<Document>> createAsync(
DiagnosticsClientContext diagnosticsClientContext,
IDocumentQueryClient client,
PipelinedDocumentQueryParams<Document> initParams,
DocumentCollection collection,
int maxPageSizePerPartition) {
DocumentCollection collection) {

QueryInfo queryInfo = initParams.getQueryInfo();

Expand All @@ -102,8 +99,7 @@ public static Flux<IDocumentQueryExecutionComponent<Document>> createAsync(
new OrderbyRowComparer<>(queryInfo.getOrderBy()),
initParams.getCorrelatedActivityId(),
queryInfo.hasSelectValue(),
initParams.isQueryCancelledOnTimeout(),
maxPageSizePerPartition);
initParams.isQueryCancelledOnTimeout());

context.setTop(initParams.getTop());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Flux;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Function;

public class NonStreamingOrderByUtils {
Expand All @@ -38,7 +39,7 @@ public static <T extends Resource> Flux<OrderByRowResult<Document>> nonStreaming
toNonStreamingOrderByQueryResultObservable(producer, tracker, queryMetricsMap, initialPageSize,
consumeComparer, clientSideRequestStatistics))
.toArray(Flux[]::new);
return Flux.mergeOrdered(consumeComparer, fluxes);
return Flux.mergeComparingDelayError(1,consumeComparer, fluxes);
}

private static Flux<OrderByRowResult<Document>> toNonStreamingOrderByQueryResultObservable(DocumentProducer<Document> producer,
Expand Down Expand Up @@ -73,7 +74,7 @@ private PageToItemTransformer(RequestChargeTracker tracker, Map<String, QueryMet

@Override
public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer<Document>.DocumentProducerFeedResponse> source) {
PriorityQueue<OrderByRowResult<Document>> priorityQueue = new PriorityQueue<>(consumeComparer);
PriorityBlockingQueue<OrderByRowResult<Document>> priorityQueue = new PriorityBlockingQueue<>(initialPageSize, consumeComparer);

return source.flatMap(documentProducerFeedResponse -> {
clientSideRequestStatistics.addAll(
Expand All @@ -88,12 +89,17 @@ public Flux<OrderByRowResult<Document>> apply(Flux<DocumentProducer<Document>.Do
r.toJson(),
documentProducerFeedResponse.sourceFeedRange,
null);
if (priorityQueue.size() < initialPageSize) {
priorityQueue.add(orderByRowResult);
if (Configs.getMaxItemSizeForVectorSearchEnabled()) {
if (priorityQueue.size() < initialPageSize) {
priorityQueue.add(orderByRowResult);
} else {
priorityQueue.add(orderByRowResult);
priorityQueue.poll();
}
} else {
priorityQueue.add(orderByRowResult);
priorityQueue.poll();
}

});
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
// Returning an empty Flux since we are only processing and managing state here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* While this class is public, it is not part of our published public APIs.
* This is meant to be internally used only by our sdk.
Expand Down Expand Up @@ -54,9 +56,10 @@ private static BiFunction<String, PipelinedDocumentQueryParams<Document>, Flux<I
CosmosQueryRequestOptions orderByCosmosQueryRequestOptions =
qryOptAccessor.clone(requestOptions);
if (queryInfo.hasNonStreamingOrderBy()) {
checkNotNull(continuationToken, "Can not use a continuation token for a vector search query ");
qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null);
documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions);
return NonStreamingOrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection, 1000);
return NonStreamingOrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection);
} else {
ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken);
qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,26 @@ CosmosQueryRequestOptions setMaxItemCount(Integer maxItemCount) {
return this;
}

/**
 * Returns the maximum item size to fetch during non-streaming order by queries,
 * as reported by the wrapped request options.
 *
 * @return the max number of items for vector search.
 */
public Integer getMaxItemSizeForVectorSearch() {
    final Integer configuredMaxItemSize = this.actualRequestOptions.getMaxItemSizeForVectorSearch();
    return configuredMaxItemSize;
}

/**
 * Stores the maximum item size to fetch during non-streaming order by queries
 * on the wrapped request options.
 *
 * @param maxItemSizeForVectorSearch the max number of items for vector search.
 * @return the CosmosQueryRequestOptions.
 */
public CosmosQueryRequestOptions setMaxItemSizeForVectorSearch(Integer maxItemSizeForVectorSearch) {
    actualRequestOptions.setMaxItemSizeForVectorSearch(maxItemSizeForVectorSearch);

    return this;
}

/**
* Gets the request continuation token.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ public static Integer getMaxItemCountFromQueryRequestOptions(CosmosQueryRequestO
return options.getMaxItemCount();
}

/**
 * Bridge accessor that forwards to {@code options.getMaxItemSizeForVectorSearch()}.
 *
 * @param options the query request options to read from.
 * @return whatever the options object reports, as a boxed {@code Integer}.
 * NOTE(review): presumably {@code null} when the option was never set, so callers
 * that unbox the result should guard against that - confirm.
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static Integer getMaxItemSizeForVectorSearchFromQueryRequestOptions(CosmosQueryRequestOptions options) {
    final Integer maxItemSize = options.getMaxItemSizeForVectorSearch();
    return maxItemSize;
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static String getRequestContinuationFromQueryRequestOptions(CosmosQueryRequestOptions options) {
return options.getRequestContinuation();
Expand Down