Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix pylint
  • Loading branch information
annie-mac committed Aug 20, 2024
commit 36990ef428145a56660b0dc784f62250ef67145f
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def __init__(

self._change_feed_state: ChangeFeedStateV2 = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version {type(self._change_feed_state)}")
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version "
f"{type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function
Expand Down Expand Up @@ -170,29 +171,29 @@ async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
response_headers[continuation_key] = self._get_base64_encoded_continuation()
break

# When no items are returned, we decide whether to retry based on:
# 1. When processing from a point in time, no initial results are returned,
# so we retry with the new continuation token.
# 2. If the feed range of the changeFeedState spans multiple physical partitions,
# then we read from the next feed range until we have looped through all physical partitions.
self._change_feed_state.apply_not_modified_response()
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))
# When no items are returned, we decide whether to retry based on:
# 1. When processing from a point in time, no initial results are returned,
# so we retry with the new continuation token.
# 2. If the feed range of the changeFeedState spans multiple physical partitions,
# then we read from the next feed range until we have looped through all physical partitions.
self._change_feed_state.apply_not_modified_response()
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))

# TODO: can this part of the logic be simplified?
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
and is_s_time_first_fetch):
response_headers[continuation_key] = self._get_base64_encoded_continuation()
is_s_time_first_fetch = False
should_retry = True
else:
self._change_feed_state._continuation._move_to_next_token()
response_headers[continuation_key] = self._get_base64_encoded_continuation()
should_retry = self._change_feed_state.should_retry_on_not_modified_response()
is_s_time_first_fetch = False

# TODO: can this part of the logic be simplified?
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
and is_s_time_first_fetch):
response_headers[continuation_key] = self._get_base64_encoded_continuation()
is_s_time_first_fetch = False
should_retry = True
else:
self._change_feed_state._continuation._move_to_next_token()
response_headers[continuation_key] = self._get_base64_encoded_continuation()
should_retry = self._change_feed_state.should_retry_on_not_modified_response()
is_s_time_first_fetch = False

if not should_retry:
break
if not should_retry:
break

return fetched_items

