diff --git a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md index 8bfe27c5aa13..e7cdc6f02af1 100644 --- a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md @@ -7,8 +7,12 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue where `FaultInjectionRule` can not apply on partition level when using `Gateway` Mode and non-session consistency - See [40005](https://github.com/Azure/azure-sdk-for-java/pull/40005) -#### Other Changes +### 1.0.0-beta.7 (2024-05-03) + +#### Bugs Fixed +* Fixed an issue where `FaultInjectionRule` can not apply on partition level when using `Gateway` Mode and non-session consistency - See [40005](https://github.com/Azure/azure-sdk-for-java/pull/40005) ### 1.0.0-beta.7 (2024-05-03) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VectorIndexTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VectorIndexTest.java new file mode 100644 index 000000000000..d4f5b8cf6602 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/VectorIndexTest.java @@ -0,0 +1,345 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.rx; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosDatabaseForTest; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.DirectConnectionConfig; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.guava25.collect.ImmutableList; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosVectorDataType; +import com.azure.cosmos.models.CosmosVectorDistanceFunction; +import com.azure.cosmos.models.CosmosVectorEmbedding; +import com.azure.cosmos.models.CosmosVectorEmbeddingPolicy; +import com.azure.cosmos.models.ExcludedPath; +import com.azure.cosmos.models.IncludedPath; +import com.azure.cosmos.models.IndexingMode; +import com.azure.cosmos.models.IndexingPolicy; +import com.azure.cosmos.models.PartitionKeyDefinition; +import com.azure.cosmos.models.CosmosVectorIndexSpec; +import com.azure.cosmos.models.CosmosVectorIndexType; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Ignore; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +@Ignore("TODO: Ignore these test cases until the public emulator with vector indexes is released.") +public class VectorIndexTest extends TestSuiteBase { + protected static final int TIMEOUT = 30000; + protected static final int SETUP_TIMEOUT = 20000; + protected static final int SHUTDOWN_TIMEOUT = 20000; + + protected static Logger logger = LoggerFactory.getLogger(VectorIndexTest.class.getSimpleName()); + private final ObjectMapper simpleObjectMapper = Utils.getSimpleObjectMapper(); + private final String databaseId = CosmosDatabaseForTest.generateId(); + private CosmosAsyncClient client; + private CosmosAsyncDatabase database; + + @BeforeClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT) + public void before_VectorIndexTest() { + // set up the client + client = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .directMode(DirectConnectionConfig.getDefaultConfig()) + .consistencyLevel(ConsistencyLevel.SESSION) + .contentResponseOnWriteEnabled(true) + .buildAsyncClient(); + + database = createDatabase(client, databaseId); + } + + @AfterClass(groups = {"emulator"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeDeleteDatabase(database); + safeClose(client); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT*10000) + public void shouldCreateVectorEmbeddingPolicy() { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList(); + paths.add("/mypk"); + partitionKeyDef.setPaths(paths); + + CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); + + IndexingPolicy indexingPolicy = new IndexingPolicy(); + indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); + ExcludedPath excludedPath = new ExcludedPath("/*"); + indexingPolicy.setExcludedPaths(Collections.singletonList(excludedPath)); + + IncludedPath includedPath1 = new IncludedPath("/name/?"); + IncludedPath includedPath2 = new IncludedPath("/description/?"); + indexingPolicy.setIncludedPaths(ImmutableList.of(includedPath1, includedPath2)); + + indexingPolicy.setVectorIndexes(populateVectorIndexes()); + + CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy = new CosmosVectorEmbeddingPolicy(); + cosmosVectorEmbeddingPolicy.setCosmosVectorEmbeddings(populateEmbeddings()); + + collectionDefinition.setIndexingPolicy(indexingPolicy); + collectionDefinition.setVectorEmbeddingPolicy(cosmosVectorEmbeddingPolicy); + + database.createContainer(collectionDefinition).block(); + CosmosAsyncContainer createdCollection = database.getContainer(collectionDefinition.getId()); + CosmosContainerProperties collectionProperties = createdCollection.read().block().getProperties(); + validateCollectionProperties(collectionDefinition, collectionProperties); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void shouldFailOnEmptyVectorEmbeddingPolicy() { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList(); + paths.add("/mypk"); + partitionKeyDef.setPaths(paths); + + CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); + + IndexingPolicy indexingPolicy = new IndexingPolicy(); + indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); + ExcludedPath excludedPath = new ExcludedPath("/*"); + indexingPolicy.setExcludedPaths(Collections.singletonList(excludedPath)); + + IncludedPath includedPath1 = new IncludedPath("/name/?"); + IncludedPath includedPath2 = new IncludedPath("/description/?"); + indexingPolicy.setIncludedPaths(ImmutableList.of(includedPath1, includedPath2)); + + CosmosVectorIndexSpec cosmosVectorIndexSpec = new CosmosVectorIndexSpec(); + cosmosVectorIndexSpec.setPath("/vector1"); + cosmosVectorIndexSpec.setType(CosmosVectorIndexType.FLAT.toString()); + indexingPolicy.setVectorIndexes(ImmutableList.of(cosmosVectorIndexSpec)); + + collectionDefinition.setIndexingPolicy(indexingPolicy); + + try { + database.createContainer(collectionDefinition).block(); + fail("Container creation will fail as no vector embedding policy is being passed"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(400); + assertThat(ex.getMessage()).contains("vector1 not matching in Embedding's path"); + } + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void shouldFailOnWrongVectorIndex() { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList(); + paths.add("/mypk"); + partitionKeyDef.setPaths(paths); + + CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); + + IndexingPolicy indexingPolicy = new IndexingPolicy(); + indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); + ExcludedPath excludedPath = new ExcludedPath("/*"); + indexingPolicy.setExcludedPaths(Collections.singletonList(excludedPath)); + + IncludedPath includedPath1 = new IncludedPath("/name/?"); + IncludedPath includedPath2 = new IncludedPath("/description/?"); + indexingPolicy.setIncludedPaths(ImmutableList.of(includedPath1, includedPath2)); + + CosmosVectorIndexSpec cosmosVectorIndexSpec = new CosmosVectorIndexSpec(); + cosmosVectorIndexSpec.setPath("/vector1"); + cosmosVectorIndexSpec.setType("NonFlat"); + indexingPolicy.setVectorIndexes(ImmutableList.of(cosmosVectorIndexSpec)); + collectionDefinition.setIndexingPolicy(indexingPolicy); + + CosmosVectorEmbedding embedding = new CosmosVectorEmbedding(); + embedding.setPath("/vector1"); + embedding.setDataType(CosmosVectorDataType.FLOAT32); + embedding.setDimensions(3L); + embedding.setDistanceFunction(CosmosVectorDistanceFunction.COSINE); + CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy = new CosmosVectorEmbeddingPolicy(); + cosmosVectorEmbeddingPolicy.setCosmosVectorEmbeddings(ImmutableList.of(embedding)); + collectionDefinition.setVectorEmbeddingPolicy(cosmosVectorEmbeddingPolicy); + + try { + database.createContainer(collectionDefinition).block(); + fail("Container creation will fail as wrong vector index type is being passed"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(400); + assertThat(ex.getMessage()).contains("NonFlat is invalid, Valid types are 'flat' or 'quantizedFlat'"); + } + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void shouldCreateVectorIndexSimilarPathDifferentVectorType() { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList(); + paths.add("/mypk"); + partitionKeyDef.setPaths(paths); + + CosmosContainerProperties collectionDefinition = new CosmosContainerProperties(UUID.randomUUID().toString(), partitionKeyDef); + + IndexingPolicy indexingPolicy = new IndexingPolicy(); + indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT); + ExcludedPath excludedPath = new ExcludedPath("/*"); + indexingPolicy.setExcludedPaths(Collections.singletonList(excludedPath)); + + IncludedPath includedPath1 = new IncludedPath("/name/?"); + IncludedPath includedPath2 = new IncludedPath("/description/?"); + indexingPolicy.setIncludedPaths(ImmutableList.of(includedPath1, includedPath2)); + + List vectorIndexes = populateVectorIndexes(); + vectorIndexes.get(2).setPath("/vector2"); + indexingPolicy.setVectorIndexes(vectorIndexes); + + List embeddings = populateEmbeddings(); + embeddings.get(2).setPath("/vector2"); + CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy = new CosmosVectorEmbeddingPolicy(); + cosmosVectorEmbeddingPolicy.setCosmosVectorEmbeddings(embeddings); + + collectionDefinition.setIndexingPolicy(indexingPolicy); + collectionDefinition.setVectorEmbeddingPolicy(cosmosVectorEmbeddingPolicy); + + database.createContainer(collectionDefinition).block(); + CosmosAsyncContainer createdCollection = database.getContainer(collectionDefinition.getId()); + CosmosContainerProperties collectionProperties = createdCollection.read().block().getProperties(); + validateCollectionProperties(collectionDefinition, collectionProperties); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void shouldFailOnWrongVectorEmbeddingPolicy() { + CosmosVectorEmbedding embedding = new CosmosVectorEmbedding(); + try { + + embedding.setDataType(null); + fail("Embedding creation failed because cosmosVectorDataType argument is empty"); + } catch (NullPointerException ex) { + assertThat(ex.getMessage()).isEqualTo("cosmosVectorDataType cannot be empty"); + } + + try { + embedding.setDistanceFunction(null); + fail("Embedding creation failed because cosmosVectorDistanceFunction argument is empty"); + } catch (NullPointerException ex) { + assertThat(ex.getMessage()).isEqualTo("cosmosVectorDistanceFunction cannot be null"); + } + + try { + embedding.setDimensions(null); + fail("Embedding creation failed because dimensions argument is empty"); + } catch (NullPointerException ex) { + assertThat(ex.getMessage()).isEqualTo("dimensions cannot be empty"); + } + + try { + embedding.setDimensions(-1L); + fail("Vector Embedding policy creation will fail for negative dimensions being passed"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage()).isEqualTo("Dimensions for the embedding has to be a long value greater than 1 for the vector embedding policy"); + } + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void shouldValidateVectorEmbeddingPolicySerializationAndDeserialization() throws JsonProcessingException { + IndexingPolicy indexingPolicy = new IndexingPolicy(); + indexingPolicy.setVectorIndexes(populateVectorIndexes()); + + CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy = new CosmosVectorEmbeddingPolicy(); + cosmosVectorEmbeddingPolicy.setCosmosVectorEmbeddings(populateEmbeddings()); + String vectorEmbeddingPolicyJson = getVectorEmbeddingPolicyAsString(); + String expectedVectorEmbeddingPolicyJson = simpleObjectMapper.writeValueAsString(cosmosVectorEmbeddingPolicy); + assertThat(vectorEmbeddingPolicyJson).isEqualTo(expectedVectorEmbeddingPolicyJson); + + CosmosVectorEmbeddingPolicy expectedCosmosVectorEmbeddingPolicy = simpleObjectMapper.readValue(expectedVectorEmbeddingPolicyJson, CosmosVectorEmbeddingPolicy.class); + validateVectorEmbeddingPolicy(cosmosVectorEmbeddingPolicy, expectedCosmosVectorEmbeddingPolicy); + } + + private void validateCollectionProperties(CosmosContainerProperties collectionDefinition, CosmosContainerProperties collectionProperties) { + assertThat(collectionProperties.getVectorEmbeddingPolicy()).isNotNull(); + assertThat(collectionProperties.getVectorEmbeddingPolicy().getVectorEmbeddings()).isNotNull(); + validateVectorEmbeddingPolicy(collectionProperties.getVectorEmbeddingPolicy(), + collectionDefinition.getVectorEmbeddingPolicy()); + + assertThat(collectionProperties.getIndexingPolicy().getVectorIndexes()).isNotNull(); + validateVectorIndexes(collectionDefinition.getIndexingPolicy().getVectorIndexes(), collectionProperties.getIndexingPolicy().getVectorIndexes()); + } + + private void validateVectorEmbeddingPolicy(CosmosVectorEmbeddingPolicy actual, CosmosVectorEmbeddingPolicy expected) { + List actualEmbeddings = actual.getVectorEmbeddings(); + List expectedEmbeddings = expected.getVectorEmbeddings(); + assertThat(expectedEmbeddings).hasSameSizeAs(actualEmbeddings); + for (int i = 0; i < expectedEmbeddings.size(); i++) { + assertThat(expectedEmbeddings.get(i).getPath()).isEqualTo(actualEmbeddings.get(i).getPath()); + assertThat(expectedEmbeddings.get(i).getDataType()).isEqualTo(actualEmbeddings.get(i).getDataType()); + assertThat(expectedEmbeddings.get(i).getDimensions()).isEqualTo(actualEmbeddings.get(i).getDimensions()); + assertThat(expectedEmbeddings.get(i).getDistanceFunction()).isEqualTo(actualEmbeddings.get(i).getDistanceFunction()); + } + } + + private void validateVectorIndexes(List actual, List expected) { + assertThat(expected).hasSameSizeAs(actual); + for (int i = 0; i < expected.size(); i++) { + assertThat(expected.get(i).getPath()).isEqualTo(actual.get(i).getPath()); + assertThat(expected.get(i).getType()).isEqualTo(actual.get(i).getType()); + } + } + + private List populateVectorIndexes() { + CosmosVectorIndexSpec cosmosVectorIndexSpec1 = new CosmosVectorIndexSpec(); + cosmosVectorIndexSpec1.setPath("/vector1"); + cosmosVectorIndexSpec1.setType(CosmosVectorIndexType.FLAT.toString()); + + CosmosVectorIndexSpec cosmosVectorIndexSpec2 = new CosmosVectorIndexSpec(); + cosmosVectorIndexSpec2.setPath("/vector2"); + cosmosVectorIndexSpec2.setType(CosmosVectorIndexType.QUANTIZED_FLAT.toString()); + + CosmosVectorIndexSpec cosmosVectorIndexSpec3 = new CosmosVectorIndexSpec(); + cosmosVectorIndexSpec3.setPath("/vector3"); + cosmosVectorIndexSpec3.setType(CosmosVectorIndexType.DISK_ANN.toString()); + + return Arrays.asList(cosmosVectorIndexSpec1, cosmosVectorIndexSpec2, cosmosVectorIndexSpec3); + } + + private List populateEmbeddings() { + CosmosVectorEmbedding embedding1 = new CosmosVectorEmbedding(); + embedding1.setPath("/vector1"); + embedding1.setDataType(CosmosVectorDataType.INT8); + embedding1.setDimensions(3L); + embedding1.setDistanceFunction(CosmosVectorDistanceFunction.COSINE); + + CosmosVectorEmbedding embedding2 = new CosmosVectorEmbedding(); + embedding2.setPath("/vector2"); + embedding2.setDataType(CosmosVectorDataType.FLOAT32); + embedding2.setDimensions(3L); + embedding2.setDistanceFunction(CosmosVectorDistanceFunction.DOT_PRODUCT); + + CosmosVectorEmbedding embedding3 = new CosmosVectorEmbedding(); + embedding3.setPath("/vector3"); + embedding3.setDataType(CosmosVectorDataType.UINT8); + embedding3.setDimensions(3L); + embedding3.setDistanceFunction(CosmosVectorDistanceFunction.EUCLIDEAN); + return Arrays.asList(embedding1, embedding2, embedding3); + } + + private String getVectorEmbeddingPolicyAsString() { + return "{\"vectorEmbeddings\":[" + + "{\"path\":\"/vector1\",\"dataType\":\"int8\",\"dimensions\":3,\"distanceFunction\":\"cosine\"}," + + "{\"path\":\"/vector2\",\"dataType\":\"float32\",\"dimensions\":3,\"distanceFunction\":\"dotproduct\"}," + + "{\"path\":\"/vector3\",\"dataType\":\"uint8\",\"dimensions\":3,\"distanceFunction\":\"euclidean\"}" + + "]}"; + } +} diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 4ea8a84052bd..16cedfc04ef2 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,9 @@ ### 4.60.0-beta.1 (Unreleased) #### Features Added +* Added `cosmosVectorEmbeddingPolicy` in `cosmosContainerProperties` and `vectorIndexes` in `indexPolicy` to support vector search in CosmosDB - See[39379](https://github.com/Azure/azure-sdk-for-java/pull/39379) + +* Added support for non-streaming OrderBy query and a query feature `NonStreamingOrderBy` to support Vector Search queries. - See [PR 39897](https://github.com/Azure/azure-sdk-for-java/pull/39897/) #### Breaking Changes @@ -11,10 +14,9 @@ #### Other Changes ### 4.59.0 (2024-04-27) - #### Features Added * Added public APIs `getCustomItemSerializer` and `setCustomItemSerializer` to allow customers to specify custom payload transformations or serialization settings. - See [PR 38997](https://github.com/Azure/azure-sdk-for-java/pull/38997) and [PR 39933](https://github.com/Azure/azure-sdk-for-java/pull/39933) - + #### Other Changes * Load Blackbird or Afterburner into the ObjectMapper depending upon Java version and presence of modules in classpath. Make Afterburner and Blackbird optional maven dependencies. See - [PR 39689](https://github.com/Azure/azure-sdk-for-java/pull/39689) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index da368de5360d..c643379b2fe6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -166,6 +166,13 @@ public class Configs { public static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"; private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + private static final String MAX_ITEM_SIZE_FOR_VECTOR_SEARCH = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH"; + private static final int DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH = 50000; + + private static final String MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED = "COSMOS.MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED"; + + private static final boolean DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED = true; + public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; public static final String TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS = "COSMOS.TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS"; @@ -484,6 +491,14 @@ public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() { MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); } + public static int getMaxItemSizeForVectorSearch() { + return getJVMConfigAsInt(MAX_ITEM_SIZE_FOR_VECTOR_SEARCH, DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH); + } + + public static boolean getMaxItemSizeForVectorSearchEnabled() { + return getJVMConfigAsBoolean(MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED, DEFAULT_MAX_ITEM_SIZE_FOR_VECTOR_SEARCH_ENABLED); + } + public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() { return Duration.ofMillis(Math.max( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java index 8409d5b7ec23..f789963783ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java @@ -120,6 +120,15 @@ public static final class Properties { public static final String SPATIAL_INDEXES = "spatialIndexes"; public static final String TYPES = "types"; + // Vector Embedding Policy + public static final String VECTOR_EMBEDDING_POLICY = "vectorEmbeddingPolicy"; + public static final String VECTOR_INDEXES = "vectorIndexes"; + public static final String VECTOR_EMBEDDINGS = "vectorEmbeddings"; + public static final String VECTOR_INDEX_TYPE = "type"; + public static final String VECTOR_DATA_TYPE = "dataType"; + public static final String VECTOR_DIMENSIONS = "dimensions"; + public static final String DISTANCE_FUNCTION = "distanceFunction"; + // Unique index. public static final String UNIQUE_KEY_POLICY = "uniqueKeyPolicy"; public static final String UNIQUE_KEYS = "uniqueKeys"; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java index 4dc62089acc5..6d7b00068393 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosQueryRequestOptionsImpl.java @@ -25,6 +25,7 @@ public final class CosmosQueryRequestOptionsImpl extends CosmosQueryRequestOptio private boolean queryPlanRetrievalDisallowed; private boolean emptyPageDiagnosticsEnabled; private String queryName; + private Integer maxItemSizeForVectorSearch; private List cancelledRequestDiagnosticsTracker = new ArrayList<>(); /** @@ -62,6 +63,7 @@ public CosmosQueryRequestOptionsImpl(CosmosQueryRequestOptionsImpl options) { this.queryName = options.queryName; this.feedRange = options.feedRange; this.cancelledRequestDiagnosticsTracker = options.cancelledRequestDiagnosticsTracker; + this.maxItemSizeForVectorSearch = options.maxItemSizeForVectorSearch; } /** @@ -196,6 +198,26 @@ public CosmosQueryRequestOptionsImpl setMaxItemCount(Integer maxItemCount) { return this; } + /** + * Gets the maximum item size to fetch during non-streaming order by queries. + * + * @return the max number of items for vector search. + */ + public Integer getMaxItemSizeForVectorSearch() { + return this.maxItemSizeForVectorSearch; + } + + /** + * Sets the maximum item size to fetch during non-streaming order by queries. + * + * @param maxItemSizeForVectorSearch the max number of items for vector search. + * return the CosmosQueryRequestOptions. + */ + public CosmosQueryRequestOptionsImpl setMaxItemSizeForVectorSearch(Integer maxItemSizeForVectorSearch) { + this.maxItemSizeForVectorSearch = maxItemSizeForVectorSearch; + return this; + } + /** * Gets the request continuation token. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentCollection.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentCollection.java index 678bdb90378c..1930f2275a61 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentCollection.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentCollection.java @@ -6,10 +6,11 @@ import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.caches.SerializableWrapper; -import com.azure.cosmos.models.ClientEncryptionPolicy; import com.azure.cosmos.models.ChangeFeedPolicy; +import com.azure.cosmos.models.ClientEncryptionPolicy; import com.azure.cosmos.models.ComputedProperty; import com.azure.cosmos.models.ConflictResolutionPolicy; +import com.azure.cosmos.models.CosmosVectorEmbeddingPolicy; import com.azure.cosmos.models.IndexingPolicy; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKeyDefinition; @@ -24,6 +25,8 @@ import java.util.Collection; import java.util.Collections; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + /** * Represents a document collection in the Azure Cosmos DB database service. A collection is a named logical container * for documents. @@ -40,6 +43,7 @@ public final class DocumentCollection extends Resource { private UniqueKeyPolicy uniqueKeyPolicy; private PartitionKeyDefinition partitionKeyDefinition; private ClientEncryptionPolicy clientEncryptionPolicyInternal; + private CosmosVectorEmbeddingPolicy cosmosVectorEmbeddingPolicy; /** * Constructor. @@ -410,6 +414,33 @@ public void setClientEncryptionPolicy(ClientEncryptionPolicy value) { this.set(Constants.Properties.CLIENT_ENCRYPTION_POLICY, value, CosmosItemSerializer.DEFAULT_SERIALIZER); } + /** + * Gets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item + * used in performing vector search on the items in a collection in the Azure CosmosDB database service. + * + * @return the Vector Embedding Policy. + */ + public CosmosVectorEmbeddingPolicy getVectorEmbeddingPolicy() { + if (this.cosmosVectorEmbeddingPolicy == null) { + if (super.has(Constants.Properties.VECTOR_EMBEDDING_POLICY)) { + this.cosmosVectorEmbeddingPolicy = super.getObject(Constants.Properties.VECTOR_EMBEDDING_POLICY, + CosmosVectorEmbeddingPolicy.class); + } + } + return this.cosmosVectorEmbeddingPolicy; + } + + /** + * Sets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item + * used in performing vector search on the items in a collection in the Azure CosmosDB database service. + * + * @param value the Vector Embedding Policy. + */ + public void setVectorEmbeddingPolicy(CosmosVectorEmbeddingPolicy value) { + checkNotNull(value, "cosmosVectorEmbeddingPolicy cannot be null"); + this.set(Constants.Properties.VECTOR_EMBEDDING_POLICY, value, CosmosItemSerializer.DEFAULT_SERIALIZER); + } + public void populatePropertyBag() { super.populatePropertyBag(); if (this.indexingPolicy == null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java index a13cfc65348d..7a530816856b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java @@ -4,9 +4,11 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.implementation.BadRequestException; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.Constants; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyRange; @@ -239,7 +241,8 @@ private static boolean canCacheQuery(QueryInfo queryInfo) { && !queryInfo.hasTop() && !queryInfo.hasOffset() && !queryInfo.hasDCount() - && !queryInfo.hasOrderBy(); + && !queryInfo.hasOrderBy() + && !queryInfo.hasNonStreamingOrderBy(); } private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) { @@ -358,6 +361,37 @@ public static Flux> createSpecia boolean getLazyFeedResponse = queryInfo.hasTop(); + // We need to compute the optimal initial age size for non-streaming order-by queries + if (queryInfo.hasNonStreamingOrderBy() && Configs.getMaxItemSizeForVectorSearchEnabled()) { + // Validate the TOP or LIMIT for non-streaming order-by queries + if (!queryInfo.hasTop() && !queryInfo.hasLimit() && queryInfo.getTop() < 0 && queryInfo.getLimit() < 0) { + throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST, + "Executing a vector search query without TOP or LIMIT can consume a large number of RUs" + + "very fast and have long runtimes. Please ensure you are using one of the above two filters" + + "with you vector search query."); + } + // Validate the size of TOP or LIMIT against MaxItemSizeForVectorSearch + int maxLimit = Math.max(queryInfo.hasTop() ? queryInfo.getTop() : 0, + queryInfo.hasLimit() ? queryInfo.getLimit() : 0); + int maxItemSizeForVectorSearch = Math.max(Configs.getMaxItemSizeForVectorSearch(), + ModelBridgeInternal.getMaxItemSizeForVectorSearchFromQueryRequestOptions(cosmosQueryRequestOptions)); + if (maxLimit > maxItemSizeForVectorSearch) { + throw new NonStreamingOrderByBadRequestException(HttpConstants.StatusCodes.BADREQUEST, + "Executing a vector search query with TOP or LIMIT larger than the maxItemSizeForVectorSearch " + + "is not allowed"); + } + // Set initialPageSize based on the smallest of TOP or LIMIT + if (queryInfo.hasTop() || queryInfo.hasLimit()) { + int pageSizeWithTopOrLimit = Math.min(queryInfo.hasTop() ? queryInfo.getTop() : Integer.MAX_VALUE, + queryInfo.hasLimit() ? queryInfo.getLimit() : Integer.MAX_VALUE); + if (initialPageSize > 0) { + initialPageSize = Math.min(pageSizeWithTopOrLimit, initialPageSize); + } else { + initialPageSize = pageSizeWithTopOrLimit; + } + } + } + // We need to compute the optimal initial page size for order-by queries if (queryInfo.hasOrderBy()) { int top; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java new file mode 100644 index 000000000000..4b3b41fbc675 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByBadRequestException.java @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.CosmosException; + +public class NonStreamingOrderByBadRequestException extends CosmosException { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of the NonStreamingOrderByBadRequestException class. + * + * @param statusCode the http status code of the response. + * @param errorMessage the error message. + */ + public NonStreamingOrderByBadRequestException(int statusCode, String errorMessage) { + super(statusCode, errorMessage); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java new file mode 100644 index 000000000000..c64ef5d1fbda --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentProducer.java @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.DocumentClientRetryPolicy; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import reactor.core.publisher.Mono; + +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; + +public class NonStreamingOrderByDocumentProducer extends DocumentProducer { + private final OrderbyRowComparer consumeComparer; + + NonStreamingOrderByDocumentProducer( + OrderbyRowComparer consumeComparer, + IDocumentQueryClient client, + String collectionResourceId, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + TriFunction createRequestFunc, + Function>> executeRequestFunc, + FeedRangeEpkImpl feedRange, + String collectionLink, + Supplier createRetryPolicyFunc, + Class resourceType, + UUID correlatedActivityId, + int initialPageSize, + String initialContinuationToken, + int top, + Supplier operationContextTextProvider) { + super(client, collectionResourceId, cosmosQueryRequestOptions, createRequestFunc, executeRequestFunc, + collectionLink, createRetryPolicyFunc, resourceType, correlatedActivityId, initialPageSize, + initialContinuationToken, top, feedRange, operationContextTextProvider); + this.consumeComparer = consumeComparer; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java new file mode 100644 index 000000000000..2f3abb36e9d1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByDocumentQueryExecutionContext.java @@ -0,0 +1,268 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.DiagnosticsClientContext; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.DocumentClientRetryPolicy; +import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.QueryMetrics; +import com.azure.cosmos.implementation.RequestChargeTracker; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.models.SqlQuerySpec; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class NonStreamingOrderByDocumentQueryExecutionContext + extends ParallelDocumentQueryExecutionContextBase { + + private final static + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + + private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = + ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor(); + + private final static String FormatPlaceHolder = "{documentdb-formattableorderbyquery-filter}"; + private final static String True = "true"; + + private final OrderbyRowComparer consumeComparer; + private final RequestChargeTracker tracker; + private final ConcurrentMap queryMetricMap; + private final Collection clientSideRequestStatistics; + private Flux> orderByObservable; + + private int maxPageSizePerPartition; + + public NonStreamingOrderByDocumentQueryExecutionContext( + DiagnosticsClientContext diagnosticsClientContext, + IDocumentQueryClient client, + ResourceType resourceTypeEnum, + SqlQuerySpec query, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + String resourceLink, + String rewrittenQuery, + OrderbyRowComparer consumeComparer, + UUID correlatedActivityId, + boolean hasSelectValue, + final AtomicBoolean isQueryCancelledOnTimeout) { + super(diagnosticsClientContext, client, resourceTypeEnum, Document.class, query, cosmosQueryRequestOptions, + resourceLink, rewrittenQuery, correlatedActivityId, hasSelectValue, isQueryCancelledOnTimeout); + this.consumeComparer = consumeComparer; + this.tracker = new RequestChargeTracker(); + this.queryMetricMap = new ConcurrentHashMap<>(); + this.clientSideRequestStatistics = ConcurrentHashMap.newKeySet(); + } + + public static Flux> createAsync( + DiagnosticsClientContext diagnosticsClientContext, + IDocumentQueryClient client, + PipelinedDocumentQueryParams initParams, + DocumentCollection collection) { + + QueryInfo queryInfo = initParams.getQueryInfo(); + + NonStreamingOrderByDocumentQueryExecutionContext context = new NonStreamingOrderByDocumentQueryExecutionContext( + diagnosticsClientContext, + client, + initParams.getResourceTypeEnum(), + initParams.getQuery(), + initParams.getCosmosQueryRequestOptions(), + initParams.getResourceLink(), + initParams.getQueryInfo().getRewrittenQuery(), + new OrderbyRowComparer<>(queryInfo.getOrderBy()), + initParams.getCorrelatedActivityId(), + queryInfo.hasSelectValue(), + initParams.isQueryCancelledOnTimeout()); + + context.setTop(initParams.getTop()); + + try { + context.initialize( + initParams.getFeedRanges(), + initParams.getQueryInfo().getOrderBy(), + initParams.getQueryInfo().getOrderByExpressions(), + initParams.getInitialPageSize(), + collection); + + return Flux.just(context); + } catch (CosmosException dce) { + return Flux.error(dce); + } + } + + private void initialize( + List feedRanges, List sortOrders, + Collection orderByExpressions, + int initialPageSize, + DocumentCollection collection) throws CosmosException { + // Since the continuation token will always be null, + // we don't need to handle any initialization based on continuationToken. + // We can directly initialize without any consideration for continuationToken. + Map partitionKeyRangeToContinuationToken = new HashMap<>(); + for (FeedRangeEpkImpl feedRangeEpk : feedRanges) { + partitionKeyRangeToContinuationToken.put(feedRangeEpk, + null); + } + super.initialize(collection, + partitionKeyRangeToContinuationToken, + initialPageSize, + new SqlQuerySpec(querySpec.getQueryText().replace(FormatPlaceHolder, True), + querySpec.getParameters())); + + orderByObservable = NonStreamingOrderByUtils.nonStreamingOrderedMerge( + consumeComparer, + tracker, + documentProducers, + initialPageSize, + queryMetricMap, + clientSideRequestStatistics); + } + + @Override + protected NonStreamingOrderByDocumentProducer createDocumentProducer( + String collectionRid, + String continuationToken, + int initialPageSize, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + SqlQuerySpec querySpecForInit, + Map commonRequestHeaders, + TriFunction createRequestFunc, + Function>> executeFunc, + Supplier createRetryPolicyFunc, + FeedRangeEpkImpl feedRange) { + return new NonStreamingOrderByDocumentProducer( + consumeComparer, + client, + collectionRid, + cosmosQueryRequestOptions, + createRequestFunc, + executeFunc, + feedRange, + collectionRid, + createRetryPolicyFunc, + Document.class, + correlatedActivityId, + maxPageSizePerPartition, + continuationToken, + top, + this.getOperationContextTextProvider()); + } + + @Override + public Flux> drainAsync(int maxPageSize) { + return this.orderByObservable.transformDeferred(new ItemToPageTransformer(tracker, + maxPageSize, + this.queryMetricMap, + this.clientSideRequestStatistics)); + } + + @Override + public Flux> executeAsync() { + return drainAsync(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions)); + } + + private static class ItemToPageTransformer implements + Function>, Flux>> { + private final static int DEFAULT_PAGE_SIZE = 100; + private final RequestChargeTracker tracker; + private final int maxPageSize; + private final ConcurrentMap queryMetricMap; + private final Collection clientSideRequestStatistics; + + public ItemToPageTransformer(RequestChargeTracker tracker, + int maxPageSize, + ConcurrentMap queryMetricsMap, + Collection clientSideRequestStatistics) { + this.tracker = tracker; + this.maxPageSize = maxPageSize > 0 ? maxPageSize : DEFAULT_PAGE_SIZE; + this.queryMetricMap = queryMetricsMap; + this.clientSideRequestStatistics = clientSideRequestStatistics; + } + + private static Map headerResponse(double requestCharge) { + return Utils.immutableMapOf(HttpConstants.HttpHeaders.REQUEST_CHARGE, String.valueOf(requestCharge)); + } + + @Override + public Flux> apply(Flux> source) { + return source + .window(maxPageSize).map(Flux::collectList) + .flatMap(resultListObs -> resultListObs, 1) + .map(orderByRowResults -> { + // construct a page from result with request charge + FeedResponse> feedResponse = feedResponseAccessor.createFeedResponse( + orderByRowResults, + headerResponse(tracker.getAndResetCharge()), + null); + if (!queryMetricMap.isEmpty()) { + for (Map.Entry entry : queryMetricMap.entrySet()) { + BridgeInternal.putQueryMetricsIntoMap(feedResponse, + entry.getKey(), + entry.getValue()); + } + } + return feedResponse; + }) + .concatWith(Flux.defer(() -> { + return Flux.just(feedResponseAccessor.createFeedResponse(Utils.immutableListOf(), + null, null)); + })) + .map(feedOfOrderByRowResults -> { + List unwrappedResults = new ArrayList<>(); + for (OrderByRowResult orderByRowResult : feedOfOrderByRowResults.getResults()) { + unwrappedResults.add(orderByRowResult.getPayload()); + } + FeedResponse feedResponse = BridgeInternal.createFeedResponseWithQueryMetrics(unwrappedResults, + feedOfOrderByRowResults.getResponseHeaders(), + BridgeInternal.queryMetricsFromFeedResponse(feedOfOrderByRowResults), + ModelBridgeInternal.getQueryPlanDiagnosticsContext(feedOfOrderByRowResults), + false, + false, feedOfOrderByRowResults.getCosmosDiagnostics()); + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + feedResponse.getCosmosDiagnostics(), clientSideRequestStatistics); + return feedResponse; + }).switchIfEmpty(Flux.defer(() -> { + // create an empty page if there is no result + FeedResponse frp = BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(), + headerResponse( + tracker.getAndResetCharge()), + queryMetricMap, + null, + false, + false, + null); + diagnosticsAccessor.addClientSideDiagnosticsToFeed( + frp.getCosmosDiagnostics(), clientSideRequestStatistics); + return Flux.just(frp); + })); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java new file mode 100644 index 000000000000..b4ac33a1dbc2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/NonStreamingOrderByUtils.java @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.QueryMetrics; +import com.azure.cosmos.implementation.RequestChargeTracker; +import com.azure.cosmos.implementation.Resource; +import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult; +import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer; +import reactor.core.publisher.Flux; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.function.Function; + +public class NonStreamingOrderByUtils { + private final static + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + + public static Flux> nonStreamingOrderedMerge(OrderbyRowComparer consumeComparer, + RequestChargeTracker tracker, + List> documentProducers, + int initialPageSize, + Map queryMetricsMap, + Collection clientSideRequestStatistics) { + @SuppressWarnings("unchecked") + Flux>[] fluxes = documentProducers + .subList(0, documentProducers.size()) + .stream() + .map(producer -> + toNonStreamingOrderByQueryResultObservable(producer, tracker, queryMetricsMap, initialPageSize, + consumeComparer, clientSideRequestStatistics)) + .toArray(Flux[]::new); + return Flux.mergeComparingDelayError(1,consumeComparer, fluxes); + } + + private static Flux> toNonStreamingOrderByQueryResultObservable(DocumentProducer producer, + RequestChargeTracker tracker, + Map queryMetricsMap, + int initialPageSize, + OrderbyRowComparer consumeComparer, + Collection clientSideRequestStatisticsList) { + return producer + .produceAsync() + .transformDeferred(new NonStreamingOrderByUtils.PageToItemTransformer(tracker, queryMetricsMap, initialPageSize, + consumeComparer, clientSideRequestStatisticsList)); + } + + private static class PageToItemTransformer implements + Function.DocumentProducerFeedResponse>, Flux>> { + private final RequestChargeTracker tracker; + private final Map queryMetricsMap; + private final Integer initialPageSize; + private final OrderbyRowComparer consumeComparer; + private final Collection clientSideRequestStatistics; + + private PageToItemTransformer(RequestChargeTracker tracker, Map queryMetricsMap, + Integer initialPageSize, OrderbyRowComparer consumeComparer, + Collection clientSideRequestStatistics) { + this.tracker = tracker; + this.queryMetricsMap = queryMetricsMap; + this.initialPageSize = initialPageSize; + this.consumeComparer = consumeComparer; + this.clientSideRequestStatistics = clientSideRequestStatistics; + } + + @Override + public Flux> apply(Flux.DocumentProducerFeedResponse> source) { + // the size of the priority queue is set to size+1, because when the pq reaches the max size we add that + // item and then remove the element. If we don't do this, then when adding this element the size of the pq + // will be increased automatically by 50% and then there would be inconsistent results for later pages. + PriorityBlockingQueue> priorityQueue = new PriorityBlockingQueue<>(initialPageSize + 1, consumeComparer); + + return source.flatMap(documentProducerFeedResponse -> { + clientSideRequestStatistics.addAll( + diagnosticsAccessor.getClientSideRequestStatisticsForQueryPipelineAggregations(documentProducerFeedResponse + .pageResult.getCosmosDiagnostics())); + + QueryMetrics.mergeQueryMetricsMap(queryMetricsMap, + BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult)); + List results = documentProducerFeedResponse.pageResult.getResults(); + results.forEach(r -> { + OrderByRowResult orderByRowResult = new OrderByRowResult( + r.toJson(), + documentProducerFeedResponse.sourceFeedRange, + null); + if (Configs.getMaxItemSizeForVectorSearchEnabled()) { + if (priorityQueue.size() < initialPageSize) { + priorityQueue.add(orderByRowResult); + } else { + priorityQueue.add(orderByRowResult); + priorityQueue.poll(); + } + } else { + priorityQueue.add(orderByRowResult); + } + + }); + tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge()); + // Returning an empty Flux since we are only processing and managing state here + return Flux.empty(); + }, 1) + .thenMany(Flux.defer(() -> Flux.fromIterable(priorityQueue))); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index 020b8633d2be..6ed30d3a7fbe 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -106,7 +106,8 @@ public static Flux> createAsync( || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasDCount() - || queryInfo.hasDistinct()), + || queryInfo.hasDistinct() + || queryInfo.hasNonStreamingOrderBy()), initParams.isQueryCancelledOnTimeout()); context.setTop(initParams.getTop()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java index 7b6104271762..82746ad412b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedDocumentQueryExecutionContext.java @@ -5,6 +5,7 @@ import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.ObjectNodeMap; @@ -15,6 +16,8 @@ import java.util.function.BiFunction; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + /** * While this class is public, but it is not part of our published public APIs. * This is meant to be internally used only by our sdk. @@ -53,12 +56,21 @@ private static BiFunction, Flux { CosmosQueryRequestOptions orderByCosmosQueryRequestOptions = qryOptAccessor.clone(requestOptions); - ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken); - qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); - - documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); - - return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + if (queryInfo.hasNonStreamingOrderBy()) { + if (continuationToken != null) { + throw new NonStreamingOrderByBadRequestException( + HttpConstants.StatusCodes.BADREQUEST, + "Can not use a continuation token for a vector search query"); + } + qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); + documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); + return NonStreamingOrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + } else { + ModelBridgeInternal.setQueryRequestOptionsContinuationToken(orderByCosmosQueryRequestOptions, continuationToken); + qryOptAccessor.getImpl(orderByCosmosQueryRequestOptions).setCustomItemSerializer(null); + documentQueryParams.setCosmosQueryRequestOptions(orderByCosmosQueryRequestOptions); + return OrderByDocumentQueryExecutionContext.createAsync(diagnosticsClientContext, client, documentQueryParams, collection); + } }; } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java index c7f67711fabd..6c3fa2827216 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContext.java @@ -137,7 +137,7 @@ public Flux> executeAsync() { } private static QueryInfo validateQueryInfo(QueryInfo queryInfo) { - if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy()) { + if (queryInfo.hasOrderBy() || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasNonStreamingOrderBy()) { // Any query with order by, aggregates or group by needs to go through the Document query pipeline throw new IllegalStateException("This query must not use the simple query pipeline."); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java index 232f942b8590..39c6c27efad8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PipelinedQueryExecutionContextBase.java @@ -63,7 +63,8 @@ public static Flux> createAsync( || queryInfo.hasAggregates() || queryInfo.hasGroupBy() || queryInfo.hasDCount() - || queryInfo.hasDistinct()) { + || queryInfo.hasDistinct() + || queryInfo.hasNonStreamingOrderBy()) { return PipelinedDocumentQueryExecutionContext.createAsyncCore( diagnosticsClientContext, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java index ce2b4865767c..1dd3e5928b92 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryFeature.java @@ -14,5 +14,6 @@ public enum QueryFeature { OrderBy, Top, NonValueAggregate, - DCount + DCount, + NonStreamingOrderBy } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java index 65efb92c3325..48870a5ae52d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java @@ -36,6 +36,7 @@ public final class QueryInfo extends JsonSerializable { private DistinctQueryType distinctQueryType; private QueryPlanDiagnosticsContext queryPlanDiagnosticsContext; private DCountInfo dCountInfo; + private boolean nonStreamingOrderBy; public QueryInfo() { } @@ -160,6 +161,11 @@ public boolean hasGroupBy() { return groupByExpressions != null && !groupByExpressions.isEmpty(); } + public boolean hasNonStreamingOrderBy() { + this.nonStreamingOrderBy = Boolean.TRUE.equals(super.getBoolean("hasNonStreamingOrderBy")); + return this.nonStreamingOrderBy; + } + public Map getGroupByAliasToAggregateType(){ Map groupByAliasToAggregateMap; groupByAliasToAggregateMap = super.getMap("groupByAliasToAggregateType"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java index ce91ff2029e9..83f82dbde363 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -43,7 +43,8 @@ class QueryPlanRetriever { QueryFeature.GroupBy.name() + ", " + QueryFeature.Top.name() + ", " + QueryFeature.DCount.name() + ", " + - QueryFeature.NonValueAggregate.name(); + QueryFeature.NonValueAggregate.name() + ", " + + QueryFeature.NonStreamingOrderBy.name(); static Mono getQueryPlanThroughGatewayAsync(DiagnosticsClientContext diagnosticsClientContext, IDocumentQueryClient queryClient, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CompositePath.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CompositePath.java index a278e0aa8f50..8cfd99b46e37 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CompositePath.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CompositePath.java @@ -93,7 +93,7 @@ public CompositePathSortOrder getOrder() { } /** - * Gets the sort order for the composite path. + * Sets the sort order for the composite path. *

* For example if you want to run the query "SELECT * FROM c ORDER BY c.age asc, c.height desc", * then you need to make the order for "/age" "ascending" and the order for "/height" "descending". diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerProperties.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerProperties.java index 4fae5a797a70..0d357da0cc37 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerProperties.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosContainerProperties.java @@ -347,6 +347,28 @@ public CosmosContainerProperties setClientEncryptionPolicy(ClientEncryptionPolic return this; } + /** + * Gets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item + * used in performing vector search on the items in a collection in the Azure CosmosDB database service. + * + * @return the Vector Embedding Policy. + */ + public CosmosVectorEmbeddingPolicy getVectorEmbeddingPolicy() { + return this.documentCollection.getVectorEmbeddingPolicy(); + } + + /** + * Sets the Vector Embedding Policy containing paths for embeddings along with path-specific settings for the item + * used in performing vector search on the items in a collection in the Azure CosmosDB database service. + * + * @param value the Vector Embedding Policy. + * @return the CosmosContainerProperties. + */ + public CosmosContainerProperties setVectorEmbeddingPolicy(CosmosVectorEmbeddingPolicy value) { + this.documentCollection.setVectorEmbeddingPolicy(value); + return this; + } + Resource getResource() { return this.documentCollection; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java index c9bf3909fd5d..f9d7d4a72531 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosQueryRequestOptions.java @@ -256,6 +256,26 @@ CosmosQueryRequestOptions setMaxItemCount(Integer maxItemCount) { return this; } + /** + * Gets the maximum item size to fetch during non-streaming order by queries. + * + * @return the max number of items for vector search. + */ + public Integer getMaxItemSizeForVectorSearch() { + return this.actualRequestOptions.getMaxItemSizeForVectorSearch(); + } + + /** + * Sets the maximum item size to fetch during non-streaming order by queries. + * + * @param maxItemSizeForVectorSearch the max number of items for vector search. + * @return the CosmosQueryRequestOptions. + */ + public CosmosQueryRequestOptions setMaxItemSizeForVectorSearch(Integer maxItemSizeForVectorSearch) { + this.actualRequestOptions.setMaxItemSizeForVectorSearch(maxItemSizeForVectorSearch); + return this; + } + /** * Gets the request continuation token. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDataType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDataType.java new file mode 100644 index 000000000000..d8a4a63edbe2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDataType.java @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; + +/** + * Data types for the embeddings in Cosmos DB database service. + */ +public enum CosmosVectorDataType { + /** + * Represents a int8 data type. + */ + INT8("int8"), + + /** + * Represents a uint8 data type. + */ + UINT8("uint8"), + + /** + * Represents a float16 data type. + */ + FLOAT16("float16"), + + /** + * Represents a float32 data type. + */ + FLOAT32("float32"); + + private final String overWireValue; + + CosmosVectorDataType(String overWireValue) { + this.overWireValue = overWireValue; + } + + @JsonValue + @Override + public String toString() { + return this.overWireValue; + } + + /** + * Method to retrieve the enum constant by its overWireValue. + * @param value the overWire value of the enum constant + * @return the matching CosmosVectorDataType + * @throws IllegalArgumentException if no matching enum constant is found + */ + public static CosmosVectorDataType fromString(String value) { + return Arrays.stream(CosmosVectorDataType.values()) + .filter(vectorDataType -> vectorDataType.toString().equalsIgnoreCase(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format( + "Invalid vector data type with value {%s} for the vector embedding policy.", value))); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDistanceFunction.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDistanceFunction.java new file mode 100644 index 000000000000..57f74fab3dc9 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorDistanceFunction.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; + +/** + * Distance Function for the embeddings in the Cosmos DB database service. + */ +public enum CosmosVectorDistanceFunction { + /** + * Represents the euclidean distance function. + */ + EUCLIDEAN("euclidean"), + + /** + * Represents the cosine distance function. + */ + COSINE("cosine"), + + /** + * Represents the dot product distance function. + */ + DOT_PRODUCT("dotproduct"); + + private final String overWireValue; + + CosmosVectorDistanceFunction(String overWireValue) { + this.overWireValue = overWireValue; + } + + @JsonValue + @Override + public String toString() { + return this.overWireValue; + } + + /** + * Method to retrieve the enum constant by its overWireValue. + * @param value the overWire value of the enum constant + * @return the matching CosmosVectorDataType + * @throws IllegalArgumentException if no matching enum constant is found + */ + public static CosmosVectorDistanceFunction fromString(String value) { + return Arrays.stream(CosmosVectorDistanceFunction.values()) + .filter(vectorDistanceFunction -> vectorDistanceFunction.toString().equalsIgnoreCase(value)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format( + "Invalid distance function with value {%s} for the vector embedding policy.", value ))); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbedding.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbedding.java new file mode 100644 index 000000000000..0f111d34e00a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbedding.java @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.fasterxml.jackson.annotation.JsonProperty; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Embedding settings within {@link CosmosVectorEmbeddingPolicy} + */ +public final class CosmosVectorEmbedding { + @JsonProperty(Constants.Properties.PATH) + private String path; + @JsonProperty(Constants.Properties.VECTOR_DATA_TYPE) + private String dataType; + @JsonProperty(Constants.Properties.VECTOR_DIMENSIONS) + private Long dimensions; + @JsonProperty(Constants.Properties.DISTANCE_FUNCTION) + private String distanceFunction; + private JsonSerializable jsonSerializable; + + /** + * Constructor + */ + public CosmosVectorEmbedding() { + this.jsonSerializable = new JsonSerializable(); + } + + /** + * Gets the path for the cosmosVectorEmbedding. + * + * @return path + */ + public String getPath() { + return path; + } + + /** + * Sets the path for the cosmosVectorEmbedding. + * + * @param path the path for the cosmosVectorEmbedding + * @return CosmosVectorEmbedding + */ + public CosmosVectorEmbedding setPath(String path) { + if (StringUtils.isEmpty(path)) { + throw new NullPointerException("embedding path is either null or empty"); + } + + if (path.charAt(0) != '/' || path.lastIndexOf('/') != 0) { + throw new IllegalArgumentException(""); + } + + this.path = path; + return this; + } + + /** + * Gets the data type for the cosmosVectorEmbedding. + * + * @return dataType + */ + public CosmosVectorDataType getDataType() { + return CosmosVectorDataType.fromString(dataType); + } + + /** + * Sets the data type for the cosmosVectorEmbedding. + * + * @param dataType the data type for the cosmosVectorEmbedding + * @return CosmosVectorEmbedding + */ + public CosmosVectorEmbedding setDataType(CosmosVectorDataType dataType) { + checkNotNull(dataType, "cosmosVectorDataType cannot be null"); + this.dataType = dataType.toString(); + return this; + } + + /** + * Gets the dimensions for the cosmosVectorEmbedding. + * + * @return dimensions + */ + public Long getDimensions() { + return dimensions; + } + + /** + * Sets the dimensions for the cosmosVectorEmbedding. + * + * @param dimensions the dimensions for the cosmosVectorEmbedding + * @return CosmosVectorEmbedding + */ + public CosmosVectorEmbedding setDimensions(Long dimensions) { + checkNotNull(dimensions, "dimensions cannot be null"); + if (dimensions < 1) { + throw new IllegalArgumentException("Dimensions for the embedding has to be a long value greater than 0 " + + "for the vector embedding policy"); + } + + this.dimensions = dimensions; + return this; + } + + /** + * Gets the distanceFunction for the cosmosVectorEmbedding. + * + * @return distanceFunction + */ + public CosmosVectorDistanceFunction getDistanceFunction() { + return CosmosVectorDistanceFunction.fromString(distanceFunction); + } + + /** + * Sets the distanceFunction for the cosmosVectorEmbedding. + * + * @param distanceFunction the distanceFunction for the cosmosVectorEmbedding + * @return CosmosVectorEmbedding + */ + public CosmosVectorEmbedding setDistanceFunction(CosmosVectorDistanceFunction distanceFunction) { + checkNotNull(distanceFunction, "cosmosVectorDistanceFunction cannot be null"); + this.distanceFunction = distanceFunction.toString(); + return this; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbeddingPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbeddingPolicy.java new file mode 100644 index 000000000000..6abcc0028723 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorEmbeddingPolicy.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.JsonSerializable; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Vector Embedding Policy + */ +public final class CosmosVectorEmbeddingPolicy { + + private JsonSerializable jsonSerializable; + /** + * Paths for embeddings along with path-specific settings for the item. + */ + @JsonProperty(Constants.Properties.VECTOR_EMBEDDINGS) + private List cosmosVectorEmbeddings; + + /** + * Constructor + */ + public CosmosVectorEmbeddingPolicy() { + this.jsonSerializable = new JsonSerializable(); + } + + /** + * Gets the paths for embeddings along with path-specific settings for the item. + * + * @return the paths for embeddings along with path-specific settings for the item. + */ + public List getVectorEmbeddings() { + return this.cosmosVectorEmbeddings; + } + + /** + * Sets the paths for embeddings along with path-specific settings for the item. + * + * @param cosmosVectorEmbeddings paths for embeddings along with path-specific settings for the item. + */ + public void setCosmosVectorEmbeddings(List cosmosVectorEmbeddings) { + cosmosVectorEmbeddings.forEach(embedding -> { + checkNotNull(embedding, "Null values are not allowed in cosmosVectorEmbeddings list."); + }); + this.cosmosVectorEmbeddings = cosmosVectorEmbeddings; + } + +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexSpec.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexSpec.java new file mode 100644 index 000000000000..4ea617eea041 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexSpec.java @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.azure.cosmos.CosmosItemSerializer; +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.JsonSerializable; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Vector Indexes spec for Azure CosmosDB service. + */ +public final class CosmosVectorIndexSpec { + + private final JsonSerializable jsonSerializable; + private String type; + + /** + * Constructor + */ + public CosmosVectorIndexSpec() { + this.jsonSerializable = new JsonSerializable(); + } + + /** + * Gets path. + * + * @return the path. + */ + public String getPath() { + return this.jsonSerializable.getString(Constants.Properties.PATH); + } + + /** + * Sets path. + * + * @param path the path. + * @return the SpatialSpec. + */ + public CosmosVectorIndexSpec setPath(String path) { + this.jsonSerializable.set(Constants.Properties.PATH, path, CosmosItemSerializer.DEFAULT_SERIALIZER); + return this; + } + + /** + * Gets the vector index type for the vector index + * + * @return the vector index type + */ + public String getType() { + if (this.type == null) { + this.type = this.jsonSerializable.getString(Constants.Properties.VECTOR_INDEX_TYPE); + } + return this.type; + } + + /** + * Sets the vector index type for the vector index + * + * @param type the vector index type + * @return the VectorIndexSpec + */ + public CosmosVectorIndexSpec setType(String type) { + checkNotNull(type, "cosmosVectorIndexType cannot be null"); + this.type = type; + this.jsonSerializable.set(Constants.Properties.VECTOR_INDEX_TYPE, this.type, CosmosItemSerializer.DEFAULT_SERIALIZER); + return this; + } + + void populatePropertyBag() { + this.jsonSerializable.populatePropertyBag(); + } + + JsonSerializable getJsonSerializable() { + return this.jsonSerializable; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexType.java new file mode 100644 index 000000000000..679ea1f991c0 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosVectorIndexType.java @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +/** + * Defines the index type of vector index specification in the Azure Cosmos DB service. + */ +public enum CosmosVectorIndexType { + /** + * Represents a flat vector index type. + */ + FLAT("flat"), + + /** + * Represents a quantized flat vector index type. + */ + QUANTIZED_FLAT("quantizedFlat"), + + /** + * Represents a disk ANN vector index type. + */ + DISK_ANN("diskANN"); + + + private final String overWireValue; + + CosmosVectorIndexType(String overWireValue) { + this.overWireValue = overWireValue; + } + + @Override + public String toString() { + return this.overWireValue; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java index 4ee5153877f8..530557cad90f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/IndexingPolicy.java @@ -25,8 +25,8 @@ public final class IndexingPolicy { private List excludedPaths; private List> compositeIndexes; private List spatialIndexes; - - private JsonSerializable jsonSerializable; + private List vectorIndexes; + private final JsonSerializable jsonSerializable; /** * Constructor. @@ -54,7 +54,7 @@ public IndexingPolicy() { * * * @param defaultIndexOverrides comma separated set of indexes that serve as default index specifications for the - * root path. + * root path. * @throws IllegalArgumentException throws when defaultIndexOverrides is null */ IndexingPolicy(Index[] defaultIndexOverrides) { @@ -235,7 +235,7 @@ public IndexingPolicy setCompositeIndexes(List> compositeInd } /** - * Sets the spatial indexes for additional indexes. + * Gets the spatial indexes for additional indexes. * * @return the spatial indexes. */ @@ -266,11 +266,55 @@ public IndexingPolicy setSpatialIndexes(List spatialIndexes) { return this; } + /** + * Gets the vector indexes. + * + * @return the vector indexes + */ + public List getVectorIndexes() { + if (this.vectorIndexes == null) { + this.vectorIndexes = this.jsonSerializable.getList(Constants.Properties.VECTOR_INDEXES, CosmosVectorIndexSpec.class); + + if (this.vectorIndexes == null) { + this.vectorIndexes = new ArrayList(); + } + } + + return this.vectorIndexes; + } + + /** + * Sets the vector indexes. + * + * Example of the vectorIndexes: + * "vectorIndexes": [ + * { + * "path": "/vector1", + * "type": "diskANN" + * }, + * { + * "path": "/vector1", + * "type": "flat" + * }, + * { + * "path": "/vector2", + * "type": "quantizedFlat" + * }] + * + * @param vectorIndexes the vector indexes + * @return the Indexing Policy. + */ + public IndexingPolicy setVectorIndexes(List vectorIndexes) { + this.vectorIndexes = vectorIndexes; + this.jsonSerializable.set(Constants.Properties.VECTOR_INDEXES,this.vectorIndexes, CosmosItemSerializer.DEFAULT_SERIALIZER); + return this; + } + void populatePropertyBag() { this.jsonSerializable.populatePropertyBag(); // If indexing mode is not 'none' and not paths are set, set them to the defaults if (this.getIndexingMode() != IndexingMode.NONE && this.getIncludedPaths().size() == 0 - && this.getExcludedPaths().size() == 0) { + && this.getExcludedPaths().size() == 0) { IncludedPath includedPath = new IncludedPath(IndexingPolicy.DEFAULT_PATH); this.getIncludedPaths().add(includedPath); } @@ -296,5 +340,7 @@ void populatePropertyBag() { } } - JsonSerializable getJsonSerializable() { return this.jsonSerializable; } + JsonSerializable getJsonSerializable() { + return this.jsonSerializable; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java index a8f344ce4e2d..e814cc16681b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ModelBridgeInternal.java @@ -435,6 +435,8 @@ public static void populatePropertyBag(T t) { ((PartitionKeyDefinition) t).populatePropertyBag(); } else if (t instanceof SpatialSpec) { ((SpatialSpec) t).populatePropertyBag(); + } else if (t instanceof CosmosVectorIndexSpec) { + ((CosmosVectorIndexSpec) t).populatePropertyBag(); } else if (t instanceof SqlParameter) { ((SqlParameter) t).populatePropertyBag(); } else if (t instanceof SqlQuerySpec) { @@ -468,6 +470,8 @@ public static JsonSerializable getJsonSerializable(T t) { return ((PartitionKeyDefinition) t).getJsonSerializable(); } else if (t instanceof SpatialSpec) { return ((SpatialSpec) t).getJsonSerializable(); + } else if (t instanceof CosmosVectorIndexSpec) { + return ((CosmosVectorIndexSpec) t).getJsonSerializable(); } else if (t instanceof SqlParameter) { return ((SqlParameter) t).getJsonSerializable(); } else if (t instanceof SqlQuerySpec) { @@ -540,6 +544,11 @@ public static Integer getMaxItemCountFromQueryRequestOptions(CosmosQueryRequestO return options.getMaxItemCount(); } + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static Integer getMaxItemSizeForVectorSearchFromQueryRequestOptions(CosmosQueryRequestOptions options) { + return options.getMaxItemSizeForVectorSearch(); + } + @Warning(value = INTERNAL_USE_ONLY_WARNING) public static String getRequestContinuationFromQueryRequestOptions(CosmosQueryRequestOptions options) { return options.getRequestContinuation(); diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 6d699b6e17bd..486ca9852bc7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -8,6 +8,9 @@ ### Bugs Fixed +- Fixes the session message disposition to use management node as fall back. ([#39913](https://github.com/Azure/azure-sdk-for-java/issues/ 39913)) +- Fixes the session processor idle timeout to fall back to RetryOptions::tryTimeout. ([#39993](https://github.com/Azure/azure-sdk-for-java/issues/39993)) + ### Other Changes ## 7.17.0 (2024-05-06)