Skip to content
Prev Previous commit
Next Next commit
Refactored request creation
  • Loading branch information
annatisch committed Aug 28, 2019
commit d628d91bc6922a6a5e84271a5a22be6c6836ac24
156 changes: 81 additions & 75 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ def fetch_fn(options):

return query_iterable.QueryIterable(self, query, options, fetch_fn)

def ReadMedia(self, media_link):
def ReadMedia(self, media_link, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider removing ReadMedia and UpdateMedia at some point, since we dont expose it via container or database or item in the new object model.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @srinathnarayanan
Just to confirm - ReadMedia and UpdateMedia are now deprecated features? Or do they need to be exposed in the wrapping API layer?
Either way - I will not change it in this PR - however I will open an issue to resolve it in a subsequent one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are deprecated. Attachment related APIs too. I'll get a confirmation from @christopheranderson on whether we are planning to remove them for good.

"""Reads a media.

When self.connection_policy.MediaReadMode ==
Expand All @@ -1825,11 +1825,11 @@ def ReadMedia(self, media_link):
headers = base.GetHeaders(self, default_headers, "get", path, attachment_id, "media", {})

# ReadMedia will always use WriteEndpoint since it's not replicated in readable Geo regions
request = _request_object.RequestObject("media", documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path, request, headers)
request_options = _request_object.RequestObject("media", documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path, request_options, headers, **kwargs)
return result

def UpdateMedia(self, media_link, readable_stream, options=None):
def UpdateMedia(self, media_link, readable_stream, options=None, **kwargs):
"""Updates a media and returns it.

:param str media_link:
Expand Down Expand Up @@ -1864,8 +1864,8 @@ def UpdateMedia(self, media_link, readable_stream, options=None):
headers = base.GetHeaders(self, initial_headers, "put", path, attachment_id, "media", options)

# UpdateMedia will use WriteEndpoint since it uses PUT operation
request = _request_object.RequestObject("media", documents._OperationType.Update)
result, self.last_response_headers = self.__Put(path, request, readable_stream, headers)
request_options = _request_object.RequestObject("media", documents._OperationType.Update)
result, self.last_response_headers = self.__Put(path, request_options, readable_stream, headers, **kwargs)

self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
return result
Expand Down Expand Up @@ -2014,7 +2014,7 @@ def DeleteUserDefinedFunction(self, udf_link, options=None):
udf_id = base.GetResourceIdOrFullNameFromLink(udf_link)
return self.DeleteResource(path, "udfs", udf_id, None, options)

def ExecuteStoredProcedure(self, sproc_link, params, options=None):
def ExecuteStoredProcedure(self, sproc_link, params, options=None, **kwargs):
"""Executes a store procedure.

:param str sproc_link:
Expand Down Expand Up @@ -2044,8 +2044,8 @@ def ExecuteStoredProcedure(self, sproc_link, params, options=None):
headers = base.GetHeaders(self, initial_headers, "post", path, sproc_id, "sprocs", options)

# ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
request = _request_object.RequestObject("sprocs", documents._OperationType.ExecuteJavaScript)
result, self.last_response_headers = self.__Post(path, request, params, headers)
request_options = _request_object.RequestObject("sprocs", documents._OperationType.ExecuteJavaScript)
result, self.last_response_headers = self.__Post(path, request_options, params, headers, **kwargs)
return result

def ReplaceStoredProcedure(self, sproc_link, sproc, options=None):
Expand Down Expand Up @@ -2194,7 +2194,7 @@ def fetch_fn(options):

return query_iterable.QueryIterable(self, query, options, fetch_fn)

def GetDatabaseAccount(self, url_connection=None):
def GetDatabaseAccount(self, url_connection=None, **kwargs):
"""Gets database account info.

:return:
Expand All @@ -2209,8 +2209,8 @@ def GetDatabaseAccount(self, url_connection=None):
initial_headers = dict(self.default_headers)
headers = base.GetHeaders(self, initial_headers, "get", "", "", "", {}) # path # id # type

request = _request_object.RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
result, self.last_response_headers = self.__Get("", request, headers)
request_options = _request_object.RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
result, self.last_response_headers = self.__Get("", request_options, headers, **kwargs)
database_account = documents.DatabaseAccount()
database_account.DatabasesLink = "/dbs/"
database_account.MediaLink = "/media/"
Expand Down Expand Up @@ -2239,7 +2239,7 @@ def GetDatabaseAccount(self, url_connection=None):
)
return database_account

