Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c811dd4
Add feed_range to query_items API
allenkim0129 Jun 11, 2025
40e61dd
Added overload methods for 'query_items' API
allenkim0129 Jun 18, 2025
53cf357
Added Tests
allenkim0129 Jun 19, 2025
609274f
Added feed_range to query_item for async
allenkim0129 Jun 19, 2025
9dadc7d
Added samples for query_items with feed_range
allenkim0129 Jun 23, 2025
2a89be8
Merge branch 'main' into users/allekim/addFeedRangeForQuery
allenkim0129 Jun 23, 2025
9958b70
Fix pylint error
allenkim0129 Jun 23, 2025
c64f832
Updated CHANGELOG.md
allenkim0129 Jun 23, 2025
de1446f
Fix test error
allenkim0129 Jun 23, 2025
fed65bf
Fix test error
allenkim0129 Jun 24, 2025
643b16f
Changed to run feed_range async tests on emulator only
allenkim0129 Jun 24, 2025
5940578
Fix tests
allenkim0129 Jun 25, 2025
a06526b
Fix tests
allenkim0129 Jun 25, 2025
973c4a8
Fix tests
allenkim0129 Jun 25, 2025
2a71b63
Fix tests with positional_args
allenkim0129 Jun 26, 2025
1b02d46
Addressing comments
allenkim0129 Jul 3, 2025
da8ac14
Fix pylint error
allenkim0129 Jul 7, 2025
db5b0c5
Changed to non-public APIs for internal classes/methods
allenkim0129 Jul 11, 2025
ba07870
Changed to non-public APIs for internal classes/methods
allenkim0129 Jul 11, 2025
28f1518
Changed to non-public APIs for internal classes/methods
allenkim0129 Jul 11, 2025
4a4a7bc
Pylint error
allenkim0129 Jul 11, 2025
b289b66
Pylint error
allenkim0129 Jul 14, 2025
2b1aae6
Address comments
allenkim0129 Jul 15, 2025
f61a987
Merge branch 'main' into users/allekim/addFeedRangeForQuery
allenkim0129 Jul 16, 2025
fd8ceb7
Add newline at the end _utils.py
allenkim0129 Jul 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.13.0b3 (Unreleased)

