diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 97196b8e96be..37de4f448659 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -10,6 +10,8 @@ This version and all future versions will support Python 3.13.
 * Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
 * Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
 * Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
+* Added **provisional** helper APIs for managing session tokens. See [PR 36971](https://github.com/Azure/azure-sdk-for-python/pull/36971)
+* Added the ability to get the feed range for a partition key. See [PR 36971](https://github.com/Azure/azure-sdk-for-python/pull/36971)

 #### Breaking Changes
 * Item-level point operations will now return `CosmosDict` and `CosmosList` response types.
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py
index a2d789f20644..452bc32e5b34 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_range.py
@@ -186,7 +186,6 @@ def _compare_helper(a, b):

     @staticmethod
     def overlaps(range1, range2):
-
         if range1 is None or range2 is None:
             return False
         if range1.isEmpty() or range2.isEmpty():
@@ -195,10 +194,35 @@ def overlaps(range1, range2):
         cmp1 = Range._compare_helper(range1.min, range2.max)
         cmp2 = Range._compare_helper(range2.min, range1.max)

-        if cmp1 <= 0 or cmp2 <= 0:
+        if cmp1 <= 0 and cmp2 <= 0:
             if (cmp1 == 0 and not (range1.isMinInclusive and range2.isMaxInclusive)) or (
                     cmp2 == 0 and not (range2.isMinInclusive and range1.isMaxInclusive)
             ):
                 return False
             return True
         return False
+
+    def can_merge(self, other: 'Range') -> bool:
+        if self.isSingleValue() and other.isSingleValue():
+            return self.min == other.min
+        # if the ranges share a boundary point that is included by either side, they can merge
+        overlap_boundary1 = self.max == other.min and (self.isMaxInclusive or other.isMinInclusive)
+        overlap_boundary2 = other.max == self.min and (other.isMaxInclusive or self.isMinInclusive)
+        if overlap_boundary1 or overlap_boundary2:
+            return True
+        return self.overlaps(self, other)
+
+    def merge(self, other: 'Range') -> 'Range':
+        if not self.can_merge(other):
+            raise ValueError("Ranges do not overlap")
+        min_val = self.min if self.min < other.min else other.min
+        max_val = self.max if self.max > other.max else other.max
+        is_min_inclusive = self.isMinInclusive if self.min < other.min else other.isMinInclusive
+        is_max_inclusive = self.isMaxInclusive if self.max > other.max else other.isMaxInclusive
+        return Range(min_val, max_val, is_min_inclusive, is_max_inclusive)
+
+    def is_subset(self, parent_range: 'Range') -> bool:
+        normalized_parent_range = parent_range.to_normalized_range()
+        normalized_child_range = self.to_normalized_range()
+        return (normalized_parent_range.min <= normalized_child_range.min and
+                normalized_parent_range.max >= normalized_child_range.max)
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py
new file mode 100644
index 000000000000..36a31328e20d
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_session_token_helpers.py
@@
-0,0 +1,228 @@ +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Internal Helper functions for manipulating session tokens. +""" +from typing import Tuple, List, Dict, Any + +from azure.cosmos._routing.routing_range import Range +from azure.cosmos._vector_session_token import VectorSessionToken +from ._change_feed.feed_range_internal import FeedRangeInternalEpk + +# pylint: disable=protected-access + + +# ex inputs and outputs: +# 1. "1:1#51", "1:1#55" -> "1:1#55" +# 2. "0:1#57", "1:1#52" -> "0:1#57" +# 3. "1:1#57#3=54", "2:1#52#3=51" -> "1:1#57#3=54" +# 4. "1:1#57#3=54", "1:1#58#3=53" -> "1:1#58#3=54" +def merge_session_tokens_with_same_range(session_token1: str, session_token2: str) -> str: + pk_range_id1, vector_session_token1 = parse_session_token(session_token1) + pk_range_id2, vector_session_token2 = parse_session_token(session_token2) + pk_range_id = pk_range_id1 + # The partition key range id could be different in this scenario + # + # Ex. 
get_updated_session_token([(("AA", "BB"), "1:1#51")], ("AA", "DD")) -> "1:1#51" + # Then we input this back into get_updated_session_token after a merge happened + # get_updated_session_token([(("AA", "DD"), "1:1#51"), (("AA", "DD"), "0:1#55")], ("AA", "DD")) -> "0:1#55" + if pk_range_id1 != pk_range_id2: + pk_range_id = pk_range_id1 \ + if vector_session_token1.global_lsn > vector_session_token2.global_lsn else pk_range_id2 + vector_session_token = vector_session_token1.merge(vector_session_token2) + return pk_range_id + ":" + vector_session_token.session_token + +def is_compound_session_token(session_token: str) -> bool: + return "," in session_token + +def parse_session_token(session_token: str) -> Tuple[str, VectorSessionToken]: + tokens = session_token.split(":") + return tokens[0], VectorSessionToken.create(tokens[1]) + +def split_compound_session_tokens(compound_session_tokens: List[Tuple[Range, str]]) -> List[str]: + session_tokens = [] + for _, session_token in compound_session_tokens: + if is_compound_session_token(session_token): + tokens = session_token.split(",") + for token in tokens: + session_tokens.append(token) + else: + session_tokens.append(session_token) + return session_tokens + +# ex inputs: +# ["1:1#51", "1:1#55", "1:1#57", "2:1#42", "2:1#45", "2:1#47"] -> ["1:1#57", "2:1#47"] +def merge_session_tokens_for_same_partition(session_tokens: List[str]) -> List[str]: + pk_session_tokens: Dict[str, List[str]] = {} + for session_token in session_tokens: + pk_range_id, _ = parse_session_token(session_token) + if pk_range_id in pk_session_tokens: + pk_session_tokens[pk_range_id].append(session_token) + else: + pk_session_tokens[pk_range_id] = [session_token] + + processed_session_tokens = [] + for session_tokens_same_pk in pk_session_tokens.values(): + pk_range_id, vector_session_token = parse_session_token(session_tokens_same_pk[0]) + for session_token in session_tokens_same_pk[1:]: + _, vector_session_token_1 = parse_session_token(session_token) + vector_session_token = vector_session_token.merge(vector_session_token_1) + processed_session_tokens.append(pk_range_id + ":" + vector_session_token.session_token) + + return processed_session_tokens + +# ex inputs: +# merge scenario +# 1. [(("AA", "BB"), "1:1#51"), (("BB", "DD"), "2:1#51"), (("AA", "DD"), "3:1#55")] -> +# [("AA", "DD"), "3:1#55"] +# split scenario +# 2. [(("AA", "BB"), "1:1#57"), (("BB", "DD"), "2:1#58"), (("AA", "DD"), "0:1#55")] -> +# [("AA", "DD"), "1:1#57,2:1#58"] +# 3. 
[(("AA", "BB"), "4:1#57"), (("BB", "DD"), "1:1#52"), (("AA", "DD"), "3:1#55")] -> +# [("AA", "DD"), "4:1#57,1:1#52,3:1#55"] +# goal here is to detect any obvious merges or splits that happened +# compound session tokens are not considered will just pass them along +def merge_ranges_with_subsets(overlapping_ranges: List[Tuple[Range, str]]) -> List[Tuple[Range, str]]: + processed_ranges = [] + while len(overlapping_ranges) != 0: # pylint: disable=too-many-nested-blocks + feed_range_cmp, session_token_cmp = overlapping_ranges[0] + # compound session tokens are not considered for merging + if is_compound_session_token(session_token_cmp): + processed_ranges.append(overlapping_ranges[0]) + overlapping_ranges.remove(overlapping_ranges[0]) + continue + _, vector_session_token_cmp = parse_session_token(session_token_cmp) + subsets = [] + # finding the subset feed ranges of the current feed range + for j in range(1, len(overlapping_ranges)): + feed_range = overlapping_ranges[j][0] + if not is_compound_session_token(overlapping_ranges[j][1]) and \ + feed_range.is_subset(feed_range_cmp): + subsets.append(overlapping_ranges[j] + (j,)) + + # go through subsets to see if can create current feed range from the subsets + not_found = True + j = 0 + while not_found and j < len(subsets): + merged_range = subsets[j][0] + session_tokens = [subsets[j][1]] + merged_indices = [subsets[j][2]] + if len(subsets) == 1: + _, vector_session_token = parse_session_token(session_tokens[0]) + if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn: + overlapping_ranges.remove(overlapping_ranges[merged_indices[0]]) + else: + for k, subset in enumerate(subsets): + if j == k: + continue + if merged_range.can_merge(subset[0]): + merged_range = merged_range.merge(subset[0]) + session_tokens.append(subset[1]) + merged_indices.append(subset[2]) + if feed_range_cmp == merged_range: + # if feed range can be created from the subsets + # take the subsets if their global lsn is larger + # else take the current feed range + children_more_updated = True + parent_more_updated = True + for session_token in session_tokens: + _, vector_session_token = parse_session_token(session_token) + if vector_session_token_cmp.global_lsn > vector_session_token.global_lsn: + children_more_updated = False + else: + parent_more_updated = False + feed_ranges_to_remove = [overlapping_ranges[i] for i in merged_indices] + for feed_range_to_remove in feed_ranges_to_remove: + overlapping_ranges.remove(feed_range_to_remove) + if children_more_updated: + overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens)))) + overlapping_ranges.remove(overlapping_ranges[0]) + elif not parent_more_updated and not children_more_updated: + session_tokens.append(session_token_cmp) + overlapping_ranges.append((merged_range, ','.join(map(str, session_tokens)))) + not_found = False + break + + j += 1 + + processed_ranges.append(overlapping_ranges[0]) + overlapping_ranges.remove(overlapping_ranges[0]) + return processed_ranges + +def get_latest_session_token(feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]], + target_feed_range: Dict[str, Any]): + + target_feed_range_epk = FeedRangeInternalEpk.from_json(target_feed_range) + target_feed_range_normalized = target_feed_range_epk.get_normalized_range() + # filter out tuples that overlap with target_feed_range and normalizes all the ranges + overlapping_ranges = [] + for feed_range_to_session_token in feed_ranges_to_session_tokens: + feed_range_epk = 
FeedRangeInternalEpk.from_json(feed_range_to_session_token[0])
+        if Range.overlaps(target_feed_range_normalized,
+                          feed_range_epk.get_normalized_range()):
+            overlapping_ranges.append((feed_range_epk.get_normalized_range(),
+                                       feed_range_to_session_token[1]))
+
+    if len(overlapping_ranges) == 0:
+        raise ValueError('There were no overlapping feed ranges with the target.')
+
+    # merge any session tokens that are the same exact feed range
+    i = 0
+    j = 1
+    while i < len(overlapping_ranges) and j < len(overlapping_ranges):
+        cur_feed_range = overlapping_ranges[i][0]
+        session_token = overlapping_ranges[i][1]
+        session_token_1 = overlapping_ranges[j][1]
+        if (not is_compound_session_token(session_token) and
+                not is_compound_session_token(session_token_1) and
+                cur_feed_range == overlapping_ranges[j][0]):
+            session_token = merge_session_tokens_with_same_range(session_token, session_token_1)
+            feed_ranges_to_remove = [overlapping_ranges[i], overlapping_ranges[j]]
+            for feed_range_to_remove in feed_ranges_to_remove:
+                overlapping_ranges.remove(feed_range_to_remove)
+            overlapping_ranges.append((cur_feed_range, session_token))
+            i, j = 0, 1
+        else:
+            j += 1
+            if j == len(overlapping_ranges):
+                i += 1
+                j = i + 1
+
+    # checking for merging of feed ranges that can be created from other feed ranges
+    processed_ranges = merge_ranges_with_subsets(overlapping_ranges)
+
+    # break up session tokens that are compound
+    remaining_session_tokens = split_compound_session_tokens(processed_ranges)
+
+    if len(remaining_session_tokens) == 1:
+        return remaining_session_tokens[0]
+    # merging any session tokens with same physical partition key range id
+    remaining_session_tokens = merge_session_tokens_for_same_partition(remaining_session_tokens)
+
+    updated_session_token = ""
+    # compound the remaining session tokens
+    for i, remaining_session_token in enumerate(remaining_session_tokens):
+        if i == len(remaining_session_tokens) - 1:
+            updated_session_token += remaining_session_token
+        else:
+            updated_session_token += remaining_session_token + ","
+
+    return updated_session_token
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
index 1a12b1b80946..b37e3cf17c6f 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
@@ -44,6 +44,7 @@
     _set_properties_cache
 )
 from .._routing.routing_range import Range
+from .._session_token_helpers import get_latest_session_token
 from ..offer import ThroughputProperties
 from ..partition_key import (
     NonePartitionKeyValue,
@@ -1304,9 +1305,9 @@ async def read_feed_ranges(
         :returns: A list representing the feed ranges in base64 encoded string
         :rtype: Iterable[Dict[str, Any]]

-        .. note::
-            For each feed range, even through a Dict has been returned, but in the future, the structure may change.
-            Please just treat it as opaque and do not take any dependent on it.
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.

         """
         if force_refresh is True:
@@ -1321,5 +1322,59 @@ async def read_feed_ranges(
         feed_ranges = [FeedRangeInternalEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)).to_dict()
                        for partitionKeyRange in partition_key_ranges]
-        return (feed_range for feed_range in feed_ranges)
+
+    async def get_latest_session_token(
+            self,
+            feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
+            target_feed_range: Dict[str, Any]
+    ) -> str:
+        """ **provisional** This method is still in preview and may be subject to breaking changes.
+
+        Gets the most up-to-date session token for a specific target feed range from a list of
+        feed range and session token tuples. The feed range can be obtained from a partition key
+        or by reading the container's feed ranges. This should only be used when maintaining your own
+        session tokens; otherwise the CosmosClient instance keeps track of session tokens for you.
+        Session tokens and feed ranges are scoped to a container, so only pass in session tokens and
+        feed ranges obtained from the same container.
+        :param feed_ranges_to_session_tokens: List of feed range and session token tuples.
+        :type feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]]
+        :param target_feed_range: The feed range to get the most up-to-date session token for.
+        :type target_feed_range: Dict[str, Any]
+        :returns: a session token
+        :rtype: str
+        """
+        return get_latest_session_token(feed_ranges_to_session_tokens, target_feed_range)
+
+    async def feed_range_from_partition_key(self, partition_key: PartitionKeyType) -> Dict[str, Any]:
+        """ Gets the feed range for a given partition key.
+        :param partition_key: The partition key to get the feed range for.
+        :type partition_key: PartitionKeyType
+        :returns: a feed range
+        :rtype: Dict[str, Any]
+
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.
+
+        """
+        return FeedRangeInternalEpk(await self._get_epk_range_for_partition_key(partition_key)).to_dict()
+
+    async def is_feed_range_subset(self, parent_feed_range: Dict[str, Any],
+                                   child_feed_range: Dict[str, Any]) -> bool:
+        """Checks if the child feed range is a subset of the parent feed range.
+        :param parent_feed_range: The parent (encompassing) feed range.
+        :type parent_feed_range: Dict[str, Any]
+        :param child_feed_range: The child feed range to check for containment.
+        :type child_feed_range: Dict[str, Any]
+        :returns: a boolean indicating if the child feed range is a subset of the parent feed range
+        :rtype: bool
+
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.
+
+        """
+        parent_feed_range_epk = FeedRangeInternalEpk.from_json(parent_feed_range)
+        child_feed_range_epk = FeedRangeInternalEpk.from_json(child_feed_range)
+        return child_feed_range_epk.get_normalized_range().is_subset(
+            parent_feed_range_epk.get_normalized_range())
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py
index 7038938e6497..f0c27b449e63 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py
@@ -42,6 +42,7 @@
 from ._cosmos_client_connection import CosmosClientConnection
 from ._cosmos_responses import CosmosDict, CosmosList
 from ._routing.routing_range import Range
+from ._session_token_helpers import get_latest_session_token
 from .offer import Offer, ThroughputProperties
 from .partition_key import (
     NonePartitionKeyValue,
@@ -1374,9 +1375,9 @@ def read_feed_ranges(
         :returns: A list representing the feed ranges in base64 encoded string
         :rtype: Iterable[Dict[str, Any]]

-        .. note::
-            For each feed range, even through a Dict has been returned, but in the future, the structure may change.
-            Please just treat it as opaque and do not take any dependent on it.
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.

         """
         if force_refresh is True:
@@ -1391,3 +1392,57 @@ def read_feed_ranges(
         feed_ranges = [FeedRangeInternalEpk(Range.PartitionKeyRangeToRange(partitionKeyRange)).to_dict()
                        for partitionKeyRange in partition_key_ranges]
         return (feed_range for feed_range in feed_ranges)
+
+    def get_latest_session_token(
+            self,
+            feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]],
+            target_feed_range: Dict[str, Any]
+    ) -> str:
+        """ **provisional** This method is still in preview and may be subject to breaking changes.
+
+        Gets the most up-to-date session token for a specific target feed range from a list of
+        feed range and session token tuples. The feed range can be obtained from a partition key
+        or by reading the container's feed ranges. This should only be used when maintaining your own
+        session tokens; otherwise the CosmosClient instance keeps track of session tokens for you.
+        Session tokens and feed ranges are scoped to a container, so only pass in session tokens and
+        feed ranges obtained from the same container.
+        :param feed_ranges_to_session_tokens: List of feed range and session token tuples.
+        :type feed_ranges_to_session_tokens: List[Tuple[Dict[str, Any], str]]
+        :param target_feed_range: The feed range to get the most up-to-date session token for.
+        :type target_feed_range: Dict[str, Any]
+        :returns: a session token
+        :rtype: str
+        """
+        return get_latest_session_token(feed_ranges_to_session_tokens, target_feed_range)
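For illustration only, a minimal sketch of how a caller that manages its own session tokens might use the provisional method above (not part of this patch; it assumes an existing `ContainerProxy` named `container` for a container partitioned on `/pk`, and the item shape is hypothetical):

    # Hypothetical usage sketch; `container` and the item values are assumptions.
    from azure.cosmos.http_constants import HttpHeaders

    feed_ranges_to_tokens = []
    # Feed range covering the logical partition we are about to write to.
    target_feed_range = container.feed_range_from_partition_key("A1")
    response = container.create_item({"id": "item-1", "pk": "A1"})
    # The write's session token is surfaced in the response headers.
    feed_ranges_to_tokens.append(
        (target_feed_range, response.get_response_headers()[HttpHeaders.SessionToken]))
    # Consolidates every recorded (feed range, token) pair that overlaps the target feed range.
    latest = container.get_latest_session_token(feed_ranges_to_tokens, target_feed_range)
    # `latest` can then be passed as session_token= on a later request that must observe this write.

The async container exposes the same methods as coroutines, as shown in the aio diff above.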
+
+    def feed_range_from_partition_key(self, partition_key: PartitionKeyType) -> Dict[str, Any]:
+        """ Gets the feed range for a given partition key.
+        :param partition_key: The partition key to get the feed range for.
+        :type partition_key: PartitionKeyType
+        :returns: a feed range
+        :rtype: Dict[str, Any]
+
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.
+
+        """
+        return FeedRangeInternalEpk(self._get_epk_range_for_partition_key(partition_key)).to_dict()
+
+    def is_feed_range_subset(self, parent_feed_range: Dict[str, Any], child_feed_range: Dict[str, Any]) -> bool:
+        """ Checks if the child feed range is a subset of the parent feed range.
+        :param parent_feed_range: The parent (encompassing) feed range.
+        :type parent_feed_range: Dict[str, Any]
+        :param child_feed_range: The child feed range to check for containment.
+        :type child_feed_range: Dict[str, Any]
+        :returns: a boolean indicating if the child feed range is a subset of the parent feed range
+        :rtype: bool
+
+        .. warning::
+          The structure of the dict representation of a feed range may vary, including which keys
+          are present. It therefore should only be treated as an opaque value.
+
+        """
+        parent_feed_range_epk = FeedRangeInternalEpk.from_json(parent_feed_range)
+        child_feed_range_epk = FeedRangeInternalEpk.from_json(child_feed_range)
+        return child_feed_range_epk.get_normalized_range().is_subset(
+            parent_feed_range_epk.get_normalized_range())
diff --git a/sdk/cosmos/azure-cosmos/samples/examples.py b/sdk/cosmos/azure-cosmos/samples/examples.py
index d6f56ef4a5c4..5b12a6870574 100644
--- a/sdk/cosmos/azure-cosmos/samples/examples.py
+++ b/sdk/cosmos/azure-cosmos/samples/examples.py
@@ -11,6 +11,7 @@
 # All interaction with Cosmos DB starts with an instance of the CosmosClient
 # [START create_client]
 from azure.cosmos import exceptions, CosmosClient, PartitionKey
+from typing import Dict, Any
 import os
@@ -262,6 +263,21 @@
 feed_ranges = list(container.read_feed_ranges())
 # [END read_feed_ranges]

+# Get a feed range from a partition key.
+# [START feed_range_from_partition_key]
+feed_range_from_pk = container.feed_range_from_partition_key(["GA", "Atlanta", 30363])
+# [END feed_range_from_partition_key]
+
+# Figure out if a feed range is a subset of another feed range.
+# This example finds which of the container's feed ranges contains the feed range derived from a partition key.
+# [START is_feed_range_subset]
+parent_feed_range = {}
+for feed_range in feed_ranges:
+    if container.is_feed_range_subset(feed_range, feed_range_from_pk):
+        parent_feed_range = feed_range
+        break
+# [END is_feed_range_subset]
+
 # Query a sorted list of items that were changed for one feed range
 # [START query_items_change_feed]
 for item in container.query_items_change_feed(feed_range=feed_ranges[0]):
diff --git a/sdk/cosmos/azure-cosmos/samples/examples_async.py b/sdk/cosmos/azure-cosmos/samples/examples_async.py
index c40183a3228b..9b9adb606fdb 100644
--- a/sdk/cosmos/azure-cosmos/samples/examples_async.py
+++ b/sdk/cosmos/azure-cosmos/samples/examples_async.py
@@ -268,6 +268,22 @@ async def examples_async():
     feed_ranges = list(await container.read_feed_ranges())
     # [END read_feed_ranges]

+    # Get a feed range from a partition key.
+    # [START feed_range_from_partition_key]
+    feed_range_from_pk = await container.feed_range_from_partition_key(["GA", "Atlanta", 30363])
+    # [END feed_range_from_partition_key]
+
+    # Figure out if a feed range is a subset of another feed range.
+    # This example finds which of the container's feed ranges contains the feed range derived from a partition key.
+    # [START is_feed_range_subset]
+    parent_feed_range = {}
+    for feed_range in feed_ranges:
+        if await container.is_feed_range_subset(feed_range, feed_range_from_pk):
+            parent_feed_range = feed_range
+            break
+    # [END is_feed_range_subset]
+
+
     # Query a sorted list of items that were changed for one feed range.
# The asynchronous client returns asynchronous iterators for its query methods; # as such, we iterate over it by using an async for loop diff --git a/sdk/cosmos/azure-cosmos/samples/session_token_management.py b/sdk/cosmos/azure-cosmos/samples/session_token_management.py new file mode 100644 index 000000000000..5915f5fa50b7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/session_token_management.py @@ -0,0 +1,147 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +import json +import random +import uuid + +from azure.cosmos import PartitionKey +from azure.cosmos import CosmosClient +import azure.cosmos.exceptions as exceptions + +import config +from azure.identity import DefaultAzureCredential +from azure.cosmos.http_constants import HttpHeaders + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites - +# +# 1. An Azure Cosmos account - +# https://azure.microsoft.com/en-us/documentation/articles/documentdb-create-account/ +# +# 2. Microsoft Azure Cosmos PyPi package - +# https://pypi.python.org/pypi/azure-cosmos/ +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates how to manage session tokens. By default, the SDK manages session tokens for you. These samples +# are for use cases where you want to manage session tokens yourself. +# +# 1. Storing session tokens in a cache by feed range from the partition key. +# +# 2. Storing session tokens in a cache by feed range from the container. +# +# ---------------------------------------------------------------------------------------------------------- +# Note - +# +# Running this sample will create (and delete) multiple Containers on your account. +# Each time a Container is created the account will be billed for 1 hour of usage based on +# the provisioned throughput (RU/s) of that account. +# ---------------------------------------------------------------------------------------------------------- + +HOST = config.settings['host'] +CREDENTIAL = DefaultAzureCredential() +DATABASE_ID = config.settings['database_id'] +CONTAINER_ID = config.settings['container_id'] + +def storing_session_tokens_pk(container): + print('1. Storing session tokens in a cache by feed range from the partition key.') + + + cache = {} + + # Everything below is just a simulation of what could be run on different machines and clients + # to store session tokens in a cache by feed range from the partition key. + # The cache is a Dict here for simplicity but in a real-world scenario, it would be some service. 
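# In outline, each iteration of the loop below: (1) derives the feed range for the item's
# partition key, (2) performs the write with the previously seen session token and reads the
# new session token from the response headers, (3) passes all cached (feed range, token)
# pairs plus the new one to get_latest_session_token to consolidate them for the target feed
# range, and (4) stores the consolidated token in the cache keyed by the JSON-serialized feed range.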
+ feed_ranges_and_session_tokens = [] + previous_session_token = "" + + # populating cache with session tokens + for i in range(5): + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + target_feed_range = container.feed_range_from_partition_key(item['pk']) + response = container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + # adding everything in the cache in case consolidation is possible + for feed_range_json, session_token_cache in cache.items(): + feed_range = json.loads(feed_range_json) + feed_ranges_and_session_tokens.append((feed_range, session_token_cache)) + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + latest_session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + # only doing this for the key to be immutable + feed_range_json = json.dumps(target_feed_range) + cache[feed_range_json] = latest_session_token + previous_session_token = session_token + + +def storing_session_tokens_container_feed_ranges(container): + print('2. Storing session tokens in a cache by feed range from the container.') + + # The cache is a dictionary here for simplicity but in a real-world scenario, it would be some service. + cache = {} + + # Everything below is just a simulation of what could be run on different machines and clients + # to store session tokens in a cache by feed range from the partition key. + feed_ranges_and_session_tokens = [] + previous_session_token = "" + feed_ranges = list(container.read_feed_ranges()) + + # populating cache with session tokens + for i in range(5): + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + feed_range_from_pk = container.feed_range_from_partition_key(item['pk']) + response = container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + # adding everything in the cache in case consolidation is possible + + for feed_range_json, session_token_cache in cache.items(): + feed_range = json.loads(feed_range_json) + feed_ranges_and_session_tokens.append((feed_range, session_token_cache)) + target_feed_range = next( + (feed_range for feed_range in feed_ranges if container.is_feed_range_subset(feed_range, feed_range_from_pk)), + {} + ) + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + latest_session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + # only doing this for the key to be immutable + feed_range_json = json.dumps(target_feed_range) + cache[feed_range_json] = latest_session_token + previous_session_token = session_token + + +def run_sample(): + with CosmosClient(HOST, CREDENTIAL) as client: + try: + db = client.create_database_if_not_exists(id=DATABASE_ID) + container = db.create_container_if_not_exists(id=CONTAINER_ID, partition_key=PartitionKey('/pk')) + + # example of storing session tokens in cache by feed range from the partition key + storing_session_tokens_pk(container) + + # example of storing session tokens in cache by feed range from the container + storing_session_tokens_container_feed_ranges(container) + + # cleanup database after sample + try: + client.delete_database(db) + + except exceptions.CosmosResourceNotFoundError: + pass + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has 
caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + + +if __name__ == '__main__': + run_sample() diff --git a/sdk/cosmos/azure-cosmos/samples/session_token_management_async.py b/sdk/cosmos/azure-cosmos/samples/session_token_management_async.py new file mode 100644 index 000000000000..ff9a17946d69 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/session_token_management_async.py @@ -0,0 +1,149 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +import json +import random +import uuid + +from azure.cosmos import PartitionKey +from azure.cosmos.aio import CosmosClient +import azure.cosmos.exceptions as exceptions + +import asyncio +import config +from azure.identity import DefaultAzureCredential +from azure.cosmos.http_constants import HttpHeaders + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites - +# +# 1. An Azure Cosmos account - +# https://azure.microsoft.com/en-us/documentation/articles/documentdb-create-account/ +# +# 2. Microsoft Azure Cosmos PyPi package - +# https://pypi.python.org/pypi/azure-cosmos/ +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates how to manage session tokens. By default, the SDK manages session tokens for you. These samples +# are for use cases where you want to manage session tokens yourself. +# +# 1. Storing session tokens in a cache by feed range from the partition key. +# +# 2. Storing session tokens in a cache by feed range from the container. +# +# ---------------------------------------------------------------------------------------------------------- +# Note - +# +# Running this sample will create (and delete) multiple Containers on your account. +# Each time a Container is created the account will be billed for 1 hour of usage based on +# the provisioned throughput (RU/s) of that account. +# ---------------------------------------------------------------------------------------------------------- + +HOST = config.settings['host'] +CREDENTIAL = DefaultAzureCredential() +DATABASE_ID = config.settings['database_id'] +CONTAINER_ID = config.settings['container_id'] + +async def storing_session_tokens_pk(container): + print('1. Storing session tokens in a cache by feed range from the partition key.') + + + cache = {} + + # Everything below is just a simulation of what could be run on different machines and clients + # to store session tokens in a cache by feed range from the partition key. + # The cache is a Dict here for simplicity but in a real-world scenario, it would be some service. 
+ feed_ranges_and_session_tokens = [] + previous_session_token = "" + + # populating cache with session tokens + for i in range(5): + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + target_feed_range = await container.feed_range_from_partition_key(item['pk']) + response = await container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + # adding everything in the cache in case consolidation is possible + for feed_range_json, session_token_cache in cache.items(): + feed_range = json.loads(feed_range_json) + feed_ranges_and_session_tokens.append((feed_range, session_token_cache)) + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + latest_session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + # only doing this for the key to be immutable + feed_range_json = json.dumps(target_feed_range) + cache[feed_range_json] = latest_session_token + previous_session_token = session_token + + +async def storing_session_tokens_container_feed_ranges(container): + print('2. Storing session tokens in a cache by feed range from the container.') + + # The cache is a dictionary here for simplicity but in a real-world scenario, it would be some service. + cache = {} + + # Everything below is just a simulation of what could be run on different machines and clients + # to store session tokens in a cache by feed range from the partition key. + feed_ranges_and_session_tokens = [] + previous_session_token = "" + feed_ranges = list(await container.read_feed_ranges()) + + # populating cache with session tokens + for i in range(5): + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + feed_range_from_pk = await container.feed_range_from_partition_key(item['pk']) + response = await container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + # adding everything in the cache in case consolidation is possible + + for feed_range_json, session_token_cache in cache.items(): + feed_range = json.loads(feed_range_json) + feed_ranges_and_session_tokens.append((feed_range, session_token_cache)) + target_feed_range = {} + for feed_range in feed_ranges: + if await container.is_feed_range_subset(feed_range, feed_range_from_pk): + target_feed_range = feed_range + break + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + latest_session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + # only doing this for the key to be immutable + feed_range_json = json.dumps(target_feed_range) + cache[feed_range_json] = latest_session_token + previous_session_token = session_token + + +async def run_sample(): + async with CosmosClient(HOST, CREDENTIAL) as client: + try: + db = await client.create_database_if_not_exists(id=DATABASE_ID) + container = await db.create_container_if_not_exists(id=CONTAINER_ID, partition_key=PartitionKey('/pk')) + + # example of storing session tokens in cache by feed range from the partition key + await storing_session_tokens_pk(container) + + # example of storing session tokens in cache by feed range from the container + await storing_session_tokens_container_feed_ranges(container) + + # cleanup database after sample + try: + await client.delete_database(db) + + except 
exceptions.CosmosResourceNotFoundError: + pass + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + + +if __name__ == '__main__': + asyncio.run(run_sample()) diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed_split_async.py b/sdk/cosmos/azure-cosmos/test/test_change_feed_split_async.py index 60f7b2810884..7c13f4682d02 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed_split_async.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed_split_async.py @@ -88,7 +88,7 @@ async def test_query_change_feed_with_split_async(self): actual_ids.append(item['id']) assert actual_ids == expected_ids - self.created_database.delete_container(created_collection.id) + await self.created_database.delete_container(created_collection.id) if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/test/test_feed_range.py b/sdk/cosmos/azure-cosmos/test/test_feed_range.py new file mode 100644 index 000000000000..f44c39e034a9 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_feed_range.py @@ -0,0 +1,123 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +import unittest +import uuid + +import pytest + +import azure.cosmos.cosmos_client as cosmos_client +import azure.cosmos.partition_key as partition_key +import test_config + +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk +from azure.cosmos._routing.routing_range import Range + +@pytest.fixture(scope="class") +def setup(): + if (TestFeedRange.masterKey == '[YOUR_KEY_HERE]' or + TestFeedRange.host == '[YOUR_ENDPOINT_HERE]'): + raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + test_client = cosmos_client.CosmosClient(TestFeedRange.host, test_config.TestConfig.masterKey), + created_db = test_client[0].get_database_client(TestFeedRange.TEST_DATABASE_ID) + return { + "created_db": created_db, + "created_collection": created_db.get_container_client(TestFeedRange.TEST_CONTAINER_ID) + } + +test_subset_ranges = [(Range("", "FF", True, False), + Range("3F", "7F", True, False), + True), + (Range("3F", "7F", True, False), + Range("", "FF", True, False), + False), + (Range("3F", "7F", True, False), + Range("", "5F", True, False), + False), + (Range("3F", "7F", True, True), + Range("3F", "7F", True, True), + True), + (Range("3F", "7F", False, True), + Range("3F", "7F", True, True), + False), + (Range("3F", "7F", True, False), + Range("3F", "7F", True, True), + False), + (Range("3F", "7F", True, False), + Range("", "2F", True, False), + False), + (Range("3F", "3F", True, True), + Range("3F", "3F", True, True), + True), + (Range("3F", "3F", True, True), + Range("4F", "4F", True, True), + False) + ] + + +test_overlaps_ranges = [(Range("", "FF", True, False), + Range("3F", "7F", True, False), + True), + (Range("3F", "7F", True, False), + Range("", "FF", True, False), + True), + (Range("3F", "7F", True, False), + Range("", "5F", True, False), + True), + (Range("3F", "7F", True, False), + Range("3F", "7F", True, False), + True), + (Range("3F", "7F", True, False), + Range("", "2F", True, False), + False), + (Range("3F", "7F", True, False), + Range("6F", "FF", True, False), + True), + (Range("AA", "BB", True, False), + Range("CC", "FF", True, False), + False)] + +@pytest.mark.cosmosEmulator +@pytest.mark.unittest 
+@pytest.mark.usefixtures("setup") +class TestFeedRange: + """Tests to verify methods for operations on feed ranges + """ + + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + TEST_DATABASE_ID = test_config.TestConfig.TEST_DATABASE_ID + TEST_CONTAINER_ID = test_config.TestConfig.TEST_MULTI_PARTITION_CONTAINER_ID + + + def test_partition_key_to_feed_range(self, setup): + created_container = setup["created_db"].create_container( + id='container_' + str(uuid.uuid4()), + partition_key=partition_key.PartitionKey(path="/id") + ) + feed_range = created_container.feed_range_from_partition_key("1") + feed_range_epk = FeedRangeInternalEpk.from_json(feed_range) + assert feed_range_epk.get_normalized_range() == Range("3C80B1B7310BB39F29CC4EA05BDD461E", + "3c80b1b7310bb39f29cc4ea05bdd461f", True, False) + setup["created_db"].delete_container(created_container) + + @pytest.mark.parametrize("parent_feed_range, child_feed_range, is_subset", test_subset_ranges) + def test_feed_range_is_subset(self, setup, parent_feed_range, child_feed_range, is_subset): + epk_parent_feed_range = FeedRangeInternalEpk(parent_feed_range).to_dict() + epk_child_feed_range = FeedRangeInternalEpk(child_feed_range).to_dict() + assert setup["created_collection"].is_feed_range_subset(epk_parent_feed_range, epk_child_feed_range) == is_subset + + def test_feed_range_is_subset_from_pk(self, setup): + epk_parent_feed_range = FeedRangeInternalEpk( + Range("", "FF", True, False)).to_dict() + epk_child_feed_range = setup["created_collection"].feed_range_from_partition_key("1") + assert setup["created_collection"].is_feed_range_subset(epk_parent_feed_range, epk_child_feed_range) + + @pytest.mark.parametrize("range1, range2, overlaps", test_overlaps_ranges) + def test_overlaps(self, setup, range1, range2, overlaps): + assert Range.overlaps(range1, range2) == overlaps + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_feed_range_async.py b/sdk/cosmos/azure-cosmos/test/test_feed_range_async.py new file mode 100644 index 000000000000..55d91ef8e662 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_feed_range_async.py @@ -0,0 +1,64 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+import unittest +import uuid + +import pytest +import pytest_asyncio + +import azure.cosmos.partition_key as partition_key +import test_config +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk +from azure.cosmos._routing.routing_range import Range +from azure.cosmos.aio import CosmosClient + + +@pytest_asyncio.fixture() +def setup(): + if (TestFeedRangeAsync.masterKey == '[YOUR_KEY_HERE]' or + TestFeedRangeAsync.host == '[YOUR_ENDPOINT_HERE]'): + raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + test_client = CosmosClient(TestFeedRangeAsync.host, test_config.TestConfig.masterKey), + created_db = test_client[0].get_database_client(TestFeedRangeAsync.TEST_DATABASE_ID) + return { + "created_db": created_db, + "created_collection": created_db.get_container_client(TestFeedRangeAsync.TEST_CONTAINER_ID) + } + +@pytest.mark.cosmosEmulator +@pytest.mark.asyncio +@pytest.mark.usefixtures("setup") +class TestFeedRangeAsync: + """Tests to verify methods for operations on feed ranges + """ + + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + TEST_DATABASE_ID = test_config.TestConfig.TEST_DATABASE_ID + TEST_CONTAINER_ID = test_config.TestConfig.TEST_MULTI_PARTITION_CONTAINER_ID + + + async def test_partition_key_to_feed_range(self, setup): + created_container = await setup["created_db"].create_container( + id='container_' + str(uuid.uuid4()), + partition_key=partition_key.PartitionKey(path="/id") + ) + feed_range = await created_container.feed_range_from_partition_key("1") + feed_range_epk = FeedRangeInternalEpk.from_json(feed_range) + assert feed_range_epk.get_normalized_range() == Range("3C80B1B7310BB39F29CC4EA05BDD461E", + "3c80b1b7310bb39f29cc4ea05bdd461f", True, False) + await setup["created_db"].delete_container(created_container) + + async def test_feed_range_is_subset_from_pk(self, setup): + epk_parent_feed_range = FeedRangeInternalEpk(Range("", + "FF", + True, + False)).to_dict() + epk_child_feed_range = await setup["created_collection"].feed_range_from_partition_key("1") + assert setup["created_collection"].is_feed_range_subset(epk_parent_feed_range, epk_child_feed_range) + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_latest_session_token.py b/sdk/cosmos/azure-cosmos/test/test_latest_session_token.py new file mode 100644 index 000000000000..7095c22b2279 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_latest_session_token.py @@ -0,0 +1,214 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+import random +import time +import unittest +import uuid + + +import azure.cosmos.cosmos_client as cosmos_client +import test_config +from azure.cosmos import DatabaseProxy, PartitionKey +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk +from azure.cosmos._session_token_helpers import is_compound_session_token, parse_session_token +from azure.cosmos.http_constants import HttpHeaders + + +def create_item(hpk): + if hpk: + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'state': 'CA', + 'city': 'LA' + str(random.randint(1, 10)), + 'zipcode': '90001' + } + else: + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + return item + + +class TestLatestSessionToken(unittest.TestCase): + """Test for session token helpers""" + + created_db: DatabaseProxy = None + client: cosmos_client.CosmosClient = None + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + configs = test_config.TestConfig + TEST_DATABASE_ID = configs.TEST_DATABASE_ID + + @classmethod + def setUpClass(cls): + cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) + cls.database = cls.client.get_database_client(cls.TEST_DATABASE_ID) + + + def test_latest_session_token_from_logical_pk(self): + container = self.database.create_container("test_updated_session_token_from_logical_pk" + str(uuid.uuid4()), + PartitionKey(path="/pk"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + target_pk = 'A1' + target_feed_range = container.feed_range_from_partition_key(target_pk) + target_session_token, previous_session_token = self.create_items_logical_pk(container, target_feed_range, + previous_session_token, + feed_ranges_and_session_tokens) + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + + self.trigger_split(container, 11000) + + target_session_token, _ = self.create_items_logical_pk(container, target_feed_range, session_token, + feed_ranges_and_session_tokens) + target_feed_range = container.feed_range_from_partition_key(target_pk) + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + self.database.delete_container(container.id) + + def test_latest_session_token_from_physical_pk(self): + container = self.database.create_container("test_updated_session_token_from_physical_pk" + str(uuid.uuid4()), + PartitionKey(path="/pk"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + pk_feed_range = container.feed_range_from_partition_key('A1') + target_session_token, target_feed_range, previous_session_token = self.create_items_physical_pk(container, pk_feed_range, + previous_session_token, + feed_ranges_and_session_tokens) + + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert session_token == target_session_token + + self.trigger_split(container, 11000) + + _, target_feed_range, previous_session_token = self.create_items_physical_pk(container, pk_feed_range, + session_token, + feed_ranges_and_session_tokens) + + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert is_compound_session_token(session_token) + session_tokens = 
session_token.split(",") + assert len(session_tokens) == 2 + pk_range_id1, session_token1 = parse_session_token(session_tokens[0]) + pk_range_id2, session_token2 = parse_session_token(session_tokens[1]) + pk_range_ids = [pk_range_id1, pk_range_id2] + + assert 320 == (session_token1.global_lsn + session_token2.global_lsn) + assert '1' in pk_range_ids + assert '2' in pk_range_ids + self.database.delete_container(container.id) + + def test_latest_session_token_hpk(self): + container = self.database.create_container("test_updated_session_token_hpk" + str(uuid.uuid4()), + PartitionKey(path=["/state", "/city", "/zipcode"], kind="MultiHash"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + pk = ['CA', 'LA1', '90001'] + pk_feed_range = container.feed_range_from_partition_key(pk) + target_session_token, target_feed_range, previous_session_token = self.create_items_physical_pk(container, + pk_feed_range, + previous_session_token, + feed_ranges_and_session_tokens, + True) + + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert session_token == target_session_token + self.database.delete_container(container.id) + + + def test_latest_session_token_logical_hpk(self): + container = self.database.create_container("test_updated_session_token_from_logical_hpk" + str(uuid.uuid4()), + PartitionKey(path=["/state", "/city", "/zipcode"], kind="MultiHash"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + target_pk = ['CA', 'LA1', '90001'] + target_feed_range = container.feed_range_from_partition_key(target_pk) + target_session_token, previous_session_token = self.create_items_logical_pk(container, target_feed_range, + previous_session_token, + feed_ranges_and_session_tokens, + True) + session_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + self.database.delete_container(container.id) + + + @staticmethod + def trigger_split(container, throughput): + print("Triggering a split in session token helpers") + container.replace_throughput(throughput) + print("changed offer to 11k") + print("--------------------------------") + print("Waiting for split to complete") + start_time = time.time() + + while True: + offer = container.get_throughput() + if offer.properties['content'].get('isOfferReplacePending', False): + if time.time() - start_time > 60 * 25: # timeout test at 25 minutes + unittest.skip("Partition split didn't complete in time.") + else: + print("Waiting for split to complete") + time.sleep(60) + else: + break + print("Split in session token helpers has completed") + + @staticmethod + def create_items_logical_pk(container, target_pk_range, previous_session_token, feed_ranges_and_session_tokens, hpk=False): + target_session_token = "" + for i in range(100): + item = create_item(hpk) + response = container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + pk = item['pk'] if not hpk else [item['state'], item['city'], item['zipcode']] + pk_range = container.feed_range_from_partition_key(pk) + pk_feed_range_epk = FeedRangeInternalEpk.from_json(pk_range) + target_feed_range_epk = FeedRangeInternalEpk.from_json(target_pk_range) + if (pk_feed_range_epk.get_normalized_range() == + target_feed_range_epk.get_normalized_range()): + target_session_token = session_token + previous_session_token = 
session_token + feed_ranges_and_session_tokens.append((pk_range, + session_token)) + return target_session_token, previous_session_token + + @staticmethod + def create_items_physical_pk(container, pk_feed_range, previous_session_token, feed_ranges_and_session_tokens, hpk=False): + target_session_token = "" + container_feed_ranges = list(container.read_feed_ranges()) + target_feed_range = None + for feed_range in container_feed_ranges: + if container.is_feed_range_subset(feed_range, pk_feed_range): + target_feed_range = feed_range + break + + for i in range(100): + item = create_item(hpk) + response = container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + if hpk: + pk = [item['state'], item['city'], item['zipcode']] + curr_feed_range = container.feed_range_from_partition_key(pk) + else: + curr_feed_range = container.feed_range_from_partition_key(item['pk']) + if container.is_feed_range_subset(target_feed_range, curr_feed_range): + target_session_token = session_token + previous_session_token = session_token + feed_ranges_and_session_tokens.append((curr_feed_range, session_token)) + + return target_session_token, target_feed_range, previous_session_token + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_latest_session_token_async.py b/sdk/cosmos/azure-cosmos/test/test_latest_session_token_async.py new file mode 100644 index 000000000000..b99751af82e4 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_latest_session_token_async.py @@ -0,0 +1,217 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +import random +import time +import unittest +import uuid + + +import test_config +from azure.cosmos import PartitionKey +from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk +from azure.cosmos._session_token_helpers import is_compound_session_token, parse_session_token +from azure.cosmos.aio import DatabaseProxy +from azure.cosmos.aio import CosmosClient +from azure.cosmos.http_constants import HttpHeaders + + +def create_item(hpk): + if hpk: + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'state': 'CA', + 'city': 'LA' + str(random.randint(1, 10)), + 'zipcode': '90001' + } + else: + item = { + 'id': 'item' + str(uuid.uuid4()), + 'name': 'sample', + 'pk': 'A' + str(random.randint(1, 10)) + } + return item + + +class TestLatestSessionTokenAsync(unittest.IsolatedAsyncioTestCase): + """Test for session token helpers""" + + created_db: DatabaseProxy = None + client: CosmosClient = None + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + configs = test_config.TestConfig + TEST_DATABASE_ID = configs.TEST_DATABASE_ID + + async def asyncSetUp(self): + self.client = CosmosClient(self.host, self.masterKey) + self.database = self.client.get_database_client(self.TEST_DATABASE_ID) + + async def tearDown(self): + await self.client.delete_database(self.database.id) + await self.client.close() + + async def test_latest_session_token_from_logical_pk(self): + container = await self.database.create_container("test_updated_session_token_from_logical_pk" + str(uuid.uuid4()), + PartitionKey(path="/pk"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + target_pk = 'A1' + target_feed_range = await container.feed_range_from_partition_key(target_pk) + target_session_token, previous_session_token = await 
self.create_items_logical_pk(container, target_feed_range, + previous_session_token, + feed_ranges_and_session_tokens) + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + feed_ranges_and_session_tokens.append((target_feed_range, session_token)) + + await self.trigger_split(container, 11000) + + target_session_token, _ = await self.create_items_logical_pk(container, target_feed_range, session_token, + feed_ranges_and_session_tokens) + target_feed_range = await container.feed_range_from_partition_key(target_pk) + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + await self.database.delete_container(container.id) + + async def test_latest_session_token_from_physical_pk(self): + container = await self.database.create_container("test_updated_session_token_from_physical_pk" + str(uuid.uuid4()), + PartitionKey(path="/pk"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + pk_feed_range = await container.feed_range_from_partition_key('A1') + target_session_token, target_feed_range, previous_session_token = await self.create_items_physical_pk(container, pk_feed_range, + previous_session_token, + feed_ranges_and_session_tokens) + + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert session_token == target_session_token + + await self.trigger_split(container, 11000) + + _, target_feed_range, previous_session_token = await self.create_items_physical_pk(container, pk_feed_range, + session_token, + feed_ranges_and_session_tokens) + + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert is_compound_session_token(session_token) + session_tokens = session_token.split(",") + assert len(session_tokens) == 2 + pk_range_id1, session_token1 = parse_session_token(session_tokens[0]) + pk_range_id2, session_token2 = parse_session_token(session_tokens[1]) + pk_range_ids = [pk_range_id1, pk_range_id2] + + assert 320 == (session_token1.global_lsn + session_token2.global_lsn) + assert '1' in pk_range_ids + assert '2' in pk_range_ids + await self.database.delete_container(container.id) + + async def test_latest_session_token_hpk(self): + container = await self.database.create_container("test_updated_session_token_hpk" + str(uuid.uuid4()), + PartitionKey(path=["/state", "/city", "/zipcode"], kind="MultiHash"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = "" + pk = ['CA', 'LA1', '90001'] + pk_feed_range = await container.feed_range_from_partition_key(pk) + target_session_token, target_feed_range, previous_session_token = await self.create_items_physical_pk(container, + pk_feed_range, + previous_session_token, + feed_ranges_and_session_tokens, + True) + + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + assert session_token == target_session_token + await self.database.delete_container(container.id) + + + async def test_latest_session_token_logical_hpk(self): + container = await self.database.create_container("test_updated_session_token_from_logical_hpk" + str(uuid.uuid4()), + PartitionKey(path=["/state", "/city", "/zipcode"], kind="MultiHash"), + offer_throughput=400) + feed_ranges_and_session_tokens = [] + previous_session_token = 
"" + target_pk = ['CA', 'LA1', '90001'] + target_feed_range = await container.feed_range_from_partition_key(target_pk) + target_session_token, previous_session_token = await self.create_items_logical_pk(container, target_feed_range, + previous_session_token, + feed_ranges_and_session_tokens, + True) + session_token = await container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range) + + assert session_token == target_session_token + await self.database.delete_container(container.id) + + + @staticmethod + async def trigger_split(container, throughput): + print("Triggering a split in session token helpers") + await container.replace_throughput(throughput) + print("changed offer to 11k") + print("--------------------------------") + print("Waiting for split to complete") + start_time = time.time() + + while True: + offer = await container.get_throughput() + if offer.properties['content'].get('isOfferReplacePending', False): + if time.time() - start_time > 60 * 25: # timeout test at 25 minutes + unittest.skip("Partition split didn't complete in time.") + else: + print("Waiting for split to complete") + time.sleep(60) + else: + break + print("Split in session token helpers has completed") + + @staticmethod + async def create_items_logical_pk(container, target_pk_range, previous_session_token, feed_ranges_and_session_tokens, hpk=False): + target_session_token = "" + for i in range(100): + item = create_item(hpk) + response = await container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + pk = item['pk'] if not hpk else [item['state'], item['city'], item['zipcode']] + pk_feed_range = await container.feed_range_from_partition_key(pk) + pk_feed_range_epk = FeedRangeInternalEpk.from_json(pk_feed_range) + target_feed_range_epk = FeedRangeInternalEpk.from_json(target_pk_range) + if (pk_feed_range_epk.get_normalized_range() == + target_feed_range_epk.get_normalized_range()): + target_session_token = session_token + previous_session_token = session_token + feed_ranges_and_session_tokens.append((pk_feed_range, + session_token)) + return target_session_token, previous_session_token + + @staticmethod + async def create_items_physical_pk(container, pk_feed_range, previous_session_token, feed_ranges_and_session_tokens, hpk=False): + target_session_token = "" + container_feed_ranges = list(await container.read_feed_ranges()) + target_feed_range = None + for feed_range in container_feed_ranges: + if await container.is_feed_range_subset(feed_range, pk_feed_range): + target_feed_range = feed_range + break + + for i in range(100): + item = create_item(hpk) + response = await container.create_item(item, session_token=previous_session_token) + session_token = response.get_response_headers()[HttpHeaders.SessionToken] + if hpk: + pk = [item['state'], item['city'], item['zipcode']] + curr_feed_range = await container.feed_range_from_partition_key(pk) + else: + curr_feed_range = await container.feed_range_from_partition_key(item['pk']) + if await container.is_feed_range_subset(target_feed_range, curr_feed_range): + target_session_token = session_token + previous_session_token = session_token + feed_ranges_and_session_tokens.append((curr_feed_range, session_token)) + + return target_session_token, target_feed_range, previous_session_token + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_session_token_helpers.py 
new file mode 100644
index 000000000000..d733c213e788
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/test_session_token_helpers.py
@@ -0,0 +1,174 @@
+# The MIT License (MIT)
+# Copyright (c) Microsoft Corporation. All rights reserved.
+
+import random
+import unittest
+
+import pytest
+
+import azure.cosmos.cosmos_client as cosmos_client
+import test_config
+from azure.cosmos import DatabaseProxy
+from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk
+from azure.cosmos._routing.routing_range import Range
+
+COLLECTION = "created_collection"
+DATABASE = "created_db"
+
+@pytest.fixture(scope="class")
+def setup():
+    if (TestSessionTokenHelpers.masterKey == '[YOUR_KEY_HERE]' or
+            TestSessionTokenHelpers.host == '[YOUR_ENDPOINT_HERE]'):
+        raise Exception(
+            "You must specify your Azure Cosmos account values for "
+            "'masterKey' and 'host' at the top of this class to run the "
+            "tests.")
+    test_client = cosmos_client.CosmosClient(TestSessionTokenHelpers.host, test_config.TestConfig.masterKey)
+    created_db = test_client.get_database_client(TestSessionTokenHelpers.TEST_DATABASE_ID)
+    return {
+        DATABASE: created_db,
+        COLLECTION: created_db.get_container_client(TestSessionTokenHelpers.TEST_COLLECTION_ID)
+    }
+
+def create_split_ranges():
+    return [  # split with two children
+        ([(("AA", "DD"), "0:1#51#3=52"), (("AA", "BB"), "1:1#55#3=52"), (("BB", "DD"), "2:1#54#3=52")],
+         ("AA", "DD"), "1:1#55#3=52,2:1#54#3=52"),
+        # same range different partition key range ids
+        ([(("AA", "DD"), "1:1#51#3=52"), (("AA", "DD"), "0:1#55#3=52")],
+         ("AA", "DD"), "0:1#55#3=52"),
+        # split with one child
+        ([(("AA", "DD"), "0:1#51#3=52"), (("AA", "BB"), "1:1#55#3=52")],
+         ("AA", "DD"), "0:1#51#3=52,1:1#55#3=52"),
+        # Highest GLSN, which is 55 in this  # cspell:disable-line
+        # ex. "1:1#55#3=52", is the one that will be returned because
+        # it is higher than all of the other feed ranges contained in the same
+        # range
+        ([(("AA", "DD"), "0:1#42#3=52"), (("AA", "BB"), "1:1#51#3=52"),
+          (("BB", "CC"), "1:1#53#3=52"), (("CC", "DD"), "1:1#55#3=52")],
+         ("AA", "DD"), "1:1#55#3=52"),
+        # Highest GLSN, which is 60 in this  # cspell:disable-line
+        # ex. "1:1#60#3=52", is the one that will be returned because
"1:1#60#3=52", is the one that will be returned because + # it is higher than all of the other feed range contained in the same + # range with some of them overlapping with each other + ([(("AA", "DD"), "0:1#60#3=52"), (("AA", "BB"), "1:1#51#3=52"), + (("BB", "CC"),"1:1#53#3=52"), (("CC", "DD"),"1:1#55#3=52")], + ("AA", "DD"), "0:1#60#3=52"), + # AA-DD can be created from the other ranges + # but the GLSN's are not all larger than the one # cspell:disable-line + # in the AA-DD range so we just compound as cannot make + # conclusions in this case + ([(("AA", "DD"), "0:1#60#3=52"), (("AA", "BB"), "1:1#51#3=52"), + (("BB", "CC"),"1:1#66#3=52"), (("CC", "DD"),"1:1#55#3=52")], + ("AA", "DD"), "0:1#60#3=52,1:1#66#3=52"), + # merge with one child + ([(("AA", "DD"), "3:1#55#3=52"), (("AA", "BB"),"1:1#51#3=52")], + ("AA", "DD"), "3:1#55#3=52"), + # merge with two children + ([(("AA", "DD"), "3:1#55#3=52"), (("AA", "BB"),"1:1#51#3=52"), (("BB", "DD"),"2:1#54#3=52")], + ("AA", "DD"), "3:1#55#3=52"), + # compound session token + ([(("AA", "DD"), "2:1#54#3=52,1:1#55#3=52"), (("AA", "BB"),"0:1#51#3=52")], + ("AA", "BB"), "2:1#54#3=52,1:1#55#3=52,0:1#51#3=52"), + # several compound session token with one range + ([(("AA", "DD"), "2:1#57#3=52,1:1#57#3=52"), (("AA", "DD"),"2:1#56#3=52,1:1#58#3=52")], + ("AA", "DD"), "2:1#57#3=52,1:1#58#3=52"), + # overlapping ranges + ([(("AA", "CC"), "0:1#54#3=52"), (("BB", "FF"),"2:1#51#3=52")], + ("AA", "EE"), "0:1#54#3=52,2:1#51#3=52"), + # different version numbers + ([(("AA", "BB"), "0:1#54#3=52"), (("AA", "BB"),"0:2#57#3=53")], + ("AA", "BB"), "0:2#57#3=53"), + # mixed scenarios + ([(("AA", "DD"), "3:1#60#3=53"), (("AA", "BB"), "1:1#54#3=52"), (("AA", "BB"), "1:1#52#3=53"), + (("BB", "CC"),"1:1#53#3=52"), (("BB", "CC"),"6:1#70#3=55,4:1#90#3=52"), + (("CC", "DD"),"1:1#55#3=52")], ("AA", "DD"), "3:1#60#3=53,6:1#70#3=55,4:1#90#3=52") + ] + +@pytest.mark.cosmosEmulator +@pytest.mark.unittest +@pytest.mark.usefixtures("setup") +class TestSessionTokenHelpers: + """Test for session token helpers""" + + created_db: DatabaseProxy = None + client: cosmos_client.CosmosClient = None + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + configs = test_config.TestConfig + TEST_DATABASE_ID = configs.TEST_DATABASE_ID + TEST_COLLECTION_ID = configs.TEST_SINGLE_PARTITION_CONTAINER_ID + + def test_get_session_token_update(self, setup): + feed_range = FeedRangeInternalEpk( + Range("AA", "BB", True, False)).to_dict() + session_token = "0:1#54#3=50" + feed_ranges_and_session_tokens = [(feed_range, session_token)] + session_token = "0:1#51#3=52" + feed_ranges_and_session_tokens.append((feed_range, session_token)) + session_token = setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens, feed_range) + assert session_token == "0:1#54#3=52" + + def test_many_session_tokens_update_same_range(self, setup): + feed_range = FeedRangeInternalEpk( + Range("AA", "BB", True, False)).to_dict() + feed_ranges_and_session_tokens = [] + for i in range(1000): + session_token = "0:1#" + str(random.randint(1, 100)) + "#3=" + str(random.randint(1, 100)) + feed_ranges_and_session_tokens.append((feed_range, session_token)) + session_token = "0:1#101#3=101" + feed_ranges_and_session_tokens.append((feed_range, session_token)) + updated_session_token = setup["created_collection"].get_latest_session_token(feed_ranges_and_session_tokens, + feed_range) + assert updated_session_token == session_token + + def test_many_session_tokens_update(self, setup): + feed_range = 
+            Range("AA", "BB", True, False)).to_dict()
+        feed_ranges_and_session_tokens = []
+        for i in range(1000):
+            session_token = "0:1#" + str(random.randint(1, 100)) + "#3=" + str(random.randint(1, 100))
+            feed_ranges_and_session_tokens.append((feed_range, session_token))
+
+        # adding irrelevant feed ranges
+        feed_range1 = FeedRangeInternalEpk(
+            Range("CC", "FF", True, False)).to_dict()
+        feed_range2 = FeedRangeInternalEpk(
+            Range("00", "55", True, False)).to_dict()
+        for i in range(1000):
+            session_token = "0:1#" + str(random.randint(1, 100)) + "#3=" + str(random.randint(1, 100))
+            if i % 2 == 0:
+                feed_ranges_and_session_tokens.append((feed_range1, session_token))
+            else:
+                feed_ranges_and_session_tokens.append((feed_range2, session_token))
+        session_token = "0:1#101#3=101"
+        feed_ranges_and_session_tokens.append((feed_range, session_token))
+        updated_session_token = setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens,
+                                                                           feed_range)
+        assert updated_session_token == session_token
+
+    @pytest.mark.parametrize("split_ranges, target_feed_range, expected_session_token", create_split_ranges())
+    def test_simulated_splits_merges(self, setup, split_ranges, target_feed_range, expected_session_token):
+        actual_split_ranges = []
+        for feed_range, session_token in split_ranges:
+            actual_split_ranges.append((FeedRangeInternalEpk(Range(feed_range[0], feed_range[1],
+                                                                   True, False)).to_dict(), session_token))
+        target_feed_range = FeedRangeInternalEpk(Range(target_feed_range[0], target_feed_range[1],
+                                                       True, False)).to_dict()
+        updated_session_token = setup[COLLECTION].get_latest_session_token(actual_split_ranges, target_feed_range)
+        assert updated_session_token == expected_session_token
+
+    def test_invalid_feed_range(self, setup):
+        feed_range = FeedRangeInternalEpk(
+            Range("AA", "BB", True, False)).to_dict()
+        session_token = "0:1#54#3=50"
+        feed_ranges_and_session_tokens = [(feed_range, session_token)]
+        with pytest.raises(ValueError, match='There were no overlapping feed ranges with the target.'):
+            setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens,
+                                                       FeedRangeInternalEpk(Range(
+                                                           "CC",
+                                                           "FF",
+                                                           True,
+                                                           False)).to_dict())
+
+if __name__ == '__main__':
+    unittest.main()
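The snippet below is a minimal usage sketch, not part of the patch, showing how an application might combine the provisional helpers exercised by these tests: it records a (feed range, session token) pair for each write, then asks get_latest_session_token for a token scoped to the partition key it is about to read. The account endpoint, key, database, and container names are placeholders, and error handling is omitted.

# Sketch only: assumes the provisional session-token helpers added in this change.
from azure.cosmos import CosmosClient
from azure.cosmos.http_constants import HttpHeaders

client = CosmosClient("<ACCOUNT_ENDPOINT>", "<ACCOUNT_KEY>")  # placeholder credentials
container = client.get_database_client("<DATABASE>").get_container_client("<CONTAINER>")

# Track a (feed range, session token) pair for each write that completes.
feed_ranges_and_session_tokens = []
for i in range(3):
    item = {"id": "item-" + str(i), "pk": "A1"}
    response = container.create_item(item)
    feed_ranges_and_session_tokens.append(
        (container.feed_range_from_partition_key(item["pk"]),
         response.get_response_headers()[HttpHeaders.SessionToken]))

# Ask for the most recent session token scoped to the partition key we are about to read.
target_feed_range = container.feed_range_from_partition_key("A1")
latest_token = container.get_latest_session_token(feed_ranges_and_session_tokens, target_feed_range)

# Use the scoped token so the read is session-consistent with the tracked writes.
read_item = container.read_item("item-0", partition_key="A1", session_token=latest_token)

Passing only the token relevant to the target feed range keeps the session token small compared to carrying a compound token for every physical partition the application has written to.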