Skip to content
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'upstream/master' into cosmos-pipeline
  • Loading branch information
annatisch committed Aug 27, 2019
commit 81d0a628569c892960af326fd63a5f3680adbe40
98 changes: 51 additions & 47 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import requests

import six
from typing import cast
from azure.core import PipelineClient
from azure.core.pipeline.policies import (
ContentDecodePolicy,
Expand Down Expand Up @@ -79,12 +78,9 @@ class _QueryCompatibilityMode:
_DefaultStringHashPrecision = 3
_DefaultStringRangePrecision = -1

def __init__(self,
url_connection,
auth,
connection_policy=None,
consistency_level=documents.ConsistencyLevel.Session,
**kwargs):
def __init__(
self, url_connection, auth, connection_policy=None, consistency_level=documents.ConsistencyLevel.Session
):
"""
:param str url_connection:
The URL for connecting to the DB server.
Expand Down Expand Up @@ -2405,16 +2401,18 @@ def __Get(self, path, request, headers):
tuple of (dict, dict)

"""
return synchronized_request.SynchronizedRequest(self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
'GET',
path,
None,
None,
headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"GET",
path,
None,
None,
headers,
)

def __Post(self, path, request, body, headers):
"""Azure Cosmos 'POST' http request.
Expand All @@ -2430,16 +2428,18 @@ def __Post(self, path, request, body, headers):
tuple of (dict, dict)

"""
return synchronized_request.SynchronizedRequest(self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
'POST',
path,
body,
query_params=None,
headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"POST",
path,
body,
query_params=None,
headers=headers,
)

def __Put(self, path, request, body, headers):
"""Azure Cosmos 'PUT' http request.
Expand All @@ -2455,16 +2455,18 @@ def __Put(self, path, request, body, headers):
tuple of (dict, dict)

"""
return synchronized_request.SynchronizedRequest(self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
'PUT',
path,
body,
query_params=None,
headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"PUT",
path,
body,
query_params=None,
headers=headers,
)

def __Delete(self, path, request, headers):
"""Azure Cosmos 'DELETE' http request.
Expand All @@ -2479,16 +2481,18 @@ def __Delete(self, path, request, headers):
tuple of (dict, dict)

"""
return synchronized_request.SynchronizedRequest(self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
'DELETE',
path,
request_data=None,
query_params=None,
headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"DELETE",
path,
request_data=None,
query_params=None,
headers=headers,
)

def QueryFeed(self, path, collection_id, query, options, partition_key_range_id=None):
"""Query Feed for Document Collection resource.
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_default_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def __init__(self, *args):

def needsRetry(self, error_code):
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if (len(self.args) > 0):
if (self.args[3].method == 'GET') or (http_constants.HttpHeaders.IsQuery in self.args[3].headers):
if self.args:
if (self.args[3].method == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[3].headers):
return True
return False
return True
Expand Down
49 changes: 31 additions & 18 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from . import http_constants
from . import _retry_utility


def _is_readable_stream(obj):
"""Checks whether obj is a file-like readable stream.

Expand Down Expand Up @@ -69,7 +70,7 @@ def _Request(global_endpoint_manager, request_options, connection_policy, pipeli
# Every request tries to perform a refresh
global_endpoint_manager.refresh_endpoint_list(None)

if (request_options.endpoint_override):
if request_options.endpoint_override:
base_url = request_options.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_options)
Expand All @@ -80,7 +81,7 @@ def _Request(global_endpoint_manager, request_options, connection_policy, pipeli

# The requests library now expects header values to be strings only starting 2.11,
# and will raise an error on validation if they are not, so casting all header values to strings.
request.headers.update({ header: str(value) for header, value in request.headers.items() })
request.headers.update({header: str(value) for header, value in request.headers.items()})

# We are disabling the SSL verification for local emulator(localhost/127.0.0.1) or if the user
# has explicitly specified to disable SSL verification.
Expand All @@ -98,15 +99,16 @@ def _Request(global_endpoint_manager, request_options, connection_policy, pipeli
stream=is_media_stream,
connection_timeout=connection_timeout / 1000.0,
connection_verify=ca_certs,
connection_cert=cert_files)

connection_cert=cert_files
)
else:
response = pipeline_client._pipeline.run(
request,
stream=is_media_stream,
connection_timeout=connection_timeout / 1000.0,
# If SSL is disabled, verify = false
connection_verify=is_ssl_enabled)
connection_verify=is_ssl_enabled
)

response = response.http_response
headers = dict(response.headers)
Expand Down Expand Up @@ -136,16 +138,19 @@ def _Request(global_endpoint_manager, request_options, connection_policy, pipeli

return (result, headers)

def SynchronizedRequest(client,
request_options,
global_endpoint_manager,
connection_policy,
pipeline_client,
method,
path,
request_data,
query_params,
headers):

def SynchronizedRequest(
client,
request_options,
global_endpoint_manager,
connection_policy,
pipeline_client,
method,
path,
request_data,
query_params,
headers,
):
"""Performs one synchronized http request according to the parameters.

:param object client:
Expand Down Expand Up @@ -176,13 +181,21 @@ def SynchronizedRequest(client,
headers=headers,
content=request_data if not is_stream else None,
form_content=None,
stream_content=request_data if is_stream else None)
stream_content=request_data if is_stream else None
)

if request.data and isinstance(request.data, six.string_types):
request.headers[http_constants.HttpHeaders.ContentLength] = len(request.data)
elif request.data is None:
request.headers[http_constants.HttpHeaders.ContentLength] = 0

# Pass _Request function with it's parameters to retry_utility's Execute method that wraps the call with retries
return _retry_utility.Execute(client, global_endpoint_manager, _Request, request_options, connection_policy, pipeline_client, request)

return _retry_utility.Execute(
client,
global_endpoint_manager,
_Request,
request_options,
connection_policy,
pipeline_client,
request
)
You are viewing a condensed version of this merge commit. You can view the full changes here.