Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
2950e20
merge from main and resolve conflicts
Aug 14, 2024
7a1a1eb
remove async keyword from changeFeed query in aio package
Aug 18, 2024
b6c53fb
refactor
Aug 18, 2024
5f16b14
refactor
Aug 18, 2024
36990ef
fix pylint
Aug 20, 2024
3c569e8
added public surface methods
tvaron3 Aug 20, 2024
7479b0c
pylint fix
Aug 20, 2024
2e76620
fix
Aug 21, 2024
56bbb9e
added functionality for merging session tokens from logical pk
tvaron3 Aug 21, 2024
8c0aa46
fix mypy
Aug 21, 2024
28394b9
added tests for basic merge and split
tvaron3 Aug 21, 2024
25c3363
resolve comments
Aug 27, 2024
cecdfa5
resolve comments
Aug 28, 2024
65ed132
resolve comments
Aug 28, 2024
4bb30d2
resolve comments
Aug 28, 2024
5addcdc
fix pylint
Aug 29, 2024
59814d7
fix mypy
Aug 29, 2024
ec79b94
merge feed range changes
tvaron3 Aug 22, 2024
66c3f7b
fix tests
Sep 4, 2024
1e7a268
merged with feed range branch
tvaron3 Sep 4, 2024
997b6b0
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 4, 2024
7eda72f
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 4, 2024
3a2e4e1
add tests
Sep 5, 2024
0883dac
fix pylint
Sep 5, 2024
b7d1210
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 5, 2024
195c47c
fix and resolve comments
Sep 6, 2024
246b1be
fix and resolve comments
Sep 6, 2024
10fe387
Added isSubsetFeedRange logic
tvaron3 Sep 9, 2024
6498311
Added request context to crud operations, session token helpers
tvaron3 Sep 11, 2024
5a13ddf
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 11, 2024
f5d0d7b
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 13, 2024
5cde59b
revert unnecessary change
Sep 13, 2024
a494346
Added more tests
tvaron3 Sep 20, 2024
0d75607
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 20, 2024
c8c099f
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 20, 2024
ad3ae4f
Added more tests
tvaron3 Oct 5, 2024
8f466a1
merge with main
tvaron3 Oct 6, 2024
5249d0a
Changed tests to use new public feed range and more test coverage for…
tvaron3 Oct 6, 2024
40523f5
Added more tests
tvaron3 Oct 7, 2024
9f88b4e
Fix tests and add changelog
tvaron3 Oct 7, 2024
7c23e87
fix spell checks
tvaron3 Oct 7, 2024
4d0b058
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 7, 2024
d7c598e
Added tests and pushed request context to client level
tvaron3 Oct 8, 2024
8698098
Added async methods and removed feed range from request context
tvaron3 Oct 8, 2024
c252d88
fix tests
tvaron3 Oct 9, 2024
51e721b
fix tests and pylint
tvaron3 Oct 9, 2024
923055b
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 9, 2024
104e341
Reacting to comments
tvaron3 Oct 10, 2024
5552912
Reacting to comments
tvaron3 Oct 10, 2024
1bbbd0f
pylint and added hpk tests
tvaron3 Oct 10, 2024
a9299ab
reacting to comments
tvaron3 Oct 11, 2024
2155016
fix tests and mypy
tvaron3 Oct 11, 2024
0436355
fix mypy
tvaron3 Oct 11, 2024
103eb41
fix mypy
tvaron3 Oct 11, 2024
76451df
reacting to comments
tvaron3 Oct 15, 2024
7b0f4b7
reacting to comments
tvaron3 Oct 15, 2024
5d7b978
reacting to comments
tvaron3 Oct 15, 2024
d54992f
fix cspell
tvaron3 Oct 15, 2024
fa16830
rename method to get_latest_session_token
tvaron3 Oct 16, 2024
b2ac9d8
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 17, 2024
6914a20
reacting to reverted feed range
tvaron3 Oct 17, 2024
ab9723a
change based on the api review
Oct 23, 2024
8a4305d
Reacting to API review and adding samples.
tvaron3 Oct 25, 2024
3a1f160
Reacting to API review and adding samples.
tvaron3 Oct 25, 2024
4bc16b1
Merge branch 'main' into tvaron3/sessionTokenHelper
tvaron3 Oct 25, 2024
900d001
Fixed pylint
tvaron3 Oct 25, 2024
96a165f
Merge branch 'tvaron3/sessionTokenHelper' of https://github.com/tvaro…
tvaron3 Oct 25, 2024
eab1822
Reacting to comments
tvaron3 Oct 28, 2024
97ffec7
Reacting to comments
tvaron3 Oct 28, 2024
2264465
Reacting to comments
tvaron3 Oct 29, 2024
35588fa
Reacting to comments
tvaron3 Oct 29, 2024
c42966f
Fix pydoc
tvaron3 Oct 30, 2024
786e357
Fix pydoc
tvaron3 Oct 31, 2024
0de21b4
reacting to comments
tvaron3 Oct 31, 2024
d32a6f1
reacting to comments
tvaron3 Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge with main
  • Loading branch information