def Create(self, body, path, typ, id, initial_headers, options=None): # pylint: disable=redefined-builtin
def Create(self, body, path, typ, id, initial_headers, options=None, **kwargs): # pylint: disable=redefined-builtin
"""Creates a Azure Cosmos resource and returns it.

:param dict body:
Expand All @@ -2263,14 +2263,14 @@ def Create(self, body, path, typ, id, initial_headers, options=None): # pylint:
headers = base.GetHeaders(self, initial_headers, "post", path, id, typ, options)
# Create will use WriteEndpoint since it uses POST operation

request = _request_object.RequestObject(typ, documents._OperationType.Create)
result, self.last_response_headers = self.__Post(path, request, body, headers)
request_options = _request_object.RequestObject(typ, documents._OperationType.Create)
result, self.last_response_headers = self.__Post(path, request_options, body, headers, **kwargs)

# update session for write request
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
return result

def Upsert(self, body, path, typ, id, initial_headers, options=None): # pylint: disable=redefined-builtin
def Upsert(self, body, path, typ, id, initial_headers, options=None, **kwargs): # pylint: disable=redefined-builtin
"""Upserts a Azure Cosmos resource and returns it.

:param dict body:
Expand All @@ -2296,13 +2296,13 @@ def Upsert(self, body, path, typ, id, initial_headers, options=None): # pylint:
headers[http_constants.HttpHeaders.IsUpsert] = True

# Upsert will use WriteEndpoint since it uses POST operation
request = _request_object.RequestObject(typ, documents._OperationType.Upsert)
result, self.last_response_headers = self.__Post(path, request, body, headers)
request_options = _request_object.RequestObject(typ, documents._OperationType.Upsert)
result, self.last_response_headers = self.__Post(path, request_options, body, headers, **kwargs)
# update session for write request
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
return result

def Replace(self, resource, path, typ, id, initial_headers, options=None): # pylint: disable=redefined-builtin
def Replace(self, resource, path, typ, id, initial_headers, options=None, **kwargs): # pylint: disable=redefined-builtin
"""Replaces a Azure Cosmos resource and returns it.

:param dict resource:
Expand All @@ -2325,14 +2325,14 @@ def Replace(self, resource, path, typ, id, initial_headers, options=None): # py
initial_headers = initial_headers or self.default_headers
headers = base.GetHeaders(self, initial_headers, "put", path, id, typ, options)
# Replace will use WriteEndpoint since it uses PUT operation
request = _request_object.RequestObject(typ, documents._OperationType.Replace)
result, self.last_response_headers = self.__Put(path, request, resource, headers)
request_options = _request_object.RequestObject(typ, documents._OperationType.Replace)
result, self.last_response_headers = self.__Put(path, request_options, resource, headers, **kwargs)

# update session for request mutates data on server side
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
return result

def Read(self, path, typ, id, initial_headers, options=None): # pylint: disable=redefined-builtin
def Read(self, path, typ, id, initial_headers, options=None, **kwargs): # pylint: disable=redefined-builtin
"""Reads a Azure Cosmos resource and returns it.

:param str path:
Expand All @@ -2354,11 +2354,11 @@ def Read(self, path, typ, id, initial_headers, options=None): # pylint: disable
initial_headers = initial_headers or self.default_headers
headers = base.GetHeaders(self, initial_headers, "get", path, id, typ, options)
# Read will use ReadEndpoint since it uses GET operation
request = _request_object.RequestObject(typ, documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path, request, headers)
request_options = _request_object.RequestObject(typ, documents._OperationType.Read)
result, self.last_response_headers = self.__Get(path, request_options, headers, **kwargs)
return result

def DeleteResource(self, path, typ, id, initial_headers, options=None): # pylint: disable=redefined-builtin
def DeleteResource(self, path, typ, id, initial_headers, options=None, **kwargs): # pylint: disable=redefined-builtin
"""Deletes a Azure Cosmos resource and returns it.

:param str path:
Expand All @@ -2380,15 +2380,15 @@ def DeleteResource(self, path, typ, id, initial_headers, options=None): # pylin
initial_headers = initial_headers or self.default_headers
headers = base.GetHeaders(self, initial_headers, "delete", path, id, typ, options)
# Delete will use WriteEndpoint since it uses DELETE operation
request = _request_object.RequestObject(typ, documents._OperationType.Delete)
result, self.last_response_headers = self.__Delete(path, request, headers)
request_options = _request_object.RequestObject(typ, documents._OperationType.Delete)
result, self.last_response_headers = self.__Delete(path, request_options, headers, **kwargs)

# update session for request mutates data on server side
self._UpdateSessionIfRequired(headers, result, self.last_response_headers)

return result

def __Get(self, path, request, headers):
def __Get(self, path, request_options, headers, **kwargs):
"""Azure Cosmos 'GET' http request.

