Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
2950e20
merge from main and resolve conflicts
Aug 14, 2024
7a1a1eb
remove async keyword from changeFeed query in aio package
Aug 18, 2024
b6c53fb
refactor
Aug 18, 2024
5f16b14
refactor
Aug 18, 2024
36990ef
fix pylint
Aug 20, 2024
3c569e8
added public surface methods
tvaron3 Aug 20, 2024
7479b0c
pylint fix
Aug 20, 2024
2e76620
fix
Aug 21, 2024
56bbb9e
added functionality for merging session tokens from logical pk
tvaron3 Aug 21, 2024
8c0aa46
fix mypy
Aug 21, 2024
28394b9
added tests for basic merge and split
tvaron3 Aug 21, 2024
25c3363
resolve comments
Aug 27, 2024
cecdfa5
resolve comments
Aug 28, 2024
65ed132
resolve comments
Aug 28, 2024
4bb30d2
resolve comments
Aug 28, 2024
5addcdc
fix pylint
Aug 29, 2024
59814d7
fix mypy
Aug 29, 2024
ec79b94
merge feed range changes
tvaron3 Aug 22, 2024
66c3f7b
fix tests
Sep 4, 2024
1e7a268
merged with feed range branch
tvaron3 Sep 4, 2024
997b6b0
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 4, 2024
7eda72f
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 4, 2024
3a2e4e1
add tests
Sep 5, 2024
0883dac
fix pylint
Sep 5, 2024
b7d1210
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 5, 2024
195c47c
fix and resolve comments
Sep 6, 2024
246b1be
fix and resolve comments
Sep 6, 2024
10fe387
Added isSubsetFeedRange logic
tvaron3 Sep 9, 2024
6498311
Added request context to crud operations, session token helpers
tvaron3 Sep 11, 2024
5a13ddf
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 11, 2024
f5d0d7b
Merge branch 'main' into addFeedRangeSupportInChangeFeed
Sep 13, 2024
5cde59b
revert unnecessary change
Sep 13, 2024
a494346
Added more tests
tvaron3 Sep 20, 2024
0d75607
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Sep 20, 2024
c8c099f
Merge branch 'addFeedRangeSupportInChangeFeed' of https://github.com/…
tvaron3 Sep 20, 2024
ad3ae4f
Added more tests
tvaron3 Oct 5, 2024
8f466a1
merge with main
tvaron3 Oct 6, 2024
5249d0a
Changed tests to use new public feed range and more test coverage for…
tvaron3 Oct 6, 2024
40523f5
Added more tests
tvaron3 Oct 7, 2024
9f88b4e
Fix tests and add changelog
tvaron3 Oct 7, 2024
7c23e87
fix spell checks
tvaron3 Oct 7, 2024
4d0b058
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 7, 2024
d7c598e
Added tests and pushed request context to client level
tvaron3 Oct 8, 2024
8698098
Added async methods and removed feed range from request context
tvaron3 Oct 8, 2024
c252d88
fix tests
tvaron3 Oct 9, 2024
51e721b
fix tests and pylint
tvaron3 Oct 9, 2024
923055b
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 9, 2024
104e341
Reacting to comments
tvaron3 Oct 10, 2024
5552912
Reacting to comments
tvaron3 Oct 10, 2024
1bbbd0f
pylint and added hpk tests
tvaron3 Oct 10, 2024
a9299ab
reacting to comments
tvaron3 Oct 11, 2024
2155016
fix tests and mypy
tvaron3 Oct 11, 2024
0436355
fix mypy
tvaron3 Oct 11, 2024
103eb41
fix mypy
tvaron3 Oct 11, 2024
76451df
reacting to comments
tvaron3 Oct 15, 2024
7b0f4b7
reacting to comments
tvaron3 Oct 15, 2024
5d7b978
reacting to comments
tvaron3 Oct 15, 2024
d54992f
fix cspell
tvaron3 Oct 15, 2024
fa16830
rename method to get_latest_session_token
tvaron3 Oct 16, 2024
b2ac9d8
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Oct 17, 2024
6914a20
reacting to reverted feed range
tvaron3 Oct 17, 2024
ab9723a
change based on the api review
Oct 23, 2024
8a4305d
Reacting to API review and adding samples.
tvaron3 Oct 25, 2024
3a1f160
Reacting to API review and adding samples.
tvaron3 Oct 25, 2024
4bc16b1
Merge branch 'main' into tvaron3/sessionTokenHelper
tvaron3 Oct 25, 2024
900d001
Fixed pylint
tvaron3 Oct 25, 2024
96a165f
Merge branch 'tvaron3/sessionTokenHelper' of https://github.com/tvaro…
tvaron3 Oct 25, 2024
eab1822
Reacting to comments
tvaron3 Oct 28, 2024
97ffec7
Reacting to comments
tvaron3 Oct 28, 2024
2264465
Reacting to comments
tvaron3 Oct 29, 2024
35588fa
Reacting to comments
tvaron3 Oct 29, 2024
c42966f
Fix pydoc
tvaron3 Oct 30, 2024
786e357
Fix pydoc
tvaron3 Oct 31, 2024
0de21b4
reacting to comments
tvaron3 Oct 31, 2024
d32a6f1
reacting to comments
tvaron3 Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,23 +284,8 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("disableRUPerMinuteUsage"):
headers[http_constants.HttpHeaders.DisableRUPerMinuteUsage] = options["disableRUPerMinuteUsage"]