#### Features Added
* Added feed range support in `query_items`. See [PR 41722](https://github.com/Azure/azure-sdk-for-python/pull/41722).

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from azure.core.pipeline.transport._base import HttpRequest

from . import http_constants
from .partition_key import _Empty, _Undefined
from .partition_key import _Empty, _Undefined, PartitionKeyKind


# pylint: disable=protected-access
Expand Down Expand Up @@ -88,7 +88,7 @@ def should_extract_partition_key(self, container_cache: Optional[Dict[str, Any]]
if self._headers and http_constants.HttpHeaders.PartitionKey in self._headers:
current_partition_key = self._headers[http_constants.HttpHeaders.PartitionKey]
partition_key_definition = container_cache["partitionKey"] if container_cache else None
if partition_key_definition and partition_key_definition["kind"] == "MultiHash":
if partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
# A null in the multihash partition key indicates a failure in extracting partition keys
# from the document definition
return 'null' in current_partition_key
Expand All @@ -110,7 +110,7 @@ def _extract_partition_key(self, client: Optional[Any], container_cache: Optiona
elif options and isinstance(options["partitionKey"], _Empty):
new_partition_key = []
# else serialize using json dumps method which apart from regular values will serialize None into null
elif partition_key_definition and partition_key_definition["kind"] == "MultiHash":
elif partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
else:
new_partition_key = json.dumps([options["partitionKey"]])
Expand All @@ -131,7 +131,7 @@ async def _extract_partition_key_async(self, client: Optional[Any],
elif isinstance(options["partitionKey"], _Empty):
new_partition_key = []
# else serialize using json dumps method which apart from regular values will serialize None into null
elif partition_key_definition and partition_key_definition["kind"] == "MultiHash":
elif partition_key_definition and partition_key_definition["kind"] == PartitionKeyKind.MULTI_HASH:
new_partition_key = json.dumps(options["partitionKey"], separators=(',', ':'))
else:
new_partition_key = json.dumps([options["partitionKey"]])
Expand Down
47 changes: 21 additions & 26 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import os
import urllib.parse
import uuid
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast
from typing_extensions import TypedDict
from urllib3.util.retry import Retry

Expand Down Expand Up @@ -60,6 +60,7 @@
from ._base import _build_properties_cache
from ._change_feed.change_feed_iterable import ChangeFeedIterable
from ._change_feed.change_feed_state import ChangeFeedState
from ._change_feed.feed_range_internal import FeedRangeInternalEpk
from ._constants import _Constants as Constants
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy
from ._cosmos_responses import CosmosDict, CosmosList
Expand All @@ -71,15 +72,12 @@
from .partition_key import (
_Undefined,
_Empty,
PartitionKey,
PartitionKeyKind,
PartitionKeyType,
SequentialPartitionKeyType,
_return_undefined_or_empty_partition_key,
NonePartitionKeyValue,
_get_partition_key_from_partition_key_definition
)

PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long


class CredentialDict(TypedDict, total=False):
masterKey: str
resourceTokens: Mapping[str, Any]
Expand Down Expand Up @@ -3160,23 +3158,20 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
request_params = RequestObject(resource_type, documents._OperationType.SqlQuery, req_headers)
request_params.set_excluded_location_from_options(options)

# check if query has prefix partition key
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
if isPrefixPartitionQuery and "partitionKeyDefinition" in kwargs:
# Check if the over lapping ranges can be populated
feed_range_epk = None
if "feed_range" in kwargs:
feed_range = kwargs.pop("feed_range")
feed_range_epk = FeedRangeInternalEpk.from_json(feed_range).get_normalized_range()
elif "prefix_partition_key_object" in kwargs and "prefix_partition_key_value" in kwargs:
prefix_partition_key_obj = kwargs.pop("prefix_partition_key_object")
prefix_partition_key_value: SequentialPartitionKeyType = kwargs.pop("prefix_partition_key_value")
feed_range_epk = prefix_partition_key_obj.get_epk_range_for_prefix_partition_key(prefix_partition_key_value)

# If feed_range_epk exist, query with the range
if feed_range_epk is not None:
last_response_headers = CaseInsensitiveDict()
# here get the over lapping ranges
# Default to empty Dictionary, but unlikely to be empty as we first check if we have it in kwargs
pk_properties: Union[PartitionKey, Dict] = kwargs.pop("partitionKeyDefinition", {})
partition_key_definition = _get_partition_key_from_partition_key_definition(pk_properties)
partition_key_value: Sequence[
Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]] = cast(
Sequence[Union[None, bool, int, float, str, _Undefined, Type[NonePartitionKeyValue]]],
pk_properties.get("partition_key")
)
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(
partition_key_value
) # cspell:disable-line
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feedrangeEPK],
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feed_range_epk],
options)
# It is possible to get more than one over lapping range. We need to get the query results for each one
results: Dict[str, Any] = {}
Expand All @@ -3193,8 +3188,8 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
# Since the range min and max are all Upper Cased string Hex Values,
# we can compare the values lexicographically
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min),
range_max=min(single_range.max, feedrangeEPK.max),
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feed_range_epk.min),
range_max=min(single_range.max, feed_range_epk.max),
isMinInclusive=True, isMaxInclusive=False)
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
# The Epk Sub Range spans exactly one physical partition
Expand Down Expand Up @@ -3339,7 +3334,7 @@ def _ExtractPartitionKey(
partitionKeyDefinition: Mapping[str, Any],
document: Mapping[str, Any]
) -> Union[List[Optional[Union[str, float, bool]]], str, float, bool, _Empty, _Undefined]:
if partitionKeyDefinition["kind"] == "MultiHash":
if partitionKeyDefinition["kind"] == PartitionKeyKind.MULTI_HASH:
ret: List[Optional[Union[str, float, bool]]] = []
for partition_key_level in partitionKeyDefinition["paths"]:
# Parses the paths into a list of token each representing a property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"""
from typing import TYPE_CHECKING, Optional

from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition
from azure.cosmos.partition_key import get_partition_key_from_definition
from azure.cosmos._global_partition_endpoint_manager_circuit_breaker_core import \
_GlobalPartitionEndpointManagerForCircuitBreakerCore