Expand All @@ -203,4 +204,3 @@ def _get_base64_encoded_continuation(self) -> str:
base64_bytes = base64.b64encode(json_bytes)
# Convert the Base64 bytes to a string
return base64_bytes.decode('utf-8')

Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,3 @@ def _get_base64_encoded_continuation(self) -> str:
base64_bytes = base64.b64encode(json_bytes)
# Convert the Base64 bytes to a string
return base64_bytes.decode('utf-8')

Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,3 @@ def _validate_change_feed_state_context(self, change_feed_state_context: Dict[st
raise ValueError(
"partition_key_range_id, partition_key, feed_range are exclusive parameters,"
" please only set one of them")

Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,3 @@ def from_json(cls, data: Dict[str, Any]) -> 'ChangeFeedStartFromPointInTime':

point_in_time = datetime.fromtimestamp(point_in_time_ms).astimezone(timezone.utc)
return ChangeFeedStartFromPointInTime(point_in_time)



Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def __init__(
container_rid: str,
change_feed_start_from: ChangeFeedStartFromInternal,
partition_key_range_id: Optional[str] = None,
partition_key: Optional[Union[str, int, float, bool, List[Union[str, int, float, bool]], _Empty, _Undefined]] = None,
continuation: Optional[str] = None): # pylint: disable=line-too-long
partition_key: Optional[Union[str, int, float, bool, List[Union[str, int, float, bool]], _Empty, _Undefined]] = None, # pylint: disable=line-too-long
continuation: Optional[str] = None):

self._container_link = container_link
self._container_rid = container_rid
Expand Down Expand Up @@ -142,22 +142,26 @@ def populate_request_headers(
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential. So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after customer's point start time, we will need to always pass the start time in the header.
# of the documents may not be sequential. So when reading the changeFeed by LSN,
# it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after the customer's point-in-time start time,
# we always need to pass the start time in the header.
self._change_feed_start_from.populate_request_headers(request_headers)
if self._continuation:
request_headers[http_constants.HttpHeaders.IfNoneMatch] = self._continuation

async def populate_request_headers_async(
self,
routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
async_routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None: # pylint: disable=unused-argument

request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential. So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after customer's point start time, we will need to always pass the start time in the header.
# of the documents may not be sequential.
# So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after the customer's point-in-time start time,
# we always need to pass the start time in the header.
self._change_feed_start_from.populate_request_headers(request_headers)
if self._continuation:
request_headers[http_constants.HttpHeaders.IfNoneMatch] = self._continuation
Expand Down Expand Up @@ -190,7 +194,6 @@ def __init__(
self._container_rid = container_rid
self._feed_range = feed_range
self._change_feed_start_from = change_feed_start_from
self._continuation = continuation
if self._continuation is None:
composite_continuation_token_queue: Deque = collections.deque()
composite_continuation_token_queue.append(
Expand All @@ -202,6 +205,8 @@ def __init__(
self._container_rid,
self._feed_range,
composite_continuation_token_queue)
else:
self._continuation = continuation

@property
def container_rid(self) -> str :
Expand Down Expand Up @@ -368,7 +373,7 @@ def from_initial_state(
cls,
container_link: str,
collection_rid: str,
change_feed_state_context: dict[str, Any]) -> 'ChangeFeedStateV2':
change_feed_state_context: Dict[str, Any]) -> 'ChangeFeedStateV2':

if is_key_exists_and_not_none(change_feed_state_context, "feedRange"):
feed_range_str = base64.b64decode(change_feed_state_context["feedRange"]).decode('utf-8')
Expand Down Expand Up @@ -400,6 +405,3 @@ def from_initial_state(
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None)



Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def to_dict(self) -> Dict[str, Any]:
if isinstance(self._pk_value, _Empty):
return { self.type_property_name: [] }
if isinstance(self._pk_value, list):
return { self.type_property_name: [item for item in self._pk_value] }
return { self.type_property_name: list(self._pk_value) }

return { self.type_property_name: self._pk_value }

Expand All @@ -75,10 +75,10 @@ def from_json(cls, data: Dict[str, Any], feed_range: Range) -> 'FeedRangePartiti
pk_value = data.get(cls.type_property_name)
if not pk_value:
return cls(_Empty(), feed_range)
if pk_value is [{}]:
if pk_value == [{}]:
return cls(_Undefined(), feed_range)
if isinstance(pk_value, list):
return cls([item for item in pk_value], feed_range)
return cls(list(pk_value), feed_range)
return cls(data[cls.type_property_name], feed_range)

raise ValueError(f"Can not parse FeedRangePartitionKey from the json,"
Expand Down Expand Up @@ -107,4 +107,4 @@ def from_json(cls, data: Dict[str, Any]) -> 'FeedRangeEpk':
if is_key_exists_and_not_none(data, cls.type_property_name):
feed_range = Range.ParseFromDict(data.get(cls.type_property_name))
return cls(feed_range)
raise ValueError(f"Can not parse FeedRangeEPK from the json, there is no property {cls.type_property_name}")
raise ValueError(f"Can not parse FeedRangeEPK from the json, there is no property {cls.type_property_name}")
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,3 @@ def apply_not_modified_response(self) -> None:
@property
def feed_range(self) -> FeedRange:
return self._feed_range

16 changes: 11 additions & 5 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ def query_items_change_feed(
partition_key: Optional[PartitionKeyType] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> AsyncItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
Expand All @@ -510,6 +511,7 @@ def query_items_change_feed(
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@overload
Expand All @@ -521,7 +523,8 @@ def query_items_change_feed(
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> AsyncItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope. By default, the scope will be the entire container.
Expand All @@ -537,6 +540,7 @@ def query_items_change_feed(
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@overload
Expand All @@ -547,7 +551,8 @@ def query_items_change_feed(
max_item_count: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> AsyncItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str continuation: The continuation token retrieved from previous response.
Expand All @@ -558,15 +563,16 @@ def query_items_change_feed(
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@distributed_trace
def query_items_change_feed(
self,
*args: Any,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]: # pylint: disable=too-many-statements

) -> AsyncItemPaged[Dict[str, Any]]:
# pylint: disable=too-many-statements
if is_key_exists_and_not_none(kwargs, "priority"):
kwargs['priority'] = kwargs['priority']
feed_options = _build_options(kwargs)
Expand Down
29 changes: 18 additions & 11 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def _set_partition_key(

def _get_epk_range_for_partition_key(self, partition_key_value: PartitionKeyType) -> Range:
container_properties = self._get_properties()
partition_key_definition = container_properties.get("partitionKey")
partition_key_definition = container_properties["partitionKey"]
partition_key = PartitionKey(path=partition_key_definition["paths"], kind=partition_key_definition["kind"])

return partition_key._get_epk_range_for_partition_key(partition_key_value)
Expand Down Expand Up @@ -328,7 +328,8 @@ def query_items_change_feed(
partition_key: Optional[PartitionKeyType] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> ItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
Expand All @@ -346,6 +347,7 @@ def query_items_change_feed(
:returns: An Iterable of items (dicts).
:rtype: Iterable[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@overload
Expand All @@ -357,7 +359,9 @@ def query_items_change_feed(
start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> ItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long

"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str feed_range: The feed range that is used to define the scope.
Expand All @@ -375,6 +379,7 @@ def query_items_change_feed(
:returns: An Iterable of items (dicts).
:rtype: Iterable[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@overload
Expand All @@ -385,7 +390,8 @@ def query_items_change_feed(
max_item_count: Optional[int] = None,
priority: Optional[Literal["High", "Low"]] = None,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]: # pylint: disable=line-too-long
) -> ItemPaged[Dict[str, Any]]:
# pylint: disable=line-too-long
"""Get a sorted list of items that were changed, in the order in which they were modified.

:keyword str continuation: The continuation token retrieved from previous response.
Expand All @@ -396,15 +402,16 @@ def query_items_change_feed(
:returns: An Iterable of items (dicts).
:rtype: Iterable[Dict[str, Any]]
"""
# pylint: enable=line-too-long
...

@distributed_trace
def query_items_change_feed(
self,
*args: Any,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]: # pylint: disable=too-many-statements

) -> ItemPaged[Dict[str, Any]]:
# pylint: disable=too-many-statements
if is_key_exists_and_not_none(kwargs, "priority"):
kwargs['priority'] = kwargs['priority']
feed_options = build_options(kwargs)
Expand Down Expand Up @@ -1274,14 +1281,14 @@ def delete_all_items_by_partition_key(
self.client_connection.DeleteAllItemsByPartitionKey(
collection_link=self.container_link, options=request_options, **kwargs)

def read_feed_ranges(
def read_feed_ranges( # pylint: disable=unused-argument
self,
**kwargs: Any
) -> List[str]: # pylint: disable=unused-argument
) -> List[str]:
partition_key_ranges =\
self.client_connection._routing_map_provider.get_overlapping_ranges(
self.container_link,
# default to full range
[Range("", "FF", True, False)])
[Range("", "FF", True, False)]) # default to full range

return [routing_range.Range.PartitionKeyRangeToRange(partitionKeyRange).to_base64_encoded_string() for partitionKeyRange in partition_key_ranges]
return [routing_range.Range.PartitionKeyRangeToRange(partitionKeyRange).to_base64_encoded_string()
for partitionKeyRange in partition_key_ranges]