Merged
Changes from 39 commits
Commits
85 commits
a47452e
sync PPAF
simorenoh Jun 15, 2025
b8228e7
async changes
simorenoh Jun 16, 2025
151a2fa
Update test_per_partition_automatic_failover_async.py
simorenoh Jun 16, 2025
b9e0a08
CI fixes
simorenoh Jun 16, 2025
e4d7046
changelog
simorenoh Jun 16, 2025
09e7163
broken link
simorenoh Jun 16, 2025
4e28f66
Update test_location_cache.py
simorenoh Jun 16, 2025
c5319e8
change PPAF detection logic
simorenoh Jun 16, 2025
eba6093
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh Jun 16, 2025
2ec5c5d
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh Jun 17, 2025
62d7be0
fix tests and remove environment variable
tvaron3 Jun 18, 2025
b57949d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Jun 18, 2025
24b8415
fix tests
tvaron3 Jun 23, 2025
9595327
revert excluded locations change
tvaron3 Jul 2, 2025
8911ef5
fix analyze
tvaron3 Jul 3, 2025
25dbeb3
test excluded locations
tvaron3 Jul 7, 2025
d61a9a9
Add different error handling for 503 and 408s, update README
tvaron3 Jul 8, 2025
3f8ac23
Merge branch 'main' into cosmos-ppaf
simorenoh Jul 30, 2025
f1c69ed
mypy, cspell, pylint
simorenoh Jul 31, 2025
9306d15
remove tag from tests since config is service based
simorenoh Jul 31, 2025
bd07d83
add threshold-based retries for 408, 5xx errors
simorenoh Aug 7, 2025
80cc824
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 8, 2025
2e5838c
update constant use, rollback session token PR change
simorenoh Aug 8, 2025
8b7d181
threshold based retries
simorenoh Aug 18, 2025
f25b660
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 18, 2025
d8ed980
Update _base.py
simorenoh Aug 19, 2025
fcd5c60
cspell, test fixes
simorenoh Aug 19, 2025
93c76ad
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 19, 2025
467a95d
Update _service_unavailable_retry_policy.py
simorenoh Aug 19, 2025
b9aa01c
mypy, pylint
simorenoh Aug 19, 2025
64f95e3
503 behavior change, use regional contexts
simorenoh Aug 21, 2025
d05fc5e
mypy, pylint, tests
simorenoh Aug 21, 2025
85b2007
special-casing 503s
simorenoh Aug 21, 2025
f8fa70a
small fix
simorenoh Aug 21, 2025
e5c5ac5
exclude region tests
simorenoh Aug 21, 2025
ccd9def
session retry tests
simorenoh Aug 22, 2025
1dccc5d
pylint, cspell
simorenoh Aug 22, 2025
ebf0b0d
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 22, 2025
c2bb93a
change errors since 503 is now retried directly
simorenoh Aug 25, 2025
c3879d8
Update sdk/cosmos/azure-cosmos/README.md
simorenoh Aug 26, 2025
1d57bf2
address comments
simorenoh Aug 26, 2025
eec77e7
Update _service_unavailable_retry_policy.py
simorenoh Aug 26, 2025
4c2bf32
small test updates for 503 behavior
simorenoh Aug 26, 2025
05654a9
further comments
simorenoh Aug 27, 2025
f982d21
Update test_per_partition_circuit_breaker_sm_mrr.py
simorenoh Aug 27, 2025
d9ca7a4
test fixes
simorenoh Aug 27, 2025
f1dce5d
Update test_excluded_locations.py
simorenoh Aug 27, 2025
1582cf3
small improvement to region-finding
simorenoh Aug 29, 2025
8f7ec0c
pylint
simorenoh Aug 29, 2025
1c10349
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 29, 2025
effb6d1
Update _global_partition_endpoint_manager_per_partition_automatic_fai…
simorenoh Aug 29, 2025
1e773f5
address comments, add threshold lock
simorenoh Aug 29, 2025
24a44d9
add more comments
simorenoh Aug 29, 2025
d07610a
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 2, 2025
f984204
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 4, 2025
c772092
edge cases
simorenoh Sep 19, 2025
143cf17
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 19, 2025
ef9f73a
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 2, 2025
3acda24
changes from testing
simorenoh Oct 7, 2025
9a6b17b
pylint
simorenoh Oct 7, 2025
c3e0035
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 8, 2025
8f75444
fixes pylint/mypy
simorenoh Oct 8, 2025
0ccd9bf
mypy complaining about assigning str to none
simorenoh Oct 8, 2025
f4e4d65
testing changes - will roll back later
simorenoh Oct 8, 2025
4e276e1
Merge branch 'cosmos-ppaf' of https://github.com/Azure/azure-sdk-for-…
simorenoh Oct 8, 2025
8f87b13
Update _endpoint_discovery_retry_policy.py
simorenoh Oct 9, 2025
3e1f6be
Update _asynchronous_request.py
simorenoh Oct 17, 2025
42817fc
add user agent feature flags
simorenoh Oct 17, 2025
23f3b0d
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 20, 2025
65f9e01
Update test_per_partition_automatic_failover_async.py
simorenoh Oct 20, 2025
e15e43d
move user agent logic
simorenoh Oct 24, 2025
0d7e887
sync and async match, remove print statements
simorenoh Oct 29, 2025
aa3b641
leftover timer
simorenoh Oct 29, 2025
799f6de
Update _retry_utility.py
simorenoh Oct 30, 2025
36249b4
use constants
simorenoh Oct 30, 2025
f5cd24b
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 31, 2025
0495c7b
pylint
simorenoh Oct 31, 2025
335e10e
Merge branch 'main' into cosmos-ppaf
simorenoh Nov 17, 2025
2f004b7
Merge branch 'main' into cosmos-ppaf
simorenoh Nov 17, 2025
8639093
Update CHANGELOG.md
simorenoh Nov 17, 2025
5b3815f
react to comments
simorenoh Nov 19, 2025
e31d674
Update _retry_utility.py
simorenoh Nov 19, 2025
e55871c
mypy pylint
simorenoh Nov 19, 2025
0463a3f
test fixes
simorenoh Nov 20, 2025
cdfdc01
add lock to failure additions
simorenoh Nov 20, 2025
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -19,6 +19,7 @@

#### Features Added
* Added feed range support in `query_items`. See [PR 41722](https://github.com/Azure/azure-sdk-for-python/pull/41722).
* Added support for Per Partition Automatic Failover. To enable this feature, you must follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover). See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588).

#### Bugs Fixed
* Fixed session container session token logic. The SDK will now only send the relevant partition-local session tokens for read document requests and write requests when multi-region writes are enabled, as opposed to the entire compound session token for the container for every document request. See [PR 41678](https://github.com/Azure/azure-sdk-for-python/pull/41678).
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
@@ -940,6 +940,11 @@ requests to another region:
- `AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED`: Default is a `90` percent failure rate.
- After a partition reaches a 90 percent failure rate for all requests, the SDK will send requests routed to that partition to another region.

### Per Partition Automatic Failover (Public Preview)
Per partition automatic failover enables the SDK to automatically redirect write requests at the partition level to another region based on service-side signals. This feature is available
only for single write region accounts that have at least one read-only region. When per partition automatic failover is enabled, per partition circuit breaker and hedging are also enabled by default, meaning
all of their configurable options also apply to per partition automatic failover. To enable this feature, follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover).
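
A minimal sketch of tuning these options from Python, assuming the SDK reads them from `os.environ` when a request is evaluated (as the `_base.py` change in this PR does for the PPAF threshold). The variable names come from this PR; the values are illustrative only and are not recommendations.

```python
import os

# Variable names are taken from this PR; the values below are illustrative only.
# Percentage of failed requests tolerated before the circuit breaker reroutes a partition.
os.environ["AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED"] = "80"
# Number of tracked timeout-style failures before PPAF fails a partition over.
os.environ["AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF"] = "5"
```

Setting the variables before creating the client is the safer choice, since this sketch does not confirm when each option is read.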

## Troubleshooting

### General
24 changes: 24 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -28,6 +28,7 @@
import uuid
import re
import binascii
import os
from typing import Dict, Any, List, Mapping, Optional, Sequence, Union, Tuple, TYPE_CHECKING

from urllib.parse import quote as urllib_quote
@@ -45,9 +46,13 @@
if TYPE_CHECKING:
from ._cosmos_client_connection import CosmosClientConnection
from .aio._cosmos_client_connection_async import CosmosClientConnection as AsyncClientConnection
from ._global_partition_endpoint_manager_per_partition_automatic_failover import (
_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover)
from ._request_object import RequestObject
from ._routing.routing_range import PartitionKeyRangeWrapper

# pylint: disable=protected-access
#cspell:ignore PPAF, ppaf

_COMMON_OPTIONS = {
'initial_headers': 'initialHeaders',
@@ -935,3 +940,22 @@ def _build_properties_cache(properties: Dict[str, Any], container_link: str) ->
"_self": properties.get("_self", None), "_rid": properties.get("_rid", None),
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
}

def try_ppaf_failover_threshold(
        global_endpoint_manager: "_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover",
        pk_range_wrapper: "PartitionKeyRangeWrapper",
        request: "RequestObject"):
    # If PPAF is enabled, we track consecutive failures for certain exceptions, and only fail over at a partition
    # level after the threshold is reached
    if request and global_endpoint_manager.is_per_partition_automatic_failover_applicable(request):
        if (global_endpoint_manager.ppaf_thresholds_tracker.get_pk_failures(pk_range_wrapper)
                >= int(os.environ.get(Constants.TIMEOUT_ERROR_THRESHOLD_PPAF,
                                      Constants.TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT))):
            # If the PPAF threshold is reached, we reset the count and retry to the next region
            global_endpoint_manager.ppaf_thresholds_tracker.clear_pk_failures(pk_range_wrapper)
            partition_level_info = global_endpoint_manager.partition_range_to_failover_info[pk_range_wrapper]
            location = global_endpoint_manager.location_cache.get_location_from_endpoint(
                str(request.location_endpoint_to_route))
            regional_context = (global_endpoint_manager.location_cache.
                                account_read_regional_routing_contexts_by_location.get(location).primary_endpoint)
            partition_level_info.unavailable_regional_endpoints[location] = regional_context
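
The threshold check in `try_ppaf_failover_threshold` can be illustrated in isolation. The following is a simplified sketch, not the SDK implementation: `FailureTracker` and `try_failover` are hypothetical stand-ins for `ppaf_thresholds_tracker` and the function above, and partitions and regions are represented as plain strings. Only the environment variable name and its default of `10` come from this PR.

```python
import os
import threading

THRESHOLD_ENV = "AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF"  # name from _constants.py in this PR
THRESHOLD_DEFAULT = 10  # default from _constants.py in this PR


class FailureTracker:
    """Hypothetical stand-in for ppaf_thresholds_tracker; not the SDK class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._failures = {}

    def add_failure(self, pk_range):
        # The lock keeps concurrent increments from losing updates.
        with self._lock:
            self._failures[pk_range] = self._failures.get(pk_range, 0) + 1

    def get_pk_failures(self, pk_range):
        with self._lock:
            return self._failures.get(pk_range, 0)

    def clear_pk_failures(self, pk_range):
        with self._lock:
            self._failures.pop(pk_range, None)


def try_failover(tracker, unavailable, pk_range, current_region):
    """Mark current_region unavailable for pk_range once the threshold is reached."""
    threshold = int(os.environ.get(THRESHOLD_ENV, THRESHOLD_DEFAULT))
    if tracker.get_pk_failures(pk_range) >= threshold:
        # Reset the count so the next region starts with a clean slate.
        tracker.clear_pk_failures(pk_range)
        unavailable.setdefault(pk_range, set()).add(current_region)
        return True
    return False
```

Keeping the count per partition means one struggling partition can be rerouted without affecting requests to healthy partitions in the same region.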
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
@@ -25,6 +25,7 @@

from typing import Dict
from typing_extensions import Literal
# cspell:ignore PPAF


class _Constants:
@@ -40,6 +41,7 @@ class _Constants:
DatabaseAccountEndpoint: Literal["databaseAccountEndpoint"] = "databaseAccountEndpoint"
DefaultEndpointsRefreshTime: int = 5 * 60 * 1000 # milliseconds
UnavailableEndpointDBATimeouts: int = 1 # seconds
EnablePerPartitionFailoverBehavior: Literal["enablePerPartitionFailoverBehavior"] = "enablePerPartitionFailoverBehavior" #pylint: disable=line-too-long

# ServiceDocument Resource
EnableMultipleWritableLocations: Literal["enableMultipleWriteLocations"] = "enableMultipleWriteLocations"
@@ -71,6 +73,10 @@ class _Constants:
FAILURE_PERCENTAGE_TOLERATED = "AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED"
FAILURE_PERCENTAGE_TOLERATED_DEFAULT: int = 90
# -------------------------------------------------------------------------
# Only applicable when per partition automatic failover is enabled --------
TIMEOUT_ERROR_THRESHOLD_PPAF = "AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF"
TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10
# -------------------------------------------------------------------------

# Error code translations
ERROR_TRANSLATIONS: Dict[int, str] = {
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
HttpResponse # pylint: disable=no-legacy-azure-core-http-response-import

from . import _base as base
from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker
from ._global_partition_endpoint_manager_per_partition_automatic_failover import _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover # pylint: disable=line-too-long
from . import _query_iterable as query_iterable
from . import _runtime_constants as runtime_constants
from . import _session
@@ -170,7 +170,7 @@ def __init__( # pylint: disable=too-many-statements
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

self.UseMultipleWriteLocations = False
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreaker(self)
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(self)

retry_policy = None
if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy):
@@ -2677,12 +2677,15 @@ def GetDatabaseAccount(
database_account._ReadableLocations = result[Constants.ReadableLocations]
if Constants.EnableMultipleWritableLocations in result:
database_account._EnableMultipleWritableLocations = result[
Constants.EnableMultipleWritableLocations
]
Constants.EnableMultipleWritableLocations]

self.UseMultipleWriteLocations = (
self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations
)

if Constants.EnablePerPartitionFailoverBehavior in result:
database_account._EnablePerPartitionFailoverBehavior = result[Constants.EnablePerPartitionFailoverBehavior]

if response_hook:
response_hook(last_response_headers, result)
return database_account
Original file line number Diff line number Diff line change
@@ -23,17 +23,8 @@
Azure Cosmos database service.
"""

import logging
from azure.cosmos.documents import _OperationType

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_formatter = logging.Formatter("%(levelname)s:%(message)s")
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)


class EndpointDiscoveryRetryPolicy(object):
"""The endpoint discovery retry policy class used for geo-replicated database accounts
to handle the write forbidden exceptions due to writable/readable location changes
@@ -43,8 +34,9 @@ class EndpointDiscoveryRetryPolicy(object):
Max_retry_attempt_count = 120
Retry_after_in_milliseconds = 1000

def __init__(self, connection_policy, global_endpoint_manager, *args):
def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, *args):
self.global_endpoint_manager = global_endpoint_manager
self.pk_range_wrapper = pk_range_wrapper
self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
self.failover_retry_count = 0
self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
@@ -70,6 +62,22 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument

self.failover_retry_count += 1

# set the refresh_needed flag to ensure that endpoint list is
# refreshed with new writable and readable locations
self.global_endpoint_manager.refresh_needed = True

# If per partition automatic failover is applicable, we mark the current endpoint as unavailable
# and resolve the service endpoint for the partition range - otherwise, continue the default retry logic
if self.global_endpoint_manager.is_per_partition_automatic_failover_applicable(self.request):
partition_level_info = self.global_endpoint_manager.partition_range_to_failover_info[self.pk_range_wrapper]
location = self.global_endpoint_manager.location_cache.get_location_from_endpoint(
str(self.request.location_endpoint_to_route))
location_endpoint = (self.global_endpoint_manager.location_cache.
account_read_regional_routing_contexts_by_location.get(location).primary_endpoint)
partition_level_info.unavailable_regional_endpoints[location] = location_endpoint
self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper)
return True

if self.request.location_endpoint_to_route:
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
# Mark current read endpoint as unavailable
@@ -81,10 +89,6 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
self.request.location_endpoint_to_route,
True)

# set the refresh_needed flag to ensure that endpoint list is
# refreshed with new writable and readable locations
self.global_endpoint_manager.refresh_needed = True

# clear previous location-based routing directive
self.request.clear_route_to_location()
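
The per-partition branch added to `ShouldRetry` marks the current region unavailable for one partition and then resolves a new endpoint for it. A minimal sketch of that idea follows, under stated assumptions: `PartitionFailoverInfo` and `fail_over_partition` are hypothetical names, the endpoint URLs are placeholders, and the "reset when every region has been tried" step is an assumption of this sketch rather than confirmed SDK behavior.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PartitionFailoverInfo:
    """Hypothetical per-partition state; field names only loosely mirror the PR."""
    current_region: Optional[str] = None
    unavailable_regional_endpoints: Dict[str, str] = field(default_factory=dict)


def fail_over_partition(info: PartitionFailoverInfo,
                        endpoints_by_region: Dict[str, str],
                        failed_region: str) -> Optional[str]:
    """Mark failed_region unavailable for this partition and pick the next region."""
    info.unavailable_regional_endpoints[failed_region] = endpoints_by_region[failed_region]
    # Dicts preserve insertion order, so regions are tried in the order given.
    for region, endpoint in endpoints_by_region.items():
        if region not in info.unavailable_regional_endpoints:
            info.current_region = region
            return endpoint
    # Assumption of this sketch: once every region has failed, start over.
    info.unavailable_regional_endpoints.clear()
    info.current_region = None
    return None
```

Because the unavailable set is stored per partition, a failover for one partition key range leaves routing for every other range untouched.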

Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@
if TYPE_CHECKING:
from azure.cosmos._cosmos_client_connection import CosmosClientConnection

#cspell:ignore ppcb

class _GlobalPartitionEndpointManagerForCircuitBreaker(_GlobalEndpointManager):
"""
This internal class implements the logic for partition endpoint management for
@@ -93,16 +95,17 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK

return PartitionKeyRangeWrapper(partition_range, container_rid)

def record_failure(
def record_ppcb_failure(
self,
request: RequestObject
) -> None:
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None)-> None:
if self.is_circuit_breaker_applicable(request):
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper is None:
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper:
self.global_partition_endpoint_manager_core.record_failure(request, pk_range_wrapper)

def resolve_service_endpoint_for_partition(
def _resolve_service_endpoint_for_partition_circuit_breaker(
self,
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper]
@@ -113,11 +116,12 @@ def resolve_service_endpoint_for_partition(
pk_range_wrapper)
return self._resolve_service_endpoint(request)

def record_success(
def record_ppcb_success(
self,
request: RequestObject
) -> None:
if self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request):
pk_range_wrapper = self.create_pk_range_wrapper(request)
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None:
if self.is_circuit_breaker_applicable(request):
if pk_range_wrapper is None:
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper:
self.global_partition_endpoint_manager_core.record_success(request, pk_range_wrapper)
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# pylint: disable=protected-access

"""Internal class for global endpoint manager for circuit breaker.
"""
import logging
@@ -60,7 +62,10 @@ def is_circuit_breaker_applicable(self, request: RequestObject) -> bool:
return False

circuit_breaker_enabled = os.environ.get(Constants.CIRCUIT_BREAKER_ENABLED_CONFIG,
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT) == "True"
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT).lower() == "true"
if not circuit_breaker_enabled and self.client._global_endpoint_manager is not None:
if self.client._global_endpoint_manager._database_account_cache is not None:
circuit_breaker_enabled = self.client._global_endpoint_manager._database_account_cache._EnablePerPartitionFailoverBehavior is True # pylint: disable=line-too-long
if not circuit_breaker_enabled:
return False
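
The change above makes the environment flag case-insensitive and falls back to the account-level PPAF setting when the flag is off. A minimal sketch of that decision, assuming a hypothetical variable name (`EXAMPLE_CIRCUIT_BREAKER_ENABLED` is not the SDK's variable) and a plain `Optional[bool]` in place of the cached database account:

```python
import os
from typing import Optional

FLAG_ENV = "EXAMPLE_CIRCUIT_BREAKER_ENABLED"  # hypothetical name, not the SDK's variable


def circuit_breaker_enabled(account_ppaf_flag: Optional[bool]) -> bool:
    # Case-insensitive parse, so "True", "true" and "TRUE" all enable the feature.
    if os.environ.get(FLAG_ENV, "False").lower() == "true":
        return True
    # Otherwise fall back to the service-side account flag; None means "not reported".
    return account_ppaf_flag is True
```

Comparing with `is True` mirrors the diff: a missing or `None` account flag leaves the circuit breaker disabled instead of raising.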