Expand Down Expand Up @@ -62,12 +62,12 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK
# get relevant information from container cache to get the overlapping ranges
container_link = properties["container_link"]
partition_key_definition = properties["partitionKey"]
partition_key = _get_partition_key_from_partition_key_definition(partition_key_definition)
partition_key = get_partition_key_from_definition(partition_key_definition)

if HttpHeaders.PartitionKey in request.headers:
partition_key_value = request.headers[HttpHeaders.PartitionKey]
# get the partition key range for the given partition key
epk_range = [partition_key._get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
epk_range = [partition_key.get_epk_range_for_partition_key(partition_key_value)] # pylint: disable=protected-access
partition_ranges = (self.client._routing_map_provider # pylint: disable=protected-access
.get_overlapping_ranges(container_link, epk_range))
partition_range = Range.PartitionKeyRangeToRange(partition_ranges[0])
Expand Down
47 changes: 42 additions & 5 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
import base64
import json
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional, Tuple

from ._version import VERSION


def get_user_agent(suffix: Optional[str]) -> str:
def get_user_agent(suffix: Optional[str] = None) -> str:
os_name = safe_user_agent_header(platform.platform())
python_version = safe_user_agent_header(platform.python_version())
user_agent = "azsdk-python-cosmos/{} Python/{} ({})".format(VERSION, python_version, os_name)
if suffix:
user_agent += f" {suffix}"
return user_agent

def get_user_agent_async(suffix: Optional[str]) -> str:
def get_user_agent_async(suffix: Optional[str] = None) -> str:
os_name = safe_user_agent_header(platform.platform())
python_version = safe_user_agent_header(platform.python_version())
user_agent = "azsdk-python-cosmos-async/{} Python/{} ({})".format(VERSION, python_version, os_name)
Expand All @@ -49,7 +49,7 @@ def get_user_agent_async(suffix: Optional[str]) -> str:
return user_agent


def safe_user_agent_header(s: Optional[str]) -> str:
def safe_user_agent_header(s: Optional[str] = None) -> str:
if s is None:
s = "unknown"
# remove all white spaces
Expand All @@ -59,7 +59,7 @@ def safe_user_agent_header(s: Optional[str]) -> str:
return s


def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]:
def get_index_metrics_info(delimited_string: Optional[str] = None) -> Dict[str, Any]:
if delimited_string is None:
return {}
try:
Expand All @@ -76,3 +76,40 @@ def get_index_metrics_info(delimited_string: Optional[str]) -> Dict[str, Any]:

def current_time_millis() -> int:
return int(round(time.time() * 1000))

def add_args_to_kwargs(
arg_names: List[str],
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> None:
"""Add positional arguments(args) to keyword argument dictionary(kwargs) using names in arg_names as keys.
To be backward-compatible, some expected positional arguments has to be allowed. This method will verify number of
maximum positional arguments and add them to the keyword argument dictionary(kwargs)

:param List[str] arg_names: The names of positional arguments.
:param Tuple[Any, ...] args: The tuple of positional arguments.
:param Dict[str, Any] kwargs: The dictionary of keyword arguments as reference. This dictionary will be updated.
"""

if len(args) > len(arg_names):
raise ValueError(f"Positional argument is out of range. Expected {len(arg_names)} arguments, "
f"but got {len(args)} instead. Please review argument list in API documentation.")

for name, arg in zip(arg_names, args):
if name in kwargs:
raise ValueError(f"{name} cannot be used as positional and keyword argument at the same time.")
kwargs[name] = arg

def verify_exclusive_arguments(
exclusive_keys: List[str],
**kwargs: Dict[str, Any]) -> None:
"""Verify if exclusive arguments are present in kwargs.
For some Cosmos SDK APIs, some arguments are exclusive, or cannot be used at the same time. This method will verify
that and raise an error if exclusive arguments are present.

:param List[str] exclusive_keys: The names of exclusive arguments.
"""
keys_in_kwargs = [key for key in exclusive_keys if key in kwargs and kwargs[key] is not None]

if len(keys_in_kwargs) > 1:
raise ValueError(f"{', '.join(keys_in_kwargs)} are exclusive parameters, please only set one of them")
Loading
Loading