Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4c4b078
make consistent_hash_ring private
bryevdv Jul 11, 2019
038db50
make default_retry_policy private
bryevdv Jul 11, 2019
51d4466
make endpoint_discovery_retry_policy private
bryevdv Jul 11, 2019
b1719fe
make hash_partition_resolver private
bryevdv Jul 11, 2019
46e7fed
make location_cache private
bryevdv Jul 11, 2019
bff6cc9
make murmur_hash private
bryevdv Jul 11, 2019
0e100af
make range private
bryevdv Jul 11, 2019
3d2e65c
make range_partition_resolver private
bryevdv Jul 11, 2019
3e43f69
make vector_session_token private
bryevdv Jul 16, 2019
60ad7fb
make resource_throttle_retry_policy private
bryevdv Jul 16, 2019
68fd7a9
make retry_utility private
bryevdv Jul 16, 2019
8e9274c
make utils private
bryevdv Jul 16, 2019
6dad678
make routing private
bryevdv Jul 17, 2019
6b1a641
make execution_context private
bryevdv Jul 18, 2019
ce36df2
make cosmos_client_connection private
bryevdv Jul 18, 2019
2063355
make retry_options private
bryevdv Jul 18, 2019
8e84029
make query_iterable private
bryevdv Jul 18, 2019
eedb532
make constants private
bryevdv Jul 19, 2019
56c074a
make synchronized_request private
bryevdv Jul 19, 2019
109e496
make session_retry_policy private
bryevdv Jul 19, 2019
c94dc6d
make partition private
bryevdv Jul 19, 2019
4502740
make global_endpoint_manager private
bryevdv Jul 19, 2019
0a60acf
make runtime_constants private
bryevdv Jul 19, 2019
4fb54ac
make session private
bryevdv Jul 19, 2019
2a41dfc
make request_object private
bryevdv Jul 19, 2019
444d5d5
make base private
bryevdv Jul 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make routing private
  • Loading branch information
bryevdv committed Jul 17, 2019
commit 6dad678e91cfc082805b2d62e9defe1de1397122
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
"""

import bisect
from azure.cosmos.routing import routing_range
from azure.cosmos.routing.routing_range import _PartitionKeyRange
from azure.cosmos._routing import routing_range
from azure.cosmos._routing.routing_range import PartitionKeyRange
from six.moves import xrange

class _CollectionRoutingMap(object):
class CollectionRoutingMap(object):
"""Stores partition key ranges in an efficient way with some additional information and provides
convenience methods for working with set of ranges.
"""
Expand All @@ -40,7 +40,7 @@ def __init__(self, range_by_id, range_by_info, ordered_partition_key_ranges, ord
self._rangeByInfo = range_by_info
self._orderedPartitionKeyRanges = ordered_partition_key_ranges

self._orderedRanges = [routing_range._Range(pkr[_PartitionKeyRange.MinInclusive], pkr[_PartitionKeyRange.MaxExclusive], True, False) for pkr in ordered_partition_key_ranges]
self._orderedRanges = [routing_range.Range(pkr[PartitionKeyRange.MinInclusive], pkr[PartitionKeyRange.MaxExclusive], True, False) for pkr in ordered_partition_key_ranges]
self._orderedPartitionInfo = ordered_partition_info
self._collectionUniqueId = collection_unique_id

Expand All @@ -51,15 +51,15 @@ def CompleteRoutingMap(cls, partition_key_range_info_tupple_list, collection_uni

sortedRanges = []
for r in partition_key_range_info_tupple_list:
rangeById[r[0][_PartitionKeyRange.Id]] = r
rangeById[r[0][PartitionKeyRange.Id]] = r
rangeByInfo[r[1]] = r[0]
sortedRanges.append(r)

sortedRanges.sort(key = lambda r: r[0][_PartitionKeyRange.MinInclusive])
sortedRanges.sort(key = lambda r: r[0][PartitionKeyRange.MinInclusive])
partitionKeyOrderedRange = [r[0] for r in sortedRanges]
orderedPartitionInfo = [r[1] for r in sortedRanges]

if not _CollectionRoutingMap.is_complete_set_of_range(partitionKeyOrderedRange): return None
if not CollectionRoutingMap.is_complete_set_of_range(partitionKeyOrderedRange): return None
return cls(rangeById, rangeByInfo, partitionKeyOrderedRange, orderedPartitionInfo, collection_unique_id)

def get_ordered_partition_key_ranges(self):
Expand All @@ -80,10 +80,10 @@ def get_range_by_effective_partition_key(self, effective_partition_key_value):
The partition key range.
:rtype: dict
"""
if _CollectionRoutingMap.MinimumInclusiveEffectivePartitionKey == effective_partition_key_value:
if CollectionRoutingMap.MinimumInclusiveEffectivePartitionKey == effective_partition_key_value:
return self._orderedPartitionKeyRanges[0]