:params str url:
Expand All @@ -2401,20 +2401,19 @@ def __Get(self, path, request, headers):
tuple of (dict, dict)

"""
request = self.pipeline_client.get(url=path, headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"GET",
path,
None,
None,
headers,
client=self,
request_options=request_options,
global_endpoint_manager=self._global_endpoint_manager,
connection_policy=self.connection_policy,
pipeline_client=self.pipeline_client,
request=request,
request_data=None,
**kwargs
)

def __Post(self, path, request, body, headers):
def __Post(self, path, request_options, body, headers, **kwargs):
"""Azure Cosmos 'POST' http request.

:params str url:
Expand All @@ -2428,20 +2427,19 @@ def __Post(self, path, request, body, headers):
tuple of (dict, dict)

"""
request = self.pipeline_client.post(url=path, headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"POST",
path,
body,
query_params=None,
headers=headers,
client=self,
request_options=request_options,
global_endpoint_manager=self._global_endpoint_manager,
connection_policy=self.connection_policy,
pipeline_client=self.pipeline_client,
request=request,
request_data=body,
**kwargs
)

def __Put(self, path, request, body, headers):
def __Put(self, path, request_options, body, headers, **kwargs):
"""Azure Cosmos 'PUT' http request.

:params str url:
Expand All @@ -2455,20 +2453,19 @@ def __Put(self, path, request, body, headers):
tuple of (dict, dict)

"""
request = self.pipeline_client.put(url=path, headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"PUT",
path,
body,
query_params=None,
headers=headers,
client=self,
request_options=request_options,
global_endpoint_manager=self._global_endpoint_manager,
connection_policy=self.connection_policy,
pipeline_client=self.pipeline_client,
request=request,
request_data=body,
**kwargs
)

def __Delete(self, path, request, headers):
def __Delete(self, path, request_options, headers, **kwargs):
"""Azure Cosmos 'DELETE' http request.

:params str url:
Expand All @@ -2481,17 +2478,16 @@ def __Delete(self, path, request, headers):
tuple of (dict, dict)

"""
request = self.pipeline_client.delete(url=path, headers=headers)
return synchronized_request.SynchronizedRequest(
self,
request,
self._global_endpoint_manager,
self.connection_policy,
self.pipeline_client,
"DELETE",
path,
client=self,
request_options=request_options,
global_endpoint_manager=self._global_endpoint_manager,
connection_policy=self.connection_policy,
pipeline_client=self.pipeline_client,
request=request,
request_data=None,
query_params=None,
headers=headers,
**kwargs
)

def QueryFeed(self, path, collection_id, query, options, partition_key_range_id=None):
Expand Down Expand Up @@ -2525,7 +2521,17 @@ def QueryFeed(self, path, collection_id, query, options, partition_key_range_id=
)

def __QueryFeed(
self, path, typ, id_, result_fn, create_fn, query, options=None, partition_key_range_id=None, response_hook=None
self,
path,
typ,
id_,
result_fn,
create_fn,
query,
options=None,
partition_key_range_id=None,
response_hook=None,
**kwargs
):
"""Query for more than one Azure Cosmos resources.

Expand Down Expand Up @@ -2564,9 +2570,9 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request = _request_object.RequestObject(typ, documents._OperationType.ReadFeed)
request_options = _request_object.RequestObject(typ, documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Get(path, request, headers)
result, self.last_response_headers = self.__Get(path, request_options, headers, **kwargs)
if response_hook:
response_hook(self.last_response_headers, result)
return __GetBodiesFromQueryResult(result)
Expand All @@ -2585,9 +2591,9 @@ def __GetBodiesFromQueryResult(result):
raise SystemError("Unexpected query compatibility mode.")

# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request = _request_object.RequestObject(typ, documents._OperationType.SqlQuery)
request_options = _request_object.RequestObject(typ, documents._OperationType.SqlQuery)
headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Post(path, request, query, headers)
result, self.last_response_headers = self.__Post(path, request_options, query, headers, **kwargs)

if response_hook:
response_hook(self.last_response_headers, result)
Expand Down
Loading