if options.get("changeFeed") is True:
# On REST level, change feed is using IfNoneMatch/ETag instead of continuation.
if_none_match_value = None
if options.get("continuation"):
if_none_match_value = options["continuation"]
elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]:
if_none_match_value = "*"
elif options.get("startTime"):
start_time = options.get("startTime")
headers[http_constants.HttpHeaders.IfModified_since] = start_time
if if_none_match_value:
headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value

headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
else:
if options.get("continuation"):
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]
if options.get("continuation"):
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]

if options.get("populatePartitionKeyRangeStatistics"):
headers[http_constants.HttpHeaders.PopulatePartitionKeyRangeStatistics] = options[
Expand Down
20 changes: 20 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
20 changes: 20 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal class for processing change feed implementation in the Azure Cosmos
database service.
"""
import base64
import copy
import json
from abc import ABC, abstractmethod
from typing import Dict, Any, List

from azure.cosmos import http_constants, exceptions
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromPointInTime
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV1, ChangeFeedStateV2
from azure.cosmos.aio import _retry_utility_async
from azure.cosmos.exceptions import CosmosHttpResponseError

# pylint: disable=protected-access

class ChangeFeedFetcher(ABC):

@abstractmethod
async def fetch_next_block(self):
pass

class ChangeFeedFetcherV1(ChangeFeedFetcher):
"""Internal class for change feed fetch v1 implementation.
This is used when partition key range id is used or when the supplied continuation token is in just simple etag.
Please note v1 does not support split or merge.

"""
def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):

self._client = client
self._feed_options = feed_options

self._change_feed_state = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV1):
raise ValueError(f"ChangeFeedFetcherV1 can not handle change feed state version"
f" {type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self):
"""Returns a block of results.

:return: List of results.
:rtype: list
"""
async def callback():
return await self.fetch_change_feed_items(self._fetch_function)

return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)

async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)
is_s_time_first_fetch = self._change_feed_state._continuation is None
while True:
(fetched_items, response_headers) = await fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))

if fetched_items:
break

# When processing from point in time, there will be no initial results being returned,
# so we will retry with the new continuation token again
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
and is_s_time_first_fetch):
is_s_time_first_fetch = False
else:
break
return fetched_items


class ChangeFeedFetcherV2(object):
"""Internal class for change feed fetch v2 implementation.
"""

def __init__(
self,
client,
resource_link: str,
feed_options: Dict[str, Any],
fetch_function):

self._client = client
self._feed_options = feed_options

self._change_feed_state: ChangeFeedStateV2 = self._feed_options.pop("changeFeedState")
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version "
f"{type(self._change_feed_state)}")

self._resource_link = resource_link
self._fetch_function = fetch_function

async def fetch_next_block(self):
"""Returns a block of results.

:return: List of results.
:rtype: list
"""

async def callback():
return await self.fetch_change_feed_items(self._fetch_function)

try:
return await _retry_utility_async.ExecuteAsync(
self._client,
self._client._global_endpoint_manager,
callback)
except CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e) or exceptions._is_partition_split_or_merge(e):
# refresh change feed state
await self._change_feed_state.handle_feed_range_gone_async(
self._client._routing_map_provider,
self._resource_link)
else:
raise e

return await self.fetch_next_block()

async def fetch_change_feed_items(self, fetch_function) -> List[Dict[str, Any]]:
new_options = copy.deepcopy(self._feed_options)
new_options["changeFeedState"] = self._change_feed_state

self._change_feed_state.populate_feed_options(new_options)

is_s_time_first_fetch = True
while True:
(fetched_items, response_headers) = await fetch_function(new_options)

continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
if fetched_items:
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))
response_headers[continuation_key] = self._get_base64_encoded_continuation()
break

# when there is no items being returned, we will decide to retry based on:
# 1. When processing from point in time, there will be no initial results being returned,
# so we will retry with the new continuation token
# 2. if the feed range of the changeFeedState span multiple physical partitions
# then we will read from the next feed range until we have looped through all physical partitions
self._change_feed_state.apply_not_modified_response()
self._change_feed_state.apply_server_response_continuation(
response_headers.get(continuation_key))

#TODO: can this part logic be simplified
if (isinstance(self._change_feed_state._change_feed_start_from, ChangeFeedStartFromPointInTime)
and is_s_time_first_fetch):
response_headers[continuation_key] = self._get_base64_encoded_continuation()
is_s_time_first_fetch = False
should_retry = True
else:
self._change_feed_state._continuation._move_to_next_token()
response_headers[continuation_key] = self._get_base64_encoded_continuation()
should_retry = self._change_feed_state.should_retry_on_not_modified_response()
is_s_time_first_fetch = False

if not should_retry:
break

return fetched_items

def _get_base64_encoded_continuation(self) -> str:
continuation_json = json.dumps(self._change_feed_state.to_dict())
json_bytes = continuation_json.encode('utf-8')
# Encode the bytes to a Base64 string
base64_bytes = base64.b64encode(json_bytes)
# Convert the Base64 bytes to a string
return base64_bytes.decode('utf-8')
Loading