if _CollectionRoutingMap.MaximumExclusiveEffectivePartitionKey == effective_partition_key_value:
if CollectionRoutingMap.MaximumExclusiveEffectivePartitionKey == effective_partition_key_value:
return None

sortedLow = [(r.min, not r.isMinInclusive) for r in self._orderedRanges]
Expand Down Expand Up @@ -118,7 +118,7 @@ def get_overlapping_ranges(self, provided_partition_key_ranges):
:rtype: list
"""

if isinstance(provided_partition_key_ranges, routing_range._Range):
if isinstance(provided_partition_key_ranges, routing_range.Range):
return self.get_overlapping_ranges([provided_partition_key_ranges])

minToPartitionRange = {}
Expand All @@ -135,14 +135,14 @@ def get_overlapping_ranges(self, provided_partition_key_ranges):
maxIndex = maxIndex - 1

for i in xrange(minIndex, maxIndex + 1):
if routing_range._Range.overlaps(self._orderedRanges[i], providedRange):
minToPartitionRange[self._orderedPartitionKeyRanges[i][_PartitionKeyRange.MinInclusive]] = self._orderedPartitionKeyRanges[i]
if routing_range.Range.overlaps(self._orderedRanges[i], providedRange):
minToPartitionRange[self._orderedPartitionKeyRanges[i][PartitionKeyRange.MinInclusive]] = self._orderedPartitionKeyRanges[i]


overlapping_partition_key_ranges = list(minToPartitionRange.values())

def getKey(r):
return r[_PartitionKeyRange.MinInclusive]
return r[PartitionKeyRange.MinInclusive]
overlapping_partition_key_ranges.sort(key = getKey)
return overlapping_partition_key_ranges

Expand All @@ -153,16 +153,16 @@ def is_complete_set_of_range(ordered_partition_key_range_list):

firstRange = ordered_partition_key_range_list[0]
lastRange = ordered_partition_key_range_list[-1]
isComplete = (firstRange[_PartitionKeyRange.MinInclusive] == _CollectionRoutingMap.MinimumInclusiveEffectivePartitionKey)
isComplete &= (lastRange[_PartitionKeyRange.MaxExclusive] == _CollectionRoutingMap.MaximumExclusiveEffectivePartitionKey)
isComplete = (firstRange[PartitionKeyRange.MinInclusive] == CollectionRoutingMap.MinimumInclusiveEffectivePartitionKey)
isComplete &= (lastRange[PartitionKeyRange.MaxExclusive] == CollectionRoutingMap.MaximumExclusiveEffectivePartitionKey)

for i in range(1, len(ordered_partition_key_range_list)):
previousRange = ordered_partition_key_range_list[i - 1]
currentRange = ordered_partition_key_range_list[i]
isComplete &= previousRange[_PartitionKeyRange.MaxExclusive] == currentRange[_PartitionKeyRange.MinInclusive]
isComplete &= previousRange[PartitionKeyRange.MaxExclusive] == currentRange[PartitionKeyRange.MinInclusive]

if not isComplete:
if previousRange[_PartitionKeyRange.MaxExclusive] > currentRange[_PartitionKeyRange.MinInclusive]:
if previousRange[PartitionKeyRange.MaxExclusive] > currentRange[PartitionKeyRange.MinInclusive]:
raise ValueError("Ranges overlap")
break

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
"""

