Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
resolve comments
  • Loading branch information
annie-mac committed Aug 28, 2024
commit cecdfa5ed2862d8ec14b4a14ae0b7c828fa53a37
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
database service.
"""
import base64
import copy
import json
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from typing import Dict, Any, List, Callable, Tuple, Awaitable

from azure.cosmos import http_constants, exceptions
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromPointInTime
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV1, ChangeFeedStateV2
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromType
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV2, ChangeFeedStateVersion
from azure.cosmos.aio import _retry_utility_async
from azure.cosmos.exceptions import CosmosHttpResponseError

Expand All @@ -39,7 +38,7 @@
class ChangeFeedFetcher(ABC):

@abstractmethod
async def fetch_next_block(self):
async def fetch_next_block(self) -> List[Dict[str, Any]]:
pass

class ChangeFeedFetcherV1(ChangeFeedFetcher):
Expand All @@ -53,38 +52,38 @@ def __init__(
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):
fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
) -> None:

self._client = client
self._feed_options = feed_options

self._change_feed_state = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV1):
if self._change_feed_state.version != ChangeFeedStateVersion.V1:
raise ValueError(f"ChangeFeedFetcherV1 can not handle change feed state version"
f" {type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self):
async def fetch_next_block(self) -> List[Dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
:rtype: list
"""
async def callback():
return await self.fetch_change_feed_items(self._fetch_function)
return await self.fetch_change_feed_items()

return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)

async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state
async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)
self._change_feed_state.populate_feed_options(self._feed_options)
is_s_time_first_fetch = self._change_feed_state._continuation is None
while True:
(fetched_items, response_headers) = await fetch_function(new_options)
(fetched_items, response_headers) = await self._fetch_function(self._feed_options)
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there are any items in the response or not.
Expand All @@ -96,7 +95,7 @@ async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:

# When processing from point in time, there will be no initial results being returned,
# so we will retry with the new continuation token again
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
if (self._change_feed_state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME
and is_s_time_first_fetch):
is_s_time_first_fetch = False
else:
Expand All @@ -113,28 +112,29 @@ def __init__(
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):
fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
) -> None:

self._client = client
self._feed_options = feed_options

