Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix mypy
  • Loading branch information
annie-mac committed Aug 29, 2024
commit 59814d7e368420bfdc806fabede945d61af9420d
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@

"""Iterable change feed results in the Azure Cosmos database service.
"""
from typing import Dict, Any, Optional, Callable, Tuple, List, AsyncIterator, Awaitable
from typing import Dict, Any, Optional, Callable, Tuple, List, Awaitable, Union

from azure.core.async_paging import AsyncPageIterator

from azure.cosmos._change_feed.aio.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2, \
ChangeFeedFetcher
from azure.cosmos._change_feed.aio.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2
from azure.cosmos._change_feed.change_feed_state import ChangeFeedState, ChangeFeedStateVersion


Expand Down Expand Up @@ -60,7 +59,7 @@ def __init__(
self._options = options
self._fetch_function = fetch_function
self._collection_link = collection_link
self._change_feed_fetcher: Optional[ChangeFeedFetcher] = None
self._change_feed_fetcher: Optional[Union[ChangeFeedFetcherV1, ChangeFeedFetcherV2]] = None

if self._options.get("changeFeedStateContext") is None:
raise ValueError("Missing changeFeedStateContext in feed options")
Expand Down Expand Up @@ -90,9 +89,9 @@ def __init__(

async def _unpack(
self,
block: AsyncIterator[List[Dict[str, Any]]]
) -> Tuple[Optional[str], AsyncIterator[List[Dict[str, Any]]]]:
continuation = None
block: List[Dict[str, Any]]
) -> Tuple[Optional[str], List[Dict[str, Any]]]:
continuation: Optional[str] = None
if self._client.last_response_headers:
continuation = self._client.last_response_headers.get('etag')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Iterable change feed results in the Azure Cosmos database service.
"""
from typing import Dict, Any, Tuple, List, Optional, Callable, cast
from typing import Dict, Any, Tuple, List, Optional, Callable, cast, Union

from azure.core.paging import PageIterator

Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(
self._options = options
self._fetch_function = fetch_function
self._collection_link = collection_link
self._change_feed_fetcher: Optional[ChangeFeedFetcher] = None
self._change_feed_fetcher: Optional[Union[ChangeFeedFetcherV1, ChangeFeedFetcherV2]] = None

if self._options.get("changeFeedStateContext") is None:
raise ValueError("Missing changeFeedStateContext in feed options")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ._auth_policy import CosmosBearerTokenCredentialPolicy
from ._base import _set_properties_cache
from ._change_feed.change_feed_iterable import ChangeFeedIterable
from ._change_feed.change_feed_state import ChangeFeedState
from ._constants import _Constants as Constants
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy
from ._range_partition_resolver import RangePartitionResolver
Expand Down Expand Up @@ -3024,8 +3025,9 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
partition_key_range_id
)

if options.get("changeFeedState") is not None:
options.get("changeFeedState").populate_request_headers(self._routing_map_provider, headers)
change_feed_state: Optional[ChangeFeedState] = options.get("changeFeedState")
if change_feed_state is not None:
change_feed_state.populate_request_headers(self._routing_map_provider, headers)

result, last_response_headers = self.__Get(path, request_params, headers, **kwargs)
self.last_response_headers = last_response_headers
Expand Down
38 changes: 19 additions & 19 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,11 +499,11 @@ def query_items_change_feed(
*,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
partition_key: PartitionKeyType,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed in the entire container,
in the order in which they were modified.
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Expand All @@ -512,6 +512,9 @@ def query_items_change_feed(
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, List[Union[str, int, float, bool]]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
Expand All @@ -525,24 +528,22 @@ def query_items_change_feed(
def query_items_change_feed(
self,
*,
feed_range: str,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
partition_key: PartitionKeyType,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword partition_key: The partition key that is used to define the scope
(logical partition or a subset of a container)
:type partition_key: Union[str, int, float, bool, List[Union[str, int, float, bool]]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
Expand All @@ -556,52 +557,51 @@ def query_items_change_feed(
def query_items_change_feed(
self,
*,
feed_range: str,
continuation: str,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope.
:keyword str continuation: The continuation token retrieved from previous response.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@overload
def query_items_change_feed(
self,
*,
continuation: str,
max_item_count: Optional[int] = None,
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Get a sorted list of items that were changed, in the order in which they were modified.
"""Get a sorted list of items that were changed in the entire container,
in the order in which they were modified.

:keyword str continuation: The continuation token retrieved from previous response.
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword start_time: The start time to start processing chang feed items.
Beginning: Processing the change feed items from the beginning of the change feed.
Now: Processing change feed from the current time, so only events for all future changes will be retrieved.
~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC.
By default, it is start from current ("Now")
:type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each
request. Once the user has reached their provisioned throughput, low priority requests are throttled
before high priority requests start getting throttled. Feature must first be enabled at the account level.
:keyword Callable response_hook: A callable invoked with the response metadata.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@distributed_trace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from .._base import _set_properties_cache
from .. import documents
from .._change_feed.aio.change_feed_iterable import ChangeFeedIterable
from .._change_feed.change_feed_state import ChangeFeedState
from .._routing import routing_range
from ..documents import ConnectionPolicy, DatabaseAccount
from .._constants import _Constants as Constants
Expand Down Expand Up @@ -2813,8 +2814,9 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)

if options.get("changeFeedState") is not None:
await options.get("changeFeedState").populate_request_headers_async(self._routing_map_provider, headers)
change_feed_state: Optional[ChangeFeedState] = options.get("changeFeedState")
if change_feed_state is not None:
await change_feed_state.populate_request_headers_async(self._routing_map_provider, headers)

result, self.last_response_headers = await self.__Get(path, request_params, headers, **kwargs)
if response_hook:
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def query_items( # pylint:disable=docstring-missing-param
feed_options["populateIndexMetrics"] = populate_index_metrics
if partition_key is not None:
partition_key_value = self._set_partition_key(partition_key)
if self.__is_prefix_partitionkey(partition_key_value):
if self.__is_prefix_partitionkey(partition_key):
kwargs["isPrefixPartitionQuery"] = True
properties = self._get_properties()
kwargs["partitionKeyDefinition"] = properties["partitionKey"]
Expand Down