from .. import base
from .collection_routing_map import _CollectionRoutingMap
from .collection_routing_map import CollectionRoutingMap
from . import routing_range
from .routing_range import _PartitionKeyRange
from .routing_range import PartitionKeyRange

class _PartitionKeyRangeCache(object):
class PartitionKeyRangeCache(object):
'''
_PartitionKeyRangeCache provides list of effective partition key ranges for a collection.
PartitionKeyRangeCache provides list of effective partition key ranges for a collection.
This implementation loads and caches the collection routing map per collection on demand.

'''
Expand Down Expand Up @@ -67,26 +67,26 @@ def get_overlapping_ranges(self, collection_link, partition_key_ranges):
# for large collections, a split may complete between the read partition key ranges query page responses,
# causing the partitionKeyRanges to have both the children ranges and their parents. Therefore, we need
# to discard the parent ranges to have a valid routing map.
collection_pk_ranges = _PartitionKeyRangeCache._discard_parent_ranges(collection_pk_ranges)
collection_routing_map = _CollectionRoutingMap.CompleteRoutingMap([(r, True) for r in collection_pk_ranges], collection_id)
collection_pk_ranges = PartitionKeyRangeCache._discard_parent_ranges(collection_pk_ranges)
collection_routing_map = CollectionRoutingMap.CompleteRoutingMap([(r, True) for r in collection_pk_ranges], collection_id)
self._collection_routing_map_by_item[collection_id] = collection_routing_map
return collection_routing_map.get_overlapping_ranges(partition_key_ranges)

@staticmethod
def _discard_parent_ranges(partitionKeyRanges):
parentIds = set()
for r in partitionKeyRanges:
if isinstance(r, dict) and _PartitionKeyRange.Parents in r:
for parentId in r[_PartitionKeyRange.Parents]:
if isinstance(r, dict) and PartitionKeyRange.Parents in r:
for parentId in r[PartitionKeyRange.Parents]:
parentIds.add(parentId)
return (r for r in partitionKeyRanges if r[_PartitionKeyRange.Id] not in parentIds)
return (r for r in partitionKeyRanges if r[PartitionKeyRange.Id] not in parentIds)

class _SmartRoutingMapProvider(_PartitionKeyRangeCache):
class SmartRoutingMapProvider(PartitionKeyRangeCache):
"""
Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary invocation of _CollectionRoutingMap.get_overlapping_ranges()
Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary invocation of CollectionRoutingMap.get_overlapping_ranges()
"""
def __init__(self, client):
super(_SmartRoutingMapProvider, self).__init__(client)
super(SmartRoutingMapProvider, self).__init__(client)


