Merged
Changes from 1 commit
69 commits
991a274
sync changes and sample for vector search control plane
simorenoh Mar 21, 2024
3a3a652
Update index_management.py
simorenoh Mar 21, 2024
20f533c
Update index_management.py
simorenoh Mar 21, 2024
09f33b7
async and samples
simorenoh Mar 28, 2024
8e527fd
sync and async tests
simorenoh Mar 28, 2024
7c44137
Update CHANGELOG.md
simorenoh Mar 28, 2024
7eb5439
developed typehints
simorenoh Mar 28, 2024
c428476
skip tests
simorenoh Mar 29, 2024
58000fd
create_if_not_exists, README
simorenoh Apr 2, 2024
4c4b1ab
Update README.md
simorenoh Apr 2, 2024
0e6b24f
add provisional, add dimension limit
simorenoh Apr 3, 2024
b42f3cb
Merge branch 'main' into vector-search-query
simorenoh Apr 16, 2024
fef391d
adds sync changes, adds changelog
simorenoh May 3, 2024
8583dbf
async changes
simorenoh May 3, 2024
158f60f
some comments addressed
simorenoh May 3, 2024
c880436
Update CHANGELOG.md
simorenoh May 3, 2024
a414f05
bug fix on ordering
simorenoh May 8, 2024
d217210
ordering bug fix
simorenoh May 8, 2024
8869ea4
fix datetime
simorenoh May 8, 2024
0c6d8eb
samples added
simorenoh May 8, 2024
30b0645
small fixes
simorenoh May 9, 2024
5056d89
fix some additional PQ logic
simorenoh May 9, 2024
358deae
last bit of pq fixes
simorenoh May 9, 2024
617c709
Update non_streaming_order_by_aggregator.py
simorenoh May 9, 2024
73e3709
memory optimization
simorenoh May 10, 2024
6bb8090
Update sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/do…
simorenoh May 10, 2024
326b155
Merge branch 'main' into vector-search-query
simorenoh May 10, 2024
540a645
addressing comments
simorenoh May 10, 2024
98a4fc9
test name fix, improve readme/ samples
simorenoh May 10, 2024
d487519
add sync tests, improve readme
simorenoh May 10, 2024
abd2bc0
async tests
simorenoh May 10, 2024
a0547b1
pylint
simorenoh May 10, 2024
07acb93
remove print
simorenoh May 10, 2024
7cd5b92
pylint
simorenoh May 10, 2024
5834b29
adds env variable
simorenoh May 10, 2024
f615f3e
adds JS tests
simorenoh May 13, 2024
0081bbe
error logic improvements
simorenoh May 13, 2024
674f483
readme updates
simorenoh May 13, 2024
0e26bf6
more fixes to logic
simorenoh May 13, 2024
a65eb0a
oops
simorenoh May 13, 2024
6563bc3
memory optimization
simorenoh May 13, 2024
9935dc1
Update sdk/cosmos/azure-cosmos/README.md
simorenoh May 13, 2024
ad36a9c
update variable for naming conventions
simorenoh May 13, 2024
86b78b7
remove/ comment out diskANN
simorenoh May 13, 2024
3cff42f
offset + limit fix, tests fixes
simorenoh May 14, 2024
dd187dd
add capabilities env var flag
simorenoh May 14, 2024
d2fbb1b
use feature flag for existing query tests
simorenoh May 14, 2024
fe7742a
disable emulator for query tests
simorenoh May 14, 2024
7cd4d9d
missed some tests
simorenoh May 14, 2024
b3876c6
Update test_aggregate.py
simorenoh May 14, 2024
d8bc50d
Update test-resources.bicep
simorenoh May 15, 2024
1e699e4
forgot tests were being skipped
simorenoh May 15, 2024
e79839b
Update sdk/cosmos/azure-cosmos/test/test_vector_policy.py
Pilchie May 15, 2024
16860dc
Update sdk/cosmos/azure-cosmos/test/test_vector_policy_async.py
Pilchie May 15, 2024
1431e9e
test fixes
simorenoh May 15, 2024
28bef5b
Merge branch 'vector-search-query' of https://github.com/simorenoh/az…
simorenoh May 15, 2024
8701b80
Update README.md
simorenoh May 15, 2024
58af1bb
create separate db for vectors
simorenoh May 15, 2024
9bfdf57
tests
simorenoh May 15, 2024
45e5b6d
tests
simorenoh May 15, 2024
c4a7c60
more tests
simorenoh May 15, 2024
b6dbe45
small bit
simorenoh May 15, 2024
fca1294
final fixes hopefully
simorenoh May 15, 2024
445ba94
raise time limit on test so it doesnt fail
simorenoh May 15, 2024
f64775d
Update test_query_vector_similarity_async.py
simorenoh May 15, 2024
ae9524d
add date for release prep
simorenoh May 15, 2024
e616c4a
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
8ad2591
Update CHANGELOG.md
simorenoh May 15, 2024
fd10e89
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
async changes
simorenoh committed May 3, 2024
commit 8583dbfb2acaddcfba94ba5506ce645075b1deee
@@ -271,3 +271,59 @@ def _validate_orderby_items(self, res1, res2):
type2 = _OrderByHelper.getTypeStr(elt2)
if type1 != type2:
raise ValueError("Expected {}, but got {}.".format(type1, type2))

class _NonStreamingDocumentProducer(object):
"""This class takes care of handling of the items to be sorted in a non-streaming context.
One instance of this document producer goes attached to every item coming in for the priority queue to be able
to properly sort items as they get inserted.
"""

def __init__(self, item_result, sort_order):
"""
Constructor
"""
self._item_result = item_result
self._doc_producer_comp = _NonStreamingOrderByComparator(sort_order)



class _NonStreamingOrderByComparator(object):
"""Provide a Comparator for item results which respects orderby sort order.
"""

def __init__(self, sort_order): # pylint: disable=super-init-not-called
"""Instantiates this class
:param list sort_order:
List of sort orders (i.e., Ascending, Descending)
:ivar list sort_order:
List of sort orders (i.e., Ascending, Descending)
"""
self._sort_order = sort_order

async def compare(self, doc_producer1, doc_producer2):
"""Compares the given two instances of DocumentProducers.
Based on the orderby query items and whether the sort order is Ascending
or Descending compares the peek result of the two DocumentProducers.
:param _DocumentProducer doc_producer1: first instance to be compared
:param _DocumentProducer doc_producer2: second instance to be compared
:return:
Integer value of compare result.
positive integer if doc_producers1 > doc_producers2
negative integer if doc_producers1 < doc_producers2
:rtype: int
"""
order1 = doc_producer1._item_result["orderByItems"][0]
order2 = doc_producer2._item_result["orderByItems"][0]
type1_ord = _OrderByHelper.getTypeOrd(order1)
type2_ord = _OrderByHelper.getTypeOrd(order2)

type_ord_diff = type1_ord - type2_ord

if type_ord_diff:
return type_ord_diff

# same type ordinal; if the order-by item is undefined on both sides, treat them as equal
if type1_ord == 0:
return 0

return _compare_helper(order1['item'], order2['item'])
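
For readers skimming the diff, a minimal synchronous sketch of the comparison rule introduced above: entries are ranked first by a type ordinal (an entry with no 'item' value sorts lowest), and only when both entries share the same defined type are the values themselves compared. The helpers below are simplified stand-ins, not the SDK's _OrderByHelper.getTypeOrd or _compare_helper.

# Minimal sketch of the non-streaming order-by comparison rule (assumed ordinals,
# not the SDK implementation). Order-by entries are dicts like {"item": <value>} or {}.
def _type_ord(entry):
    # 0 = undefined (no 'item'), 1 = None, 2 = bool, 3 = number, 4 = string
    if "item" not in entry:
        return 0
    val = entry["item"]
    if val is None:
        return 1
    if isinstance(val, bool):
        return 2
    if isinstance(val, (int, float)):
        return 3
    return 4

def compare_order_by_entries(entry1, entry2):
    diff = _type_ord(entry1) - _type_ord(entry2)
    if diff:
        return diff  # different type ordinals decide the order outright
    if _type_ord(entry1) == 0:
        return 0  # both undefined: treat as equal
    v1, v2 = entry1["item"], entry2["item"]
    return (v1 > v2) - (v1 < v2)  # same type: compare the values themselves

# Example: an undefined entry sorts before a numeric one.
assert compare_order_by_entries({}, {"item": 0.42}) < 0
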
@@ -55,8 +55,15 @@ class _QueryExecutionOrderByEndpointComponent(_QueryExecutionEndpointComponent):
"""
async def __anext__(self):
payload = await self._execution_context.__anext__()
return payload["payload"]
return payload._item_result["payload"]

class _QueryExecutionNonStreamingEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling a non-streaming order by query results.
For each processed orderby result it returns the item result.
"""
async def __anext__(self):
payload = await self._execution_context.__anext__()
return payload._item_result["payload"]

class _QueryExecutionTopEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling top query.
@@ -23,8 +23,8 @@
Cosmos database service.
"""

from azure.cosmos._execution_context.aio import endpoint_component
from azure.cosmos._execution_context.aio import multi_execution_aggregator
from azure.cosmos._execution_context.aio import endpoint_component, multi_execution_aggregator
from azure.cosmos._execution_context.aio import non_streaming_order_by_aggregator
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos._execution_context.execution_dispatcher import _is_partitioned_execution_info
@@ -106,13 +106,28 @@ async def _create_pipelined_execution_context(self, query_execution_info):
and self._options["enableCrossPartitionQuery"]):
raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
"Cross partition query only supports 'VALUE <AggregateFunc>' for aggregates")

execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
# raise an error here for a vector search query that has neither a TOP nor a LIMIT clause
if query_execution_info.get_has_non_streaming_order_by():
if query_execution_info.get_top() is None and query_execution_info.get_limit() is None:
# TODO: missing one last if statement here to check for the system variable bypass - need name
raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
"Executing a vector search query without TOP or LIMIT can consume many" +
" RUs very fast and have long runtimes. Please ensure you are using one" +
" of the two filters with your vector search query.")
execution_context_aggregator =\
non_streaming_order_by_aggregator._NonStreamingOrderByContextAggregator(self._client,
self._resource_link,
self._query,
self._options,
query_execution_info)
await execution_context_aggregator._configure_partition_ranges()
else:
execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
self._resource_link,
self._query,
self._options,
query_execution_info)
await execution_context_aggregator._configure_partition_ranges()
await execution_context_aggregator._configure_partition_ranges()
return _PipelineExecutionContext(self._client, self._options, execution_context_aggregator,
query_execution_info)
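
As a usage-level illustration of the guardrail above, a vector search query is expected to carry a TOP or LIMIT clause. A hedged sketch with the async client follows; the endpoint, key, database/container names, the embeddings field, and the vector literal are placeholders rather than values from this PR.

# Hedged usage sketch (not part of this PR): all connection values and field names are placeholders.
import asyncio
from azure.cosmos.aio import CosmosClient

async def vector_query_sample():
    async with CosmosClient("https://<account>.documents.azure.com:443/", credential="<key>") as client:
        container = client.get_database_client("<database>").get_container_client("<container>")
        query = (
            "SELECT TOP 5 c.id, VectorDistance(c.embeddings, @vector) AS similarity "
            "FROM c ORDER BY VectorDistance(c.embeddings, @vector)"
        )
        # Without TOP (or OFFSET/LIMIT), the dispatcher above raises a BAD_REQUEST error.
        async for item in container.query_items(
            query=query,
            parameters=[{"name": "@vector", "value": [0.1, 0.2, 0.3]}],
        ):
            print(item["id"], item["similarity"])

asyncio.run(vector_query_sample())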

@@ -134,7 +149,9 @@ def __init__(self, client, options, execution_context, query_execution_info):
self._endpoint = endpoint_component._QueryExecutionEndpointComponent(execution_context)

order_by = query_execution_info.get_order_by()
if order_by:
if query_execution_info.get_has_non_streaming_order_by():
self._endpoint = endpoint_component._QueryExecutionNonStreamingEndpointComponent(self._endpoint)
elif order_by:
self._endpoint = endpoint_component._QueryExecutionOrderByEndpointComponent(self._endpoint)

aggregates = query_execution_info.get_aggregates()
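For orientation, the pipeline constructor above chains endpoint components: each component wraps the previous endpoint, and its __anext__ awaits the inner endpoint before post-processing the result (here, unwrapping the payload). A toy sketch of that chaining pattern, with illustrative names rather than the SDK's internal classes:

# Toy sketch of the endpoint-component chaining pattern (illustrative only).
import asyncio

class _Source:
    """A trivial async iterator standing in for an execution context."""
    def __init__(self, items):
        self._it = iter(items)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._it)
        except StopIteration:
            raise StopAsyncIteration

class _PayloadUnwrapComponent:
    """Wraps another async iterator and yields only the 'payload' of each result."""
    def __init__(self, inner):
        self._inner = inner

    def __aiter__(self):
        return self

    async def __anext__(self):
        result = await self._inner.__anext__()
        return result["payload"]

async def main():
    endpoint = _Source([{"payload": {"id": "1"}}, {"payload": {"id": "2"}}])
    endpoint = _PayloadUnwrapComponent(endpoint)  # each component wraps the previous endpoint
    async for doc in endpoint:
        print(doc)

asyncio.run(main())
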
@@ -0,0 +1,170 @@
# The MIT License (MIT)
# Copyright (c) 2024 Microsoft Corporation

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions

# pylint: disable=protected-access


class FixedSizePriorityQueue:
"""Provides a Fixed Size Priority Queue abstraction data structure"""

def __init__(self, max_size):
self._heap = []
self.max_size = max_size

async def pop_async(self, document_producer_comparator):
return await _queue_async_helper.heap_pop(self._heap, document_producer_comparator)

async def push_async(self, item, document_producer_comparator):
await _queue_async_helper.heap_push(self._heap, item, document_producer_comparator)
if len(self._heap) > self.max_size:
await _queue_async_helper.heap_pop(self._heap, document_producer_comparator)

def peek(self):
return self._heap[0]

def size(self):
return len(self._heap)

class _NonStreamingOrderByContextAggregator(_QueryExecutionContextBase):
"""This class is a subclass of the query execution context base and serves for
non-streaming order by queries. It is very similar to the existing MultiExecutionContextAggregator,
but is needed since we're dealing with items and not document producers.
This class builds upon the multi-execution aggregator, building a document producer per partition
and draining their results entirely in order to create the result set relevant to the filters passed
by the user.
"""

def __init__(self, client, resource_link, query, options, partitioned_query_ex_info):
super(_NonStreamingOrderByContextAggregator, self).__init__(client, options)

# use the routing provider in the client
self._routing_provider = client._routing_map_provider
self._client = client
self._resource_link = resource_link
self._query = query
self._partitioned_query_ex_info = partitioned_query_ex_info
self._sort_orders = partitioned_query_ex_info.get_order_by()

pq_size = partitioned_query_ex_info.get_top() or partitioned_query_ex_info.get_limit()
self._orderByPQ = FixedSizePriorityQueue(pq_size)

async def __anext__(self):
"""Returns the next result
:return: The next result.
:rtype: dict
:raises StopAsyncIteration: If no more results are left.
"""
if self._orderByPQ.size() > 0:
res = await self._orderByPQ.pop_async(self._document_producer_comparator)
return res
raise StopAsyncIteration

async def fetch_next_block(self):

raise NotImplementedError("You should use pipeline's fetch_next_block.")

async def _repair_document_producer(self):
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
which loads in a refreshed partition key range cache to re-create the partition key ranges.
After loading this new cache, the document producers get re-created with the new valid ranges.
"""
# refresh the routing provider to get the newly initialized one post-refresh
self._routing_provider = self._client._routing_map_provider
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = await self._get_target_partition_key_range()

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
# create and add the child execution context for the target range
targetPartitionQueryExecutionContextList.append(
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
)

self._doc_producers = []
for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
await targetQueryExContext.peek()
# if there are matching results in the target range, keep this document producer
self._doc_producers.append(targetQueryExContext)

except StopAsyncIteration:
continue

def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):

rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
if rewritten_query:
if isinstance(self._query, dict):
# this is a parameterized query, collect all the parameters
query = dict(self._query)
query["query"] = rewritten_query
else:
query = rewritten_query
else:
query = self._query

return document_producer._DocumentProducer(
partition_key_target_range,
self._client,
self._resource_link,
query,
self._document_producer_comparator,
self._options,
)

async def _get_target_partition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
return await self._routing_provider.get_overlapping_ranges(
self._resource_link, [routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges]
)

async def _configure_partition_ranges(self):
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = await self._get_target_partition_key_range()

self._document_producer_comparator = document_producer._NonStreamingOrderByComparator(self._sort_orders)

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
# create and add the child execution context for the target range
targetPartitionQueryExecutionContextList.append(
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
)

self._doc_producers = []
for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
await targetQueryExContext.peek()
self._doc_producers.append(targetQueryExContext)
except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
await self._repair_document_producer()
else:
raise

except StopAsyncIteration:
continue

pq_size = self._partitioned_query_ex_info.get_top() or self._partitioned_query_ex_info.get_limit()
self._orderByPQ = FixedSizePriorityQueue(pq_size)
for doc_producer in self._doc_producers:
while True:
try:
result = await doc_producer.peek()
item_result = document_producer._NonStreamingDocumentProducer(result, self._sort_orders)
await self._orderByPQ.push_async(item_result, self._document_producer_comparator)
await doc_producer.__anext__()
except StopAsyncIteration:
break
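
A rough synchronous analogue of the FixedSizePriorityQueue above, using heapq with natural ordering in place of the async comparator. The assumed behaviour: once the heap grows past max_size, the root (the current smallest element) is evicted, so only the best max_size results are retained.

# Rough synchronous analogue of FixedSizePriorityQueue (illustrative; the SDK
# version orders elements via an async comparator rather than natural ordering).
import heapq

class FixedSizeHeap:
    def __init__(self, max_size):
        self._heap = []
        self.max_size = max_size

    def push(self, item):
        heapq.heappush(self._heap, item)
        if len(self._heap) > self.max_size:
            # evict the root so at most max_size items are retained
            heapq.heappop(self._heap)

    def pop(self):
        return heapq.heappop(self._heap)

    def size(self):
        return len(self._heap)

# Keep only the 3 largest scores; smaller ones are evicted on overflow.
pq = FixedSizeHeap(3)
for score in [0.91, 0.12, 0.75, 0.33, 0.88]:
    pq.push(score)
print([pq.pop() for _ in range(pq.size())])  # [0.75, 0.88, 0.91]
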
@@ -326,7 +326,18 @@ def compare(self, doc_producer1, doc_producer2):
negative integer if doc_producers1 < doc_producers2
:rtype: int
"""
# TODO: this is not fully safe - doesn't deal with scenario of having orderByItems of [{}]
rank1 = doc_producer1._item_result["orderByItems"][0]['item']
rank2 = doc_producer2._item_result["orderByItems"][0]['item']
return _compare_helper(rank1, rank2)
order1 = doc_producer1._item_result["orderByItems"][0]
order2 = doc_producer2._item_result["orderByItems"][0]
type1_ord = _OrderByHelper.getTypeOrd(order1)
type2_ord = _OrderByHelper.getTypeOrd(order2)

type_ord_diff = type1_ord - type2_ord

if type_ord_diff:
return type_ord_diff

# same type ordinal; if the order-by item is undefined on both sides, treat them as equal
if type1_ord == 0:
return 0

return _compare_helper(order1['item'], order2['item'])
@@ -131,7 +131,6 @@ def _repair_document_producer(self):
self._doc_producers = []
for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
# TODO: we can also use more_itertools.peekable to be more python friendly
targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue
self._doc_producers.append(targetQueryExContext)
@@ -3112,7 +3112,8 @@ async def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, **kw
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)
documents._QueryFeature.Top + "," +
documents._QueryFeature.NonStreamingOrderBy)

options = {
"contentType": runtime_constants.MediaTypes.Json,
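
For reference, the change above only appends NonStreamingOrderBy to the comma-separated supported-query-features value sent with the query-plan request, so the gateway can return a non-streaming order by plan for vector search queries. Conceptually the tail of that string looks like the sketch below (string literals standing in for the documents._QueryFeature constants; earlier features in the list are elided).

# Illustrative composition of the supported-query-features value (assumed literals).
supported_query_features = ",".join([
    "MultipleOrderBy",
    "OffsetAndLimit",
    "OrderBy",
    "Top",
    "NonStreamingOrderBy",  # newly advertised feature for vector search queries
])
print(supported_query_features)
# MultipleOrderBy,OffsetAndLimit,OrderBy,Top,NonStreamingOrderBy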