Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export ACCOUNT_KEY=$(az cosmosdb list-keys --resource-group $RES_GROUP --name $A
Once you've populated the `ACCOUNT_URI` and `ACCOUNT_KEY` environment variables, you can create the [CosmosClient][ref_cosmosclient].

```Python
from azure.cosmos import CosmosClient, PartitionKey, errors
from azure.cosmos import CosmosClient, PartitionKey, exceptions

import os
url = os.environ['ACCOUNT_URI']
Expand Down Expand Up @@ -104,7 +104,7 @@ After authenticating your [CosmosClient][ref_cosmosclient], you can work with an
database_name = 'testDatabase'
try:
database = client.create_database(database_name)
except errors.CosmosResourceExistsError:
except exceptions.CosmosResourceExistsError:
database = client.get_database_client(database_name)
```

Expand All @@ -116,9 +116,9 @@ This example creates a container with default settings. If a container with the
container_name = 'products'
try:
container = database.create_container(id=container_name, partition_key=PartitionKey(path="/productName"))
except errors.CosmosResourceExistsError:
except exceptions.CosmosResourceExistsError:
container = database.get_container_client(container_name)
except errors.CosmosHttpResponseError:
except exceptions.CosmosHttpResponseError:
raise
```

Expand Down Expand Up @@ -264,7 +264,7 @@ the client level to enable it for all requests.

### General

When you interact with Cosmos DB using the Python SDK, errors returned by the service correspond to the same HTTP status codes returned for REST API requests:
When you interact with Cosmos DB using the Python SDK, exceptions returned by the service correspond to the same HTTP status codes returned for REST API requests:

[HTTP Status Codes for Azure Cosmos DB][cosmos_http_status_codes]

Expand All @@ -273,7 +273,7 @@ For example, if you try to create a container using an ID (name) that's already
```Python
try:
database.create_container(id=container_name, partition_key=PartitionKey(path="/productName")
except errors.CosmosResourceExistsError:
except exceptions.CosmosResourceExistsError:
print("""Error creating container
HTTP status code 409: The ID (name) provided for the container is already in use.
The container name must be unique within the database.""")
Expand Down Expand Up @@ -305,23 +305,22 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
[cosmos_sql_queries]: https://docs.microsoft.com/azure/cosmos-db/how-to-sql-query
[cosmos_ttl]: https://docs.microsoft.com/azure/cosmos-db/time-to-live
[python]: https://www.python.org/downloads/
[ref_container_delete_item]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.ContainerProxy.delete_item
[ref_container_query_items]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.ContainerProxy.query_items
[ref_container_upsert_item]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.ContainerProxy.upsert_item
[ref_container]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.ContainerProxy
[ref_cosmos_sdk]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html
[ref_cosmosclient_create_database]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.CosmosClient.create_database
[ref_cosmosclient]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.CosmosClient
[ref_database]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.html#azure.cosmos.DatabaseProxy
[ref_httpfailure]: https://azure.github.io/azure-sdk-for-python/ref/azure.cosmos.errors.html#azure.cosmos.errors.CosmosHttpResponseError
[sample_database_mgmt]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos/samples/DatabaseManagement
[sample_document_mgmt]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos/samples/DocumentManagement
[ref_container_delete_item]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.ContainerProxy.delete_item
[ref_container_query_items]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.ContainerProxy.query_items
[ref_container_upsert_item]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.ContainerProxy.upsert_item
[ref_container]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.ContainerProxy
[ref_cosmos_sdk]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html
[ref_cosmosclient_create_database]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.CosmosClient.create_database
[ref_cosmosclient]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.CosmosClient
[ref_database]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.DatabaseProxy
[ref_httpfailure]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0b5/azure.cosmos.html#azure.cosmos.exceptions.CosmosHttpResponseError
[sample_database_mgmt]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos/samples/database_management.py
[sample_document_mgmt]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos/samples/document_management.py
[sample_examples_misc]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos/samples/examples.py
[source_code]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/cosmos/azure-cosmos
[venv]: https://docs.python.org/3/library/venv.html
[virtualenv]: https://virtualenv.pypa.io


# Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from ._version import VERSION
from ._retry_utility import ConnectionRetryPolicy
from .container import ContainerProxy
from .cosmos_client import CosmosClient
Expand All @@ -40,7 +41,6 @@
)
from .partition_key import PartitionKey
from .permission import Permission
from .version import VERSION

__all__ = (
"CosmosClient",
Expand Down
54 changes: 49 additions & 5 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import six
from six.moves.urllib.parse import quote as urllib_quote

from azure.core import MatchConditions

from . import auth
from . import documents
from . import partition_key
Expand Down Expand Up @@ -63,20 +65,50 @@
'continuation': 'continuation',
'is_start_from_beginning': 'isStartFromBeginning',
'populate_partition_key_range_statistics': 'populatePartitionKeyRangeStatistics',
'populate_quota_info': 'populateQuotaInfo'
'populate_quota_info': 'populateQuotaInfo',
'content_type': 'contentType',
'is_query_plan_request': 'isQueryPlanRequest',
'supported_query_features': 'supportedQueryFeatures',
'query_version': 'queryVersion'
}

def _get_match_headers(kwargs):
# type: (Dict[str, Any]) -> Tuple(Optional[str], Optional[str])
if_match = kwargs.pop('if_match', None)
if_none_match = kwargs.pop('if_none_match', None)
match_condition = kwargs.pop('match_condition', None)
if match_condition == MatchConditions.IfNotModified:
if_match = kwargs.pop('etag', None)
if not if_match:
raise ValueError("'match_condition' specified without 'etag'.")
elif match_condition == MatchConditions.IfPresent:
if_match = '*'
elif match_condition == MatchConditions.IfModified:
if_none_match = kwargs.pop('etag', None)
if not if_none_match:
raise ValueError("'match_condition' specified without 'etag'.")
elif match_condition == MatchConditions.IfMissing:
if_none_match = '*'
elif match_condition is None:
if 'etag' in kwargs:
raise ValueError("'etag' specified without 'match_condition'.")
else:
raise TypeError("Invalid match condition: {}".format(match_condition))
return if_match, if_none_match


def build_options(kwargs):
# type: (Dict[str, Any]) -> Dict[str, Any]
options = kwargs.pop('request_options', kwargs.pop('feed_options', {}))
for key, value in _COMMON_OPTIONS.items():
if key in kwargs:
options[value] = kwargs.pop(key)

if 'if_match' in kwargs:
options['accessCondition'] = {'type': 'IfMatch', 'condition': kwargs.pop('if_match')}
if 'if_none_match' in kwargs:
options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': kwargs.pop('if_none_match')}
if_match, if_none_match = _get_match_headers(kwargs)
if if_match:
options['accessCondition'] = {'type': 'IfMatch', 'condition': if_match}
if if_none_match:
options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': if_none_match}
return options


Expand Down Expand Up @@ -178,6 +210,18 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("offerThroughput"):
headers[http_constants.HttpHeaders.OfferThroughput] = options["offerThroughput"]

if options.get("contentType"):
headers[http_constants.HttpHeaders.ContentType] = options['contentType']

if options.get("isQueryPlanRequest"):
headers[http_constants.HttpHeaders.IsQueryPlanRequest] = options['isQueryPlanRequest']

if options.get("supportedQueryFeatures"):
headers[http_constants.HttpHeaders.SupportedQueryFeatures] = options['supportedQueryFeatures']

if options.get("queryVersion"):
headers[http_constants.HttpHeaders.QueryVersion] = options['queryVersion']

if "partitionKey" in options:
# if partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
if options.get("partitionKey") is partition_key._Undefined:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2625,6 +2625,7 @@ def __QueryFeed(
options=None,
partition_key_range_id=None,
response_hook=None,
is_query_plan=False,
**kwargs
):
"""Query for more than one Azure Cosmos resources.
Expand All @@ -2639,6 +2640,9 @@ def __QueryFeed(
The request options for the request.
:param str partition_key_range_id:
Specifies partition key range id.
:param function response_hook:
:param bool is_query_plan:
Specififes if the call is to fetch query plan

:rtype:
list
Expand All @@ -2664,7 +2668,8 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request_params = _request_object.RequestObject(typ, documents._OperationType.ReadFeed)
request_params = _request_object.RequestObject(typ,
documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Get(path, request_params, headers, **kwargs)
if response_hook:
Expand All @@ -2674,6 +2679,9 @@ def __GetBodiesFromQueryResult(result):
query = self.__CheckAndUnifyQueryFormat(query)

initial_headers[http_constants.HttpHeaders.IsQuery] = "true"
if not is_query_plan:
initial_headers[http_constants.HttpHeaders.IsQuery] = "true"

if (
self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Default
or self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Query
Expand All @@ -2694,6 +2702,36 @@ def __GetBodiesFromQueryResult(result):

return __GetBodiesFromQueryResult(result)

def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
documents._QueryFeature.Distinct + "," +
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)

options = {
"contentType": runtime_constants.MediaTypes.Json,
"isQueryPlanRequest": True,
"supportedQueryFeatures": supported_query_features,
"queryVersion": http_constants.Versions.QueryVersion
}

resource_link = base.TrimBeginningAndEndingSlashes(resource_link)
path = base.GetPathFromLink(resource_link, "docs")
resource_id = base.GetResourceIdOrFullNameFromLink(resource_link)

return self.__QueryFeed(path,
"docs",
resource_id,
lambda r: r,
None,
query,
options,
is_query_plan=True,
**kwargs)

def __CheckAndUnifyQueryFormat(self, query_body):
"""Checks and unifies the format of the query body.

Expand Down
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_default_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def __init__(self, *args):
def needsRetry(self, error_code):
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if self.args:
if (self.args[3].method == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[3].headers):
if (self.args[3].method == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[3].headers) \
or (http_constants.HttpHeaders.IsQueryPlanRequest in self.args[3].headers):
return True
return False
return True
Expand All @@ -66,7 +67,7 @@ def needsRetry(self, error_code):
def ShouldRetry(self, exception):
"""Returns true if should retry based on the passed-in exception.

:param (errors.CosmosHttpResponseError instance) exception:
:param (exceptions.CosmosHttpResponseError instance) exception:

:rtype:
boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, connection_policy, global_endpoint_manager, *args):
def ShouldRetry(self, exception): # pylint: disable=unused-argument
"""Returns true if should retry based on the passed-in exception.

:param (errors.CosmosHttpResponseError instance) exception:
:param (exceptions.CosmosHttpResponseError instance) exception:

:rtype:
boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from collections import deque
from .. import _retry_utility
from .. import http_constants
from .. import _base

# pylint: disable=protected-access

Expand Down Expand Up @@ -171,100 +170,3 @@ def __init__(self, client, options, fetch_function):
def _fetch_next_block(self):
while super(_DefaultQueryExecutionContext, self)._has_more_pages() and not self._buffer:
return self._fetch_items_helper_with_retries(self._fetch_function)


class _MultiCollectionQueryExecutionContext(_QueryExecutionContextBase):
"""
This class is used if it is client side partitioning
"""

def __init__(self, client, options, database_link, query, partition_key):
"""
Constructor
:param CosmosClient client:
:param dict options:
The request options for the request.
:param str database_link: database self link or ID based link
:param (str or dict) query:
Partition_key (str): partition key for the query

"""
super(_MultiCollectionQueryExecutionContext, self).__init__(client, options)

self._current_collection_index = 0
self._collection_links = []
self._collection_links_length = 0

self._query = query
self._client = client

partition_resolver = client.GetPartitionResolver(database_link)

if partition_resolver is None:
raise ValueError(client.PartitionResolverErrorMessage)

self._collection_links = partition_resolver.ResolveForRead(partition_key)

self._collection_links_length = len(self._collection_links)

if self._collection_links is None:
raise ValueError("_collection_links is None.")

if self._collection_links_length <= 0:
raise ValueError("_collection_links_length is not greater than 0.")

# Creating the QueryFeed for the first collection
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._current_collection_index += 1

def fetch_fn(options):
return client.QueryFeed(path, collection_id, query, options)

self._fetch_function = fetch_fn

def _has_more_pages(self):
return (
not self._has_started
or self._continuation
or (self._collection_links and self._current_collection_index < self._collection_links_length)
)

def _fetch_next_block(self):
"""Fetches the next block of query results.

This iterates fetches the next block of results from the current collection link.
Once the current collection results were exhausted. It moves to the next collection link.

:return:
List of fetched items.
:rtype: list
"""
# Fetch next block of results by executing the query against the current document collection
fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)

# If there are multiple document collections to query for(in case of partitioning),
# keep looping through each one of them, creating separate feed queries for each
# collection and fetching the items
while not fetched_items:
if self._collection_links and self._current_collection_index < self._collection_links_length:
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
collection_id = _base.GetResourceIdOrFullNameFromLink(
self._collection_links[self._current_collection_index]
)

self._continuation = None
self._has_started = False

def fetch_fn(options):
return self._client.QueryFeed(path, collection_id, self._query, options)

self._fetch_function = fetch_fn

fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)
self._current_collection_index += 1
else:
break

return fetched_items
Loading