tvaron3 committed Oct 6, 2024
commit 8f466a1afffe53c454f032283cc0daa93317c36c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \
ChangeFeedStartFromETagAndFeedRange
from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken
from azure.cosmos._change_feed.feed_range import FeedRange, FeedRangeEpk, FeedRangePartitionKey
from azure.cosmos._change_feed.feed_range_internal import (FeedRangeInternal, FeedRangeInternalEpk,
FeedRangeInternalPartitionKey)
from azure.cosmos._change_feed.feed_range_composite_continuation_token import FeedRangeCompositeContinuation
from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider
from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider
Expand Down Expand Up @@ -79,7 +80,7 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res
def from_json(
container_link: str,
container_rid: str,
change_feed_state_context: Dict[str, Any]):
change_feed_state_context: Dict[str, Any]) -> 'ChangeFeedState':

if (change_feed_state_context.get("partitionKeyRangeId")
or change_feed_state_context.get("continuationPkRangeId")):
Expand Down Expand Up @@ -184,7 +185,7 @@ def __init__(
self,
container_link: str,
container_rid: str,
feed_range: FeedRange,
feed_range: FeedRangeInternal,
change_feed_start_from: ChangeFeedStartFromInternal,
continuation: Optional[FeedRangeCompositeContinuation]
) -> None:
Expand Down Expand Up @@ -380,22 +381,20 @@ def from_initial_state(
collection_rid: str,
change_feed_state_context: Dict[str, Any]) -> 'ChangeFeedStateV2':

feed_range: Optional[FeedRange] = None
feed_range: Optional[FeedRangeInternal] = None
if change_feed_state_context.get("feedRange"):
feed_range_str = base64.b64decode(change_feed_state_context["feedRange"]).decode('utf-8')
feed_range_json = json.loads(feed_range_str)
feed_range = FeedRangeEpk(Range.ParseFromDict(feed_range_json))
feed_range = change_feed_state_context.get("feedRange")
elif change_feed_state_context.get("partitionKey"):
if change_feed_state_context.get("partitionKeyFeedRange"):
feed_range =\
FeedRangePartitionKey(
FeedRangeInternalPartitionKey(
change_feed_state_context["partitionKey"],
change_feed_state_context["partitionKeyFeedRange"])
else:
raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange")
else:
# default to full range
feed_range = FeedRangeEpk(
feed_range = FeedRangeInternalEpk(
Range(
"",
"FF",
Expand All @@ -405,9 +404,12 @@ def from_initial_state(

change_feed_start_from = (
ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime")))
return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None)

if feed_range is not None:
return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None)
raise RuntimeError("feed_range is empty")
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from typing import Any, Deque, Dict, Optional

from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken
from azure.cosmos._change_feed.feed_range import FeedRange, FeedRangeEpk, FeedRangePartitionKey
from azure.cosmos._change_feed.feed_range_internal import (FeedRangeInternal, FeedRangeInternalEpk,
FeedRangeInternalPartitionKey)
from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider
from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider
from azure.cosmos._routing.routing_range import Range
Expand All @@ -39,7 +40,7 @@ class FeedRangeCompositeContinuation:
def __init__(
self,
container_rid: str,
feed_range: FeedRange,
feed_range: FeedRangeInternal,
continuation: Deque[CompositeContinuationToken]) -> None:
if container_rid is None:
raise ValueError("container_rid is missing")
Expand Down Expand Up @@ -87,11 +88,11 @@ def from_json(cls, data) -> 'FeedRangeCompositeContinuation':
for child_range_continuation_token in continuation_data]

# parsing feed range
feed_range: Optional[FeedRange] = None
if data.get(FeedRangeEpk.type_property_name):
feed_range = FeedRangeEpk.from_json(data)
elif data.get(FeedRangePartitionKey.type_property_name):
feed_range = FeedRangePartitionKey.from_json(data, continuation[0].feed_range)
feed_range: Optional[FeedRangeInternal] = None
if data.get(FeedRangeInternalEpk.type_property_name):
feed_range = FeedRangeInternalEpk.from_json(data)
elif data.get(FeedRangeInternalPartitionKey.type_property_name):
feed_range = FeedRangeInternalPartitionKey.from_json(data, continuation[0].feed_range)
else:
raise ValueError("Invalid feed range composite continuation token [Missing feed range scope]")

Expand Down Expand Up @@ -171,5 +172,5 @@ def apply_not_modified_response(self) -> None:
self._initial_no_result_range = self._current_token.feed_range

@property
def feed_range(self) -> FeedRange:
def feed_range(self) -> FeedRangeInternal:
return self._feed_range
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from urllib3.util.retry import Retry

from azure.core import PipelineClient

from ._change_feed.feed_range import FeedRange
from ._session_token_helpers import is_compound_session_token, merge_session_tokens
from ._vector_session_token import VectorSessionToken
from azure.core.credentials import TokenCredential
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@
import base64
import binascii
import json
from typing import Dict, Any


def partition_key_range_to_range_string(partition_key_range: Dict[str, Any]) -> str:
return Range.PartitionKeyRangeToRange(partition_key_range).to_base64_encoded_string()

class PartitionKeyRange(object):
"""Partition Key Range Constants"""

Expand Down
40 changes: 34 additions & 6 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
GenerateGuidId,
_set_properties_cache
)
from .._routing.routing_range import Range, partition_key_range_to_range_string
from .._feed_range import FeedRange, FeedRangeEpk
from .._routing.routing_range import Range
from ..offer import ThroughputProperties
from ..partition_key import (
NonePartitionKeyValue,
Expand Down Expand Up @@ -532,15 +533,16 @@ def query_items_change_feed(
def query_items_change_feed(
self,
*,
feed_range: str,
feed_range: FeedRange,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope.
:keyword feed_range: The feed range that is used to define the scope.
:type feed_range: ~azure.cosmos.FeedRange
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Expand Down Expand Up @@ -614,6 +616,30 @@ def query_items_change_feed( # pylint: disable=unused-argument
*args: Any,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:

"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str continuation: The continuation token retrieved from previous response.
:keyword feed_range: The feed range that is used to define the scope.
:type feed_range: ~azure.cosmos.FeedRange
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, List[Union[str, int, float, bool]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:param Any args: args
:returns: An Iterable of items (dicts).
:rtype: Iterable[Dict[str, Any]]
"""
# pylint: disable=too-many-statements
if kwargs.get("priority") is not None:
kwargs['priority'] = kwargs['priority']
Expand Down Expand Up @@ -665,7 +691,8 @@ def query_items_change_feed( # pylint: disable=unused-argument
self._get_epk_range_for_partition_key(kwargs.pop('partition_key'))

if kwargs.get("feed_range") is not None:
change_feed_state_context["feedRange"] = kwargs.pop('feed_range')
feed_range: FeedRangeEpk = kwargs.pop('feed_range')
change_feed_state_context["feedRange"] = feed_range._feed_range_internal

feed_options["containerProperties"] = self._get_properties()
feed_options["changeFeedStateContext"] = change_feed_state_context
Expand Down Expand Up @@ -1271,7 +1298,7 @@ async def read_feed_ranges(
*,
force_refresh: Optional[bool] = False,
**kwargs: Any
) -> List[str]:
) -> List[FeedRange]:
""" Obtains a list of feed ranges that can be used to parallelize feed operations.

:keyword bool force_refresh:
Expand All @@ -1290,4 +1317,5 @@ async def read_feed_ranges(
[Range("", "FF", True, False)],
**kwargs)

return [partition_key_range_to_range_string(partitionKeyRange) for partitionKeyRange in partition_key_ranges]
return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange))
for partitionKeyRange in partition_key_ranges]
49 changes: 39 additions & 10 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
ParsePaths,
TrimBeginningAndEndingSlashes
)
from ._change_feed.feed_range import FeedRange, FeedRangeEpk
from ._cosmos_client_connection import CosmosClientConnection
from ._routing.routing_range import Range, partition_key_range_to_range_string, PartitionKeyRange
from ._feed_range import FeedRange, FeedRangeEpk
from ._routing.routing_range import Range
from .offer import Offer, ThroughputProperties
from .partition_key import (
NonePartitionKeyValue,
Expand Down Expand Up @@ -358,7 +358,7 @@ def query_items_change_feed(
def query_items_change_feed(
self,
*,
feed_range: str,
feed_range: FeedRange,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
Expand All @@ -367,7 +367,8 @@ def query_items_change_feed(

"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope.
:keyword feed_range: The feed range that is used to define the scope.
:type feed_range: ~azure.cosmos.FeedRange
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Expand Down Expand Up @@ -440,6 +441,31 @@ def query_items_change_feed(
*args: Any,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]:

"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str continuation: The continuation token retrieved from previous response.
:keyword feed_range: The feed range that is used to define the scope.
:type feed_range: ~azure.cosmos.FeedRange
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, List[Union[str, int, float, bool]]]
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:param Any args: args
:returns: An Iterable of items (dicts).
:rtype: Iterable[Dict[str, Any]]
"""

# pylint: disable=too-many-statements
if kwargs.get("priority") is not None:
kwargs['priority'] = kwargs['priority']
Expand Down Expand Up @@ -508,6 +534,8 @@ def query_items_change_feed(

if kwargs.get("feed_range") is not None:
change_feed_state_context["feedRange"] = kwargs.pop('feed_range')
feed_range: FeedRangeEpk = kwargs.pop('feed_range')
change_feed_state_context["feedRange"] = feed_range._feed_range_internal

container_properties = self._get_properties()
feed_options["changeFeedStateContext"] = change_feed_state_context
Expand Down Expand Up @@ -736,7 +764,7 @@ def replace_item( # pylint:disable=docstring-missing-param
document_link=item_link, new_document=body, request_context=request_context, options=request_options, **kwargs
)
self._add_request_context(request_context)
return result
return result or {}

@distributed_trace
def upsert_item( # pylint:disable=docstring-missing-param
Expand Down Expand Up @@ -814,7 +842,7 @@ def upsert_item( # pylint:disable=docstring-missing-param
**kwargs
)
self._add_request_context(request_context)
return result
return result or {}

@distributed_trace
def create_item( # pylint:disable=docstring-missing-param
Expand Down Expand Up @@ -895,7 +923,7 @@ def create_item( # pylint:disable=docstring-missing-param
result = self.client_connection.CreateItem(
database_or_container_link=self.container_link, document=body, request_context=request_context, options=request_options, **kwargs)
self._add_request_context(request_context)
return result
return result or {}

@distributed_trace
def patch_item(
Expand Down Expand Up @@ -972,7 +1000,7 @@ def patch_item(
result = self.client_connection.PatchItem(
document_link=item_link, operations=patch_operations, request_context=request_context, options=request_options, **kwargs)
self._add_request_context(request_context)
return result
return result or {}

@distributed_trace
def execute_item_batch(
Expand Down Expand Up @@ -1361,7 +1389,7 @@ def read_feed_ranges(
self,
*,
force_refresh: Optional[bool] = False,
**kwargs: Any) -> List[str]:
**kwargs: Any) -> List[FeedRange]:

""" Obtains a list of feed ranges that can be used to parallelize feed operations.

Expand All @@ -1380,7 +1408,8 @@ def read_feed_ranges(
[Range("", "FF", True, False)], # default to full range
**kwargs)

return [partition_key_range_to_range_string(partitionKeyRange) for partitionKeyRange in partition_key_ranges]
return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange))
for partitionKeyRange in partition_key_ranges]

def get_updated_session_token(self,
feed_ranges_to_session_tokens: List,
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/test/test_vector_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def test_fail_replace_vector_indexing_policy(self):
pytest.fail("Container replace should have failed for indexing policy.")
except exceptions.CosmosHttpResponseError as e:
assert e.status_code == 400
assert "vector indexing policy cannot be modified in Collection Replace" in e.http_error_message
assert "Paths in existing vector indexing policy cannot be modified in Collection Replace." \
" They can only be added or removed." in e.http_error_message
self.test_db.delete_container(container_id)

def test_fail_create_vector_embedding_policy(self):
Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/test/test_vector_policy_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ async def test_fail_replace_vector_indexing_policy_async(self):
pytest.fail("Container replace should have failed for indexing policy.")
except exceptions.CosmosHttpResponseError as e:
assert e.status_code == 400
assert "vector indexing policy cannot be modified in Collection Replace" in e.http_error_message
assert "Paths in existing vector indexing policy cannot be modified in Collection Replace." \
" They can only be added or removed." in e.http_error_message
await self.test_db.delete_container(container_id)

async def test_fail_create_vector_embedding_policy_async(self):
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.