From cdb6d527eb335c975f45ce3291c933f7bc10224b Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Sep 2024 10:07:18 -0700 Subject: [PATCH 1/6] expose feedRange as a class type --- .../azure-cosmos/azure/cosmos/__init__.py | 2 + .../cosmos/_change_feed/change_feed_state.py | 16 ++- ...feed_range_composite_continuation_token.py | 16 +-- .../{feed_range.py => feed_range_internal.py} | 15 +-- .../azure-cosmos/azure/cosmos/_feed_range.py | 97 +++++++++++++++++++ .../azure/cosmos/_routing/routing_range.py | 4 - .../azure/cosmos/aio/_container.py | 15 +-- .../azure-cosmos/azure/cosmos/container.py | 15 +-- 8 files changed, 140 insertions(+), 40 deletions(-) rename sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/{feed_range.py => feed_range_internal.py} (88%) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py index 6565ebed8c89..b1e3d8bf2a30 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py @@ -42,6 +42,7 @@ ) from .partition_key import PartitionKey from .permission import Permission +from ._feed_range import FeedRange __all__ = ( "CosmosClient", @@ -64,5 +65,6 @@ "TriggerType", "ConnectionRetryPolicy", "ThroughputProperties", + "FeedRange" ) __version__ = VERSION diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py index 46dd1afddcfe..cefc5b5b2c3a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py @@ -34,7 +34,7 @@ from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \ ChangeFeedStartFromETagAndFeedRange from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken -from azure.cosmos._change_feed.feed_range import FeedRange, FeedRangeEpk, FeedRangePartitionKey +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk, FeedRangeInternalPartitionKey from azure.cosmos._change_feed.feed_range_composite_continuation_token import FeedRangeCompositeContinuation from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider @@ -79,7 +79,7 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res def from_json( container_link: str, container_rid: str, - change_feed_state_context: Dict[str, Any]): + change_feed_state_context: Dict[str, Any]) -> 'ChangeFeedState': if (change_feed_state_context.get("partitionKeyRangeId") or change_feed_state_context.get("continuationPkRangeId")): @@ -184,7 +184,7 @@ def __init__( self, container_link: str, container_rid: str, - feed_range: FeedRange, + feed_range: FeedRangeInternal, change_feed_start_from: ChangeFeedStartFromInternal, continuation: Optional[FeedRangeCompositeContinuation] ) -> None: @@ -380,22 +380,20 @@ def from_initial_state( collection_rid: str, change_feed_state_context: Dict[str, Any]) -> 'ChangeFeedStateV2': - feed_range: Optional[FeedRange] = None + feed_range: Optional[FeedRangeInternal] = None if change_feed_state_context.get("feedRange"): - feed_range_str = base64.b64decode(change_feed_state_context["feedRange"]).decode('utf-8') - feed_range_json = json.loads(feed_range_str) - feed_range = FeedRangeEpk(Range.ParseFromDict(feed_range_json)) + feed_range = change_feed_state_context.get("feedRange") elif change_feed_state_context.get("partitionKey"): if change_feed_state_context.get("partitionKeyFeedRange"): feed_range =\ - FeedRangePartitionKey( + FeedRangeInternalPartitionKey( change_feed_state_context["partitionKey"], change_feed_state_context["partitionKeyFeedRange"]) else: raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange") else: # default to full range - feed_range = FeedRangeEpk( + feed_range = FeedRangeInternalEpk( Range( "", "FF", diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py index f5967b6bf34b..7d40f51f7994 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py @@ -26,7 +26,7 @@ from typing import Any, Deque, Dict, Optional from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken -from azure.cosmos._change_feed.feed_range import FeedRange, FeedRangeEpk, FeedRangePartitionKey +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk, FeedRangeInternalPartitionKey from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider from azure.cosmos._routing.routing_range import Range @@ -39,7 +39,7 @@ class FeedRangeCompositeContinuation: def __init__( self, container_rid: str, - feed_range: FeedRange, + feed_range: FeedRangeInternal, continuation: Deque[CompositeContinuationToken]) -> None: if container_rid is None: raise ValueError("container_rid is missing") @@ -87,11 +87,11 @@ def from_json(cls, data) -> 'FeedRangeCompositeContinuation': for child_range_continuation_token in continuation_data] # parsing feed range - feed_range: Optional[FeedRange] = None - if data.get(FeedRangeEpk.type_property_name): - feed_range = FeedRangeEpk.from_json(data) - elif data.get(FeedRangePartitionKey.type_property_name): - feed_range = FeedRangePartitionKey.from_json(data, continuation[0].feed_range) + feed_range: Optional[FeedRangeInternal] = None + if data.get(FeedRangeInternalEpk.type_property_name): + feed_range = FeedRangeInternalEpk.from_json(data) + elif data.get(FeedRangeInternalPartitionKey.type_property_name): + feed_range = FeedRangeInternalPartitionKey.from_json(data, continuation[0].feed_range) else: raise ValueError("Invalid feed range composite continuation token [Missing feed range scope]") @@ -171,5 +171,5 @@ def apply_not_modified_response(self) -> None: self._initial_no_result_range = self._current_token.feed_range @property - def feed_range(self) -> FeedRange: + def feed_range(self) -> FeedRangeInternal: return self._feed_range diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_internal.py similarity index 88% rename from sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py rename to sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_internal.py index b4f731f2c2ef..b1940a031c2d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_internal.py @@ -29,7 +29,7 @@ from azure.cosmos.partition_key import _Undefined, _Empty -class FeedRange(ABC): +class FeedRangeInternal(ABC): @abstractmethod def get_normalized_range(self) -> Range: @@ -39,7 +39,7 @@ def get_normalized_range(self) -> Range: def to_dict(self) -> Dict[str, Any]: pass -class FeedRangePartitionKey(FeedRange): +class FeedRangeInternalPartitionKey(FeedRangeInternal): type_property_name = "PK" def __init__( @@ -69,7 +69,7 @@ def to_dict(self) -> Dict[str, Any]: return { self.type_property_name: self._pk_value } @classmethod - def from_json(cls, data: Dict[str, Any], feed_range: Range) -> 'FeedRangePartitionKey': + def from_json(cls, data: Dict[str, Any], feed_range: Range) -> 'FeedRangeInternalPartitionKey': if data.get(cls.type_property_name): pk_value = data.get(cls.type_property_name) if not pk_value: @@ -80,11 +80,11 @@ def from_json(cls, data: Dict[str, Any], feed_range: Range) -> 'FeedRangePartiti return cls(list(pk_value), feed_range) return cls(data[cls.type_property_name], feed_range) - raise ValueError(f"Can not parse FeedRangePartitionKey from the json," + raise ValueError(f"Can not parse FeedRangeInternalPartitionKey from the json," f" there is no property {cls.type_property_name}") -class FeedRangeEpk(FeedRange): +class FeedRangeInternalEpk(FeedRangeInternal): type_property_name = "Range" def __init__(self, feed_range: Range) -> None: @@ -102,8 +102,9 @@ def to_dict(self) -> Dict[str, Any]: } @classmethod - def from_json(cls, data: Dict[str, Any]) -> 'FeedRangeEpk': + def from_json(cls, data: Dict[str, Any]) -> 'FeedRangeInternalEpk': if data.get(cls.type_property_name): feed_range = Range.ParseFromDict(data.get(cls.type_property_name)) return cls(feed_range) - raise ValueError(f"Can not parse FeedRangeEPK from the json, there is no property {cls.type_property_name}") + raise ValueError(f"Can not parse FeedRangeInternalEPK from the json," + f" there is no property {cls.type_property_name}") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py new file mode 100644 index 000000000000..5d80a45c13aa --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py @@ -0,0 +1,97 @@ + +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import base64 +import json +from abc import ABC, abstractmethod +from typing import Any, Dict + +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk +from azure.cosmos._routing.routing_range import Range + + +class FeedRange(ABC): + """Represents a single feed range in an Azure Cosmos DB SQL API container. """ + + def to_string(self) -> str: + """Get a json representation of the feed range. + The returned json string can be used to create a new feed range from it. + :return: A json representation of the feed range. + :rtype: str + """ + return self._to_base64_encoded_string() + + @staticmethod + def from_string(json_str: str) -> 'FeedRange': + """ + Create a feed range from previously obtained string representation. + + :param json_str: A string representation of a feed range. + :return: A feed range. + :rtype: ~azure.cosmos.FeedRange + """ + feed_range_json_str = base64.b64decode(json_str).decode('utf-8') + feed_range_json = json.loads(feed_range_json_str) + if feed_range_json.get(FeedRangeEpk.type_property_name): + return FeedRangeEpk._from_json(feed_range_json) + else: + raise ValueError("Invalid feed range base64 encoded string [Wrong feed range type]") + + @abstractmethod + def _to_dict(self) -> Dict[str, Any]: + pass + + @abstractmethod + def _to_feed_range_internal(self) -> 'FeedRangeInternal': + pass + + def _to_base64_encoded_string(self) -> str: + data_json = json.dumps(self._to_dict()) + json_bytes = data_json.encode('utf-8') + # Encode the bytes to a Base64 string + base64_bytes = base64.b64encode(json_bytes) + # Convert the Base64 bytes to a string + return base64_bytes.decode('utf-8') + +class FeedRangeEpk (FeedRange): + type_property_name = "Range" + + def __init__(self, feed_range: Range) -> None: + if feed_range is None: + raise ValueError("feed_range cannot be None") + + self._feed_range = feed_range + + def _to_dict(self) -> Dict[str, Any]: + return { + self.type_property_name: self._feed_range.to_dict() + } + + def _to_feed_range_internal(self) -> 'FeedRangeInternal': + return FeedRangeInternalEpk(self._feed_range) + + @classmethod + def _from_json(cls, data: Dict[str, Any]) -> 'FeedRange': + if data.get(cls.type_property_name): + feed_range = Range.ParseFromDict(data.get(cls.type_property_name)) + return cls(feed_range) + raise ValueError(f"Can not parse FeedRangeEPK from the json, there is no property {cls.type_property_name}") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py index f2e7576bf376..a2d789f20644 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py @@ -25,12 +25,8 @@ import base64 import binascii import json -from typing import Dict, Any -def partition_key_range_to_range_string(partition_key_range: Dict[str, Any]) -> str: - return Range.PartitionKeyRangeToRange(partition_key_range).to_base64_encoded_string() - class PartitionKeyRange(object): """Partition Key Range Constants""" diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index d7d66738b4ee..17227813c29d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -41,7 +41,8 @@ GenerateGuidId, _set_properties_cache ) -from .._routing.routing_range import Range, partition_key_range_to_range_string +from .._feed_range import FeedRange, FeedRangeEpk +from .._routing.routing_range import Range from ..offer import ThroughputProperties from ..partition_key import ( NonePartitionKeyValue, @@ -526,7 +527,7 @@ def query_items_change_feed( def query_items_change_feed( self, *, - feed_range: str, + feed_range: FeedRange, max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, @@ -534,7 +535,8 @@ def query_items_change_feed( ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str feed_range: The feed range that is used to define the scope. + :keyword feed_range: The feed range that is used to define the scope. + :type feed_range: ~azure.cosmos.FeedRange :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. @@ -659,7 +661,8 @@ def query_items_change_feed( # pylint: disable=unused-argument self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) if kwargs.get("feed_range") is not None: - change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + feed_range: FeedRange = kwargs.pop('feed_range') + change_feed_state_context["feedRange"] = feed_range._to_feed_range_internal() feed_options["containerProperties"] = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context @@ -1243,7 +1246,7 @@ async def read_feed_ranges( *, force_refresh: Optional[bool] = False, **kwargs: Any - ) -> List[str]: + ) -> List[FeedRange]: """ Obtains a list of feed ranges that can be used to parallelize feed operations. :keyword bool force_refresh: @@ -1262,4 +1265,4 @@ async def read_feed_ranges( [Range("", "FF", True, False)], **kwargs) - return [partition_key_range_to_range_string(partitionKeyRange) for partitionKeyRange in partition_key_ranges] + return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) for partitionKeyRange in partition_key_ranges] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index e6a6ac7b36b9..d84d286e461c 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -39,7 +39,8 @@ _set_properties_cache ) from ._cosmos_client_connection import CosmosClientConnection -from ._routing.routing_range import Range, partition_key_range_to_range_string +from ._feed_range import FeedRange, FeedRangeEpk +from ._routing.routing_range import Range from .offer import Offer, ThroughputProperties from .partition_key import ( NonePartitionKeyValue, @@ -352,7 +353,7 @@ def query_items_change_feed( def query_items_change_feed( self, *, - feed_range: str, + feed_range: FeedRange, max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, @@ -361,7 +362,8 @@ def query_items_change_feed( """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str feed_range: The feed range that is used to define the scope. + :keyword feed_range: The feed range that is used to define the scope. + :type feed_range: ~azure.cosmos.FeedRange :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. @@ -500,7 +502,8 @@ def query_items_change_feed( self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) if kwargs.get("feed_range") is not None: - change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + feed_range: FeedRange = kwargs.pop('feed_range') + change_feed_state_context["feedRange"] = feed_range._to_feed_range_internal() container_properties = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context @@ -1310,7 +1313,7 @@ def read_feed_ranges( self, *, force_refresh: Optional[bool] = False, - **kwargs: Any) -> List[str]: + **kwargs: Any) -> List[FeedRange]: """ Obtains a list of feed ranges that can be used to parallelize feed operations. @@ -1329,4 +1332,4 @@ def read_feed_ranges( [Range("", "FF", True, False)], # default to full range **kwargs) - return [partition_key_range_to_range_string(partitionKeyRange) for partitionKeyRange in partition_key_ranges] + return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) for partitionKeyRange in partition_key_ranges] From ddd598e91ba949b7f5aeb4e87c20105ae77ad644 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Sep 2024 10:30:33 -0700 Subject: [PATCH 2/6] clean up change feed logic from query pipeline --- .../azure/cosmos/_cosmos_client_connection.py | 1 - .../aio/base_execution_context.py | 13 +------------ .../_execution_context/base_execution_context.py | 12 +----------- .../cosmos/aio/_cosmos_client_connection_async.py | 1 - 4 files changed, 2 insertions(+), 25 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 49198910b772..aa0241d7f289 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -1162,7 +1162,6 @@ def _QueryChangeFeed( options = {} else: options = dict(options) - options["changeFeed"] = True resource_key_map = {"Documents": "docs"} diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py index 4ccef73388de..0e10cf263d75 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py @@ -44,7 +44,6 @@ def __init__(self, client, options): """ self._client = client self._options = options - self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True self._continuation = self._get_initial_continuation() self._has_started = False self._has_finished = False @@ -117,10 +116,6 @@ async def _fetch_items_helper_no_retries(self, fetch_function): fetched_items = [] new_options = copy.deepcopy(self._options) while self._continuation or not self._has_started: - # Check if this is first fetch for read from specific time change feed. - # For read specific time the first fetch will return empty even if we have more pages. - is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started - new_options["continuation"] = self._continuation response_headers = {} @@ -129,13 +124,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function): self._has_started = True continuation_key = http_constants.HttpHeaders.Continuation - # Use Etag as continuation token for change feed queries. - if self._is_change_feed: - continuation_key = http_constants.HttpHeaders.ETag - # In change feed queries, the continuation token is always populated. The hasNext() test is whether - # there is any items in the response or not. - # No initial fetch for start time change feed, so we need to pass continuation token for first fetch - if not self._is_change_feed or fetched_items or is_s_time_first_fetch: + if fetched_items: self._continuation = response_headers.get(continuation_key) else: self._continuation = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py index b7ef17898656..c3924ffb6807 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py @@ -42,7 +42,6 @@ def __init__(self, client, options): """ self._client = client self._options = options - self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True self._continuation = self._get_initial_continuation() self._has_started = False self._has_finished = False @@ -115,9 +114,6 @@ def _fetch_items_helper_no_retries(self, fetch_function): fetched_items = [] new_options = copy.deepcopy(self._options) while self._continuation or not self._has_started: - # Check if this is first fetch for read from specific time change feed. - # For read specific time the first fetch will return empty even if we have more pages. - is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started if not self._has_started: self._has_started = True new_options["continuation"] = self._continuation @@ -126,13 +122,7 @@ def _fetch_items_helper_no_retries(self, fetch_function): (fetched_items, response_headers) = fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation - # Use Etag as continuation token for change feed queries. - if self._is_change_feed: - continuation_key = http_constants.HttpHeaders.ETag - # In change feed queries, the continuation token is always populated. The hasNext() test is whether - # there is any items in the response or not. - # For start time however we get no initial results, so we need to pass continuation token - if not self._is_change_feed or fetched_items or is_s_time_first_fetch: + if fetched_items: self._continuation = response_headers.get(continuation_key) else: self._continuation = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 9e73445e2063..eeb67225660a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -2277,7 +2277,6 @@ def _QueryChangeFeed( options = {} else: options = dict(options) - options["changeFeed"] = True resource_key_map = {"Documents": "docs"} From 27584f3b44b640d614564a0925d1919fe778eb78 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Sep 2024 16:16:29 -0700 Subject: [PATCH 3/6] fix --- .../cosmos/_execution_context/aio/base_execution_context.py | 6 ++---- .../cosmos/_execution_context/base_execution_context.py | 6 ++---- sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py | 1 - 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py index 0e10cf263d75..560ca6c05389 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py @@ -124,10 +124,8 @@ async def _fetch_items_helper_no_retries(self, fetch_function): self._has_started = True continuation_key = http_constants.HttpHeaders.Continuation - if fetched_items: - self._continuation = response_headers.get(continuation_key) - else: - self._continuation = None + self._continuation = response_headers.get(continuation_key) + if fetched_items: break return fetched_items diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py index c3924ffb6807..23ba3d170994 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py @@ -122,10 +122,8 @@ def _fetch_items_helper_no_retries(self, fetch_function): (fetched_items, response_headers) = fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation - if fetched_items: - self._continuation = response_headers.get(continuation_key) - else: - self._continuation = None + self._continuation = response_headers.get(continuation_key) + if fetched_items: break return fetched_items diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py index 5d80a45c13aa..671b71fa4723 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py @@ -1,4 +1,3 @@ - # The MIT License (MIT) # Copyright (c) 2014 Microsoft Corporation From 95572e39beb2feb9a1c710a4e5cf189fac9d3757 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Sep 2024 16:56:44 -0700 Subject: [PATCH 4/6] fix doc --- sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py index 671b71fa4723..4aba18fe1423 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py @@ -29,14 +29,18 @@ class FeedRange(ABC): - """Represents a single feed range in an Azure Cosmos DB SQL API container. """ + """Represents a single feed range in an Azure Cosmos DB SQL API container. + + """ def to_string(self) -> str: - """Get a json representation of the feed range. - The returned json string can be used to create a new feed range from it. + """ + Get a json representation of the feed range. + The returned json string can be used to create a new feed range from it. :return: A json representation of the feed range. :rtype: str """ + return self._to_base64_encoded_string() @staticmethod From 8eeff36c146983ad16ed897a6866ddd65f15e2c4 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Sep 2024 17:27:38 -0700 Subject: [PATCH 5/6] fix doc --- .../azure/cosmos/_change_feed/change_feed_state.py | 7 ++++--- .../feed_range_composite_continuation_token.py | 3 ++- sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py | 9 +++++---- sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py | 3 ++- sdk/cosmos/azure-cosmos/azure/cosmos/container.py | 3 ++- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py index cefc5b5b2c3a..f5674ba3e2d7 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py @@ -34,7 +34,8 @@ from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \ ChangeFeedStartFromETagAndFeedRange from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken -from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk, FeedRangeInternalPartitionKey +from azure.cosmos._change_feed.feed_range_internal import (FeedRangeInternal, FeedRangeInternalEpk, + FeedRangeInternalPartitionKey) from azure.cosmos._change_feed.feed_range_composite_continuation_token import FeedRangeCompositeContinuation from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider @@ -388,7 +389,7 @@ def from_initial_state( feed_range =\ FeedRangeInternalPartitionKey( change_feed_state_context["partitionKey"], - change_feed_state_context["partitionKeyFeedRange"]) + change_feed_state_context["partitionKeyFeedRange"]) # type: FeedRangeInternal else: raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange") else: @@ -399,7 +400,7 @@ def from_initial_state( "FF", True, False) - ) + ) # type: FeedRangeInternal change_feed_start_from = ( ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime"))) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py index 7d40f51f7994..8f87ccfa194a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/feed_range_composite_continuation_token.py @@ -26,7 +26,8 @@ from typing import Any, Deque, Dict, Optional from azure.cosmos._change_feed.composite_continuation_token import CompositeContinuationToken -from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk, FeedRangeInternalPartitionKey +from azure.cosmos._change_feed.feed_range_internal import (FeedRangeInternal, FeedRangeInternalEpk, + FeedRangeInternalPartitionKey) from azure.cosmos._routing.routing_map_provider import SmartRoutingMapProvider from azure.cosmos._routing.aio.routing_map_provider import SmartRoutingMapProvider as AsyncSmartRoutingMapProvider from azure.cosmos._routing.routing_range import Range diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py index 4aba18fe1423..fafb64c9d660 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_feed_range.py @@ -27,7 +27,7 @@ from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternal, FeedRangeInternalEpk from azure.cosmos._routing.routing_range import Range - +# pylint: disable=protected-access class FeedRange(ABC): """Represents a single feed range in an Azure Cosmos DB SQL API container. @@ -37,6 +37,7 @@ def to_string(self) -> str: """ Get a json representation of the feed range. The returned json string can be used to create a new feed range from it. + :return: A json representation of the feed range. :rtype: str """ @@ -48,7 +49,7 @@ def from_string(json_str: str) -> 'FeedRange': """ Create a feed range from previously obtained string representation. - :param json_str: A string representation of a feed range. + :param str json_str: A string representation of a feed range. :return: A feed range. :rtype: ~azure.cosmos.FeedRange """ @@ -56,8 +57,8 @@ def from_string(json_str: str) -> 'FeedRange': feed_range_json = json.loads(feed_range_json_str) if feed_range_json.get(FeedRangeEpk.type_property_name): return FeedRangeEpk._from_json(feed_range_json) - else: - raise ValueError("Invalid feed range base64 encoded string [Wrong feed range type]") + + raise ValueError("Invalid feed range base64 encoded string [Wrong feed range type]") @abstractmethod def _to_dict(self) -> Dict[str, Any]: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 17227813c29d..f4ccd2547d72 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -1265,4 +1265,5 @@ async def read_feed_ranges( [Range("", "FF", True, False)], **kwargs) - return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) for partitionKeyRange in partition_key_ranges] + return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) + for partitionKeyRange in partition_key_ranges] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index d84d286e461c..af5097eaa4f4 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -1332,4 +1332,5 @@ def read_feed_ranges( [Range("", "FF", True, False)], # default to full range **kwargs) - return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) for partitionKeyRange in partition_key_ranges] + return [FeedRangeEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)) + for partitionKeyRange in partition_key_ranges] From 5061de2ba6f7c5e8c33e9adcabc5efab143aa083 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Wed, 18 Sep 2024 08:41:50 -0700 Subject: [PATCH 6/6] fix pylint --- .../azure/cosmos/_change_feed/change_feed_state.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py index f5674ba3e2d7..f31330fd2318 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py @@ -389,7 +389,7 @@ def from_initial_state( feed_range =\ FeedRangeInternalPartitionKey( change_feed_state_context["partitionKey"], - change_feed_state_context["partitionKeyFeedRange"]) # type: FeedRangeInternal + change_feed_state_context["partitionKeyFeedRange"]) else: raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange") else: @@ -400,10 +400,12 @@ def from_initial_state( "FF", True, False) - ) # type: FeedRangeInternal + ) change_feed_start_from = ( ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime"))) + + assert feed_range is not None return cls( container_link=container_link, container_rid=collection_rid,