self._change_feed_state: ChangeFeedStateV2 = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
if self._change_feed_state.version != ChangeFeedStateVersion.V2:
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version "
f"{type(self._change_feed_state)}")
f"{type(self._change_feed_state.version)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self):
async def fetch_next_block(self) -> List[Dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
:rtype: list
"""

async def callback():
return await self.fetch_change_feed_items(self._fetch_function)
return await self.fetch_change_feed_items()

try:
return await _retry_utility_async.ExecuteAsync(
Expand All @@ -152,15 +152,14 @@ async def callback():

return await self.fetch_next_block()

async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state
async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)
self._change_feed_state.populate_feed_options(self._feed_options)

is_s_time_first_fetch = True
while True:
(fetched_items, response_headers) = await fetch_function(new_options)
(fetched_items, response_headers) = await self._fetch_function(self._feed_options)

continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
Expand All @@ -180,8 +179,7 @@ async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))

#TODO: can this part logic be simplified
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
if (self._change_feed_state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME
and is_s_time_first_fetch):
response_headers[continuation_key] = self._get_base64_encoded_continuation()
is_s_time_first_fetch = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

"""Iterable change feed results in the Azure Cosmos database service.
"""
from typing import Dict, Any
from typing import Dict, Any, Optional, Callable, Coroutine, Tuple, List, AsyncIterator

from azure.core.async_paging import AsyncPageIterator

from azure.cosmos import PartitionKey
from azure.cosmos._change_feed.aio.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2
from azure.cosmos._change_feed.change_feed_state import ChangeFeedState, ChangeFeedStateV1
from azure.cosmos._change_feed.change_feed_state import ChangeFeedState, ChangeFeedStateVersion
from azure.cosmos._utils import is_base64_encoded


Expand All @@ -42,21 +42,20 @@ class ChangeFeedIterable(AsyncPageIterator):
def __init__(
self,
client,
options,
fetch_function=None,
collection_link=None,
continuation_token=None,
):
options: Dict[str, Any],
fetch_function=Optional[Callable[[Dict[str, Any]], Coroutine[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]],
collection_link=Optional[str],
continuation_token=Optional[str],
) -> None:
"""Instantiates a ChangeFeedIterable for non-client side partitioning queries.

ChangeFeedFetcher will be used as the internal query execution
context.

:param CosmosClient client: Instance of document client.
:param dict options: The request options for the request.
:param method fetch_function:

:param CosmosClient client: Instance of document client.
:param dict options: The request options for the request.
:param fetch_function: The fetch function.
:param collection_link: The collection resource link.
:param continuation_token: The continuation token passed in from by_page
"""

self._client = client
self.retry_options = client.connection_policy.RetryOptions
self._options = options
Expand Down Expand Up @@ -90,7 +89,7 @@ def __init__(

super(ChangeFeedIterable, self).__init__(self._fetch_next, self._unpack, continuation_token=continuation_token)

async def _unpack(self, block):
async def _unpack(self, block) -> Tuple[str, AsyncIterator[List[Dict[str, Any]]]]:
continuation = None
if self._client.last_response_headers:
continuation = self._client.last_response_headers.get('etag')
Expand All @@ -99,12 +98,9 @@ async def _unpack(self, block):
self._did_a_call_already = False
return continuation, block

async def _fetch_next(self, *args): # pylint: disable=unused-argument
async def _fetch_next(self, *args) -> List[Dict[str, Any]]: # pylint: disable=unused-argument
Return a block of results while respecting the retry policy.

This method only exists for backward compatibility reasons. (Because
QueryIterable has exposed fetch_next_block api).

:param Any args:
:return: List of results.
:rtype: list
Expand All @@ -117,7 +113,7 @@ async def _fetch_next(self, *args): # pylint: disable=unused-argument
raise StopAsyncIteration
return block

async def _initialize_change_feed_fetcher(self):
async def _initialize_change_feed_fetcher(self) -> None:
change_feed_state_context = self._options.pop("changeFeedStateContext")
conn_properties = await self._options.pop("containerProperties")
if change_feed_state_context.get("partitionKey"):
Expand All @@ -131,7 +127,7 @@ async def _initialize_change_feed_fetcher(self):
ChangeFeedState.from_json(self._collection_link, conn_properties["_rid"], change_feed_state_context)
self._options["changeFeedState"] = change_feed_state

if isinstance(change_feed_state, ChangeFeedStateV1):
if change_feed_state.version != ChangeFeedStateVersion.V1:
self._change_feed_fetcher = ChangeFeedFetcherV1(
self._client,
self._collection_link,
Expand All @@ -148,11 +144,11 @@ async def _initialize_change_feed_fetcher(self):

def _validate_change_feed_state_context(self, change_feed_state_context: Dict[str, Any]) -> None:

if change_feed_state_context.get("continuationPkRangeId"):
if change_feed_state_context.get("continuationPkRangeId") is not None:
# if continuation token is in v1 format, throw exception if feed_range is set
if change_feed_state_context.get("feedRange"):
if change_feed_state_context.get("feedRange") is not None:
raise ValueError("feed_range and continuation are incompatible")
elif change_feed_state_context.get("continuationFeedRange"):
elif change_feed_state_context.get("continuationFeedRange") is not None:
# if the continuation token is in v2 format, the token itself contains the full change feed state,
# so we ignore any other parameters (including incompatible ones) that are passed in
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
database service.
"""
import base64
import copy
import json
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from typing import Dict, Any, List, Callable, Tuple

from azure.cosmos import _retry_utility, http_constants, exceptions
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromPointInTime
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV1, ChangeFeedStateV2
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromType
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV1, ChangeFeedStateV2, ChangeFeedStateVersion
from azure.cosmos.exceptions import CosmosHttpResponseError

# pylint: disable=protected-access
Expand All @@ -52,38 +51,38 @@ def __init__(
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):
fetch_function: Callable[[Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]
) -> None:

self._client = client
self._feed_options = feed_options

self._change_feed_state: ChangeFeedStateV1 = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV1):
if self._change_feed_state.version != ChangeFeedStateVersion.V1:
raise ValueError(f"ChangeFeedFetcherV1 can not handle change feed state version"
f" {type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

def fetch_next_block(self):
def fetch_next_block(self) -> List[Dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
:rtype: list
"""
def callback():
return self.fetch_change_feed_items(self._fetch_function)
return self.fetch_change_feed_items()

return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)

def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state
def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)
self._change_feed_state.populate_feed_options(self._feed_options)
is_s_time_first_fetch = self._change_feed_state._continuation is None
while True:
(fetched_items, response_headers) = fetch_function(new_options)
(fetched_items, response_headers) = self._fetch_function(self._feed_options)
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there are any items in the response or not.
Expand All @@ -95,7 +94,7 @@ def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:

# When processing from point in time, there will be no initial results being returned,
# so we will retry with the new continuation token again
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
if (self._change_feed_state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME
and is_s_time_first_fetch):
is_s_time_first_fetch = False
else:
Expand All @@ -112,28 +111,28 @@ def __init__(
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):
fetch_function: Callable[[Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]):

self._client = client
self._feed_options = feed_options

self._change_feed_state: ChangeFeedStateV2 = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
if self._change_feed_state.version != ChangeFeedStateVersion.V2:
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version "
f"{type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

def fetch_next_block(self):
def fetch_next_block(self) -> List[Dict[str, Any]]:
"""Returns a block of results.

:return: List of results.
:rtype: list
"""

def callback():
return self.fetch_change_feed_items(self._fetch_function)
return self.fetch_change_feed_items()

try:
return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)
Expand All @@ -146,15 +145,14 @@ def callback():

return self.fetch_next_block()

def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state
def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
self._feed_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)
self._change_feed_state.populate_feed_options(self._feed_options)

is_s_time_first_fetch = self._change_feed_state._continuation.current_token.token is None
while True:
(fetched_items, response_headers) = fetch_function(new_options)
(fetched_items, response_headers) = self._fetch_function(self._feed_options)

continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated.
Expand All @@ -174,7 +172,7 @@ def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))

if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
if (self._change_feed_state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME
and is_s_time_first_fetch):
response_headers[continuation_key] = self._get_base64_encoded_continuation()
is_s_time_first_fetch = False
Expand Down
Loading