def _second_range_is_after_first_range(self, range1, range2):
Expand All @@ -112,20 +112,20 @@ def _subtract_range(self, r, partition_key_range):
Evaluates and returns r - partition_key_range
:param dict partition_key_range:
Partition key range.
:param routing_range._Range r: query range.
:param routing_range.Range r: query range.
:return:
The subtract r - partition_key_range.
:rtype: routing_range._Range
:rtype: routing_range.Range
"""

left = max(partition_key_range[routing_range._PartitionKeyRange.MaxExclusive], r.min)
left = max(partition_key_range[routing_range.PartitionKeyRange.MaxExclusive], r.min)

if left == r.min:
leftInclusive = r.isMinInclusive
else:
leftInclusive = False

queryRange = routing_range._Range(left, r.max, leftInclusive,
queryRange = routing_range.Range(left, r.max, leftInclusive,
r.isMaxInclusive)
return queryRange

Expand All @@ -136,7 +136,7 @@ def get_overlapping_ranges(self, collection_link, sorted_ranges):

:param str collection_link:
The collection link.
:param (list of routing_range._Range) sorted_ranges: The sorted list of non-overlapping ranges.
:param (list of routing_range.Range) sorted_ranges: The sorted list of non-overlapping ranges.
:return:
List of partition key ranges.
:rtype: list of dict
Expand All @@ -163,11 +163,11 @@ def get_overlapping_ranges(self, collection_link, sorted_ranges):
else:
queryRange = currentProvidedRange

overlappingRanges = _PartitionKeyRangeCache.get_overlapping_ranges(self, collection_link, queryRange)
overlappingRanges = PartitionKeyRangeCache.get_overlapping_ranges(self, collection_link, queryRange)
assert len(overlappingRanges), ("code bug: returned overlapping ranges for queryRange {} is empty".format(queryRange))
target_partition_key_ranges.extend(overlappingRanges)

lastKnownTargetRange = routing_range._Range.PartitionKeyRangeToRange(target_partition_key_ranges[-1])
lastKnownTargetRange = routing_range.Range.PartitionKeyRangeToRange(target_partition_key_ranges[-1])

# the overlapping ranges must contain the requested range
assert currentProvidedRange.max <= lastKnownTargetRange.max, "code bug: returned overlapping ranges {} does not contain the requested range {}".format(overlappingRanges, queryRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
"""Internal class for partition key range implementation in the Azure Cosmos database service.
"""

class _PartitionKeyRange(object):
class PartitionKeyRange(object):
"""Partition Key Range Constants"""
MinInclusive = 'minInclusive'
MaxExclusive = 'maxExclusive'
Id = 'id'
Parents = 'parents'

class _Range(object):
class Range(object):
"""description of class"""

MinPath = 'min'
Expand Down Expand Up @@ -58,13 +58,13 @@ def contains(self, value):

@classmethod
def PartitionKeyRangeToRange(cls, partition_key_range):
self = cls(partition_key_range[_PartitionKeyRange.MinInclusive], partition_key_range[_PartitionKeyRange.MaxExclusive],
self = cls(partition_key_range[PartitionKeyRange.MinInclusive], partition_key_range[PartitionKeyRange.MaxExclusive],
True, False)
return self

@classmethod
def ParseFromDict(cls, range_as_dict):
self = cls(range_as_dict[_Range.MinPath], range_as_dict[_Range.MaxPath], range_as_dict[_Range.IsMinInclusivePath], range_as_dict[_Range.IsMaxInclusivePath])
self = cls(range_as_dict[Range.MinPath], range_as_dict[Range.MaxPath], range_as_dict[Range.IsMinInclusivePath], range_as_dict[Range.IsMaxInclusivePath])
return self

def isSingleValue(self):
Expand Down Expand Up @@ -97,8 +97,8 @@ def overlaps(range1, range2):
if range1 is None or range2 is None: return False
if range1.isEmpty() or range2.isEmpty(): return False

cmp1 = _Range._compare_helper(range1.min, range2.max)
cmp2 = _Range._compare_helper(range2.min, range1.max)
cmp1 = Range._compare_helper(range1.min, range2.max)
cmp2 = Range._compare_helper(range2.min, range1.max)

if (cmp1 <= 0 or cmp2 <= 0):
if ((cmp1 == 0 and not(range1.isMinInclusive and range2.isMaxInclusive)) or (cmp2 == 0 and not(range2.isMinInclusive and range1.isMaxInclusive))):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from . import request_object
from . import synchronized_request
from . import global_endpoint_manager
from .routing import routing_map_provider as routing_map_provider
from ._routing import routing_map_provider as routing_map_provider
from . import session
from . import _utils
from .partition_key import _Undefined, _Empty
Expand Down Expand Up @@ -150,7 +150,7 @@ def __init__(self,
self._query_compatibility_mode = CosmosClientConnection._QueryCompatibilityMode.Default

# Routing map provider
self._routing_map_provider = routing_map_provider._SmartRoutingMapProvider(self)
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)

database_account = self._global_endpoint_manager._GetDatabaseAccount()
self._global_endpoint_manager.force_refresh(database_account)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import heapq
from azure.cosmos.execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos.execution_context import document_producer
from azure.cosmos.routing import routing_range
from azure.cosmos._routing import routing_range

class _MultiExecutionContextAggregator(_QueryExecutionContextBase):
"""This class is capable of queries which requires rewriting based on
Expand Down Expand Up @@ -147,4 +147,4 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
def _get_target_parition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
return self._routing_provider.get_overlapping_ranges(self._resource_link, [routing_range._Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges])
return self._routing_provider.get_overlapping_ranges(self._resource_link, [routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges])
Loading