Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove warnings from tests
  • Loading branch information
rakshith91 committed Jul 18, 2019
commit 780d5c3819c777abbbfab75c0ecf25a49db6505b
402 changes: 402 additions & 0 deletions sdk/storage/azure-storage-queue/azure/storage/queue/aio/models.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

import six

from ..queue_client import QueueClient as QueueClientBase
from azure.storage.queue._shared.shared_access_signature import QueueSharedAccessSignature
from azure.storage.queue._shared.base_client import StorageAccountHostsMixin, parse_connection_str, parse_query
from azure.storage.queue._shared.base_client_async import AsyncStorageAccountHostsMixin, parse_connection_str, parse_query
from azure.storage.queue._shared.request_handlers import add_metadata_headers, serialize_iso
from azure.storage.queue._shared.response_handlers import (
return_response_headers,
Expand All @@ -32,16 +33,16 @@
from azure.storage.queue._generated.models import StorageErrorException, SignedIdentifier
from azure.storage.queue._generated.models import QueueMessage as GenQueueMessage

from azure.storage.queue.models import QueueMessage, AccessPolicy, MessagesPaged
from azure.storage.queue.aio.models import QueueMessage, AccessPolicy, MessagesPaged

if TYPE_CHECKING:
from datetime import datetime
from azure.core.pipeline.policies import HTTPPolicy
from azure.storage.queue.models import QueuePermissions, QueueProperties
from azure.storage.queue.aio.models import QueuePermissions, QueueProperties


class QueueClient(StorageAccountHostsMixin):
"""A client to interact with a specific Queue.
class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase):
"""A async client to interact with a specific Queue.

:ivar str url:
The full endpoint URL to the Queue, including SAS token if used. This could be
Expand Down Expand Up @@ -83,154 +84,18 @@ def __init__(
self, queue_url, # type: str
queue=None, # type: Optional[Union[QueueProperties, str]]
credential=None, # type: Optional[Any]
loop=None, # type: Any
**kwargs # type: Any
):
# type: (...) -> None
try:
if not queue_url.lower().startswith('http'):
queue_url = "https://" + queue_url
except AttributeError:
raise ValueError("Queue URL must be a string.")
parsed_url = urlparse(queue_url.rstrip('/'))
if not parsed_url.path and not queue:
raise ValueError("Please specify a queue name.")
if not parsed_url.netloc:
raise ValueError("Invalid URL: {}".format(parsed_url))

path_queue = ""
if parsed_url.path:
path_queue = parsed_url.path.lstrip('/').partition('/')[0]
_, sas_token = parse_query(parsed_url.query)
if not sas_token and not credential:
raise ValueError("You need to provide either a SAS token or an account key to authenticate.")
try:
self.queue_name = queue.name # type: ignore
except AttributeError:
self.queue_name = queue or unquote(path_queue)
self._query_str, credential = self._format_query_string(sas_token, credential)
super(QueueClient, self).__init__(parsed_url, 'queue', credential, **kwargs)

self._config.message_encode_policy = kwargs.get('message_encode_policy') or TextXMLEncodePolicy()
self._config.message_decode_policy = kwargs.get('message_decode_policy') or TextXMLDecodePolicy()
self._client = AzureQueueStorage(self.url, pipeline=self._pipeline)

def _format_url(self, hostname):
"""Format the endpoint URL according to the current location
mode hostname.
"""
queue_name = self.queue_name
if isinstance(queue_name, six.text_type):
queue_name = queue_name.encode('UTF-8')
return "{}://{}/{}{}".format(
self.scheme,
hostname,
quote(queue_name),
self._query_str)

@classmethod
def from_connection_string(
cls, conn_str, # type: str
queue, # type: Union[str, QueueProperties]
credential=None, # type: Any
**kwargs # type: Any
):
# type: (...) -> None
"""Create QueueClient from a Connection String.

:param str conn_str:
A connection string to an Azure Storage account.
:param queue: The queue. This can either be the name of the queue,
or an instance of QueueProperties.
:type queue: str or ~azure.storage.queue.models.QueueProperties
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token, or the connection string already has shared
access key values. The value can be a SAS token string, and account shared access
key, or an instance of a TokenCredentials class from azure.identity.

Example:
.. literalinclude:: ../tests/test_queue_samples_message.py
:start-after: [START create_queue_client_from_connection_string]
:end-before: [END create_queue_client_from_connection_string]
:language: python
:dedent: 8
:caption: Create the queue client from connection string.
"""
account_url, secondary, credential = parse_connection_str(
conn_str, credential, 'queue')
if 'secondary_hostname' not in kwargs:
kwargs['secondary_hostname'] = secondary
return cls(account_url, queue=queue, credential=credential, **kwargs) # type: ignore

def generate_shared_access_signature(
self, permission=None, # type: Optional[Union[QueuePermissions, str]]
expiry=None, # type: Optional[Union[datetime, str]]
start=None, # type: Optional[Union[datetime, str]]
policy_id=None, # type: Optional[str]
ip=None, # type: Optional[str]
protocol=None # type: Optional[str]
):
"""Generates a shared access signature for the queue.

Use the returned signature with the credential parameter of any Queue Service.

:param ~azure.storage.queue.models.QueuePermissions permission:
The permissions associated with the shared access signature. The
user is restricted to operations allowed by the permissions.
Required unless a policy_id is given referencing a stored access policy
which contains this field. This field must be omitted if it has been
specified in an associated stored access policy.
:param expiry:
The time at which the shared access signature becomes invalid.
Required unless a policy_id is given referencing a stored access policy
which contains this field. This field must be omitted if it has
been specified in an associated stored access policy. Azure will always
convert values to UTC. If a date is passed in without timezone info, it
is assumed to be UTC.
:type expiry: datetime or str
:param start:
The time at which the shared access signature becomes valid. If
omitted, start time for this call is assumed to be the time when the
storage service receives the request. Azure will always convert values
to UTC. If a date is passed in without timezone info, it is assumed to
be UTC.
:type start: datetime or str
:param str policy_id:
A unique value up to 64 characters in length that correlates to a
stored access policy. To create a stored access policy, use :func:`~set_queue_access_policy`.
:param str ip:
Specifies an IP address or a range of IP addresses from which to accept requests.
If the IP address from which the request originates does not match the IP address
or address range specified on the SAS token, the request is not authenticated.
For example, specifying sip='168.1.5.65' or sip='168.1.5.60-168.1.5.70' on the SAS
restricts the request to those IP addresses.
:param str protocol:
Specifies the protocol permitted for a request made. The default value
is https,http.
:return: A Shared Access Signature (sas) token.
:rtype: str

Example:
.. literalinclude:: ../tests/test_queue_samples_message.py
:start-after: [START queue_client_sas_token]
:end-before: [END queue_client_sas_token]
:language: python
:dedent: 12
:caption: Generate a sas token.
"""
if not hasattr(self.credential, 'account_key') and not self.credential.account_key:
raise ValueError("No account SAS key available.")
sas = QueueSharedAccessSignature(
self.credential.account_name, self.credential.account_key)
return sas.generate_queue(
self.queue_name,
permission=permission,
expiry=expiry,
start=start,
policy_id=policy_id,
ip=ip,
protocol=protocol,
)
super(QueueClient, self).__init__(
queue_url,
queue=queue,
credential=credential,
loop=loop,
**kwargs)
self._client = AzureQueueStorage(self.url, pipeline=self._pipeline, loop=loop)
self._loop = loop

async def create_queue(self, metadata=None, timeout=None, **kwargs):
# type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> None
Expand Down Expand Up @@ -261,12 +126,12 @@ async def create_queue(self, metadata=None, timeout=None, **kwargs):
headers = kwargs.pop('headers', {})
headers.update(add_metadata_headers(metadata)) # type: ignore
try:
return (await self._client.queue.create( # type: ignore
return await self._client.queue.create( # type: ignore
metadata=metadata,
timeout=timeout,
headers=headers,
cls=deserialize_queue_creation,
**kwargs))
**kwargs)
except StorageErrorException as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,32 @@
except ImportError:
from urlparse import urlparse # type: ignore

from ..queue_service_client import QueueServiceClient as QueueServiceClientBase
from azure.storage.queue._shared.shared_access_signature import SharedAccessSignature
from azure.storage.queue._shared.models import LocationMode, Services
from azure.storage.queue._shared.base_client import StorageAccountHostsMixin, parse_connection_str, parse_query
from azure.storage.queue._shared.base_client_async import AsyncStorageAccountHostsMixin, parse_connection_str, parse_query
from azure.storage.queue._shared.request_handlers import add_metadata_headers, serialize_iso
from azure.storage.queue._shared.response_handlers import process_storage_error
from azure.storage.queue._generated import AzureQueueStorage
from azure.storage.queue._generated.models import StorageServiceProperties, StorageErrorException

from azure.storage.queue.models import QueuePropertiesPaged
from azure.storage.queue.aio.models import QueuePropertiesPaged
from .queue_client_async import QueueClient

if TYPE_CHECKING:
from datetime import datetime
from azure.core import Configuration
from azure.core.pipeline.policies import HTTPPolicy
from azure.storage.queue._shared.models import AccountPermissions, ResourceTypes
from azure.storage.queue.models import (
from azure.storage.queue.aio.models import (
QueueProperties,
Logging,
Metrics,
CorsRule
)


class QueueServiceClient(StorageAccountHostsMixin):
class QueueServiceClient(AsyncStorageAccountHostsMixin, QueueServiceClientBase):
"""A client to interact with the Queue Service at the account level.

This client provides operations to retrieve and configure the account properties
Expand Down Expand Up @@ -85,111 +86,17 @@ class QueueServiceClient(StorageAccountHostsMixin):
def __init__(
self, account_url, # type: str
credential=None, # type: Optional[Any]
loop=None, # type: Any
**kwargs # type: Any
):
# type: (...) -> None
try:
if not account_url.lower().startswith('http'):
account_url = "https://" + account_url
except AttributeError:
raise ValueError("Account URL must be a string.")
parsed_url = urlparse(account_url.rstrip('/'))
if not parsed_url.netloc:
raise ValueError("Invalid URL: {}".format(account_url))

_, sas_token = parse_query(parsed_url.query)
if not sas_token and not credential:
raise ValueError("You need to provide either a SAS token or an account key to authenticate.")
self._query_str, credential = self._format_query_string(sas_token, credential)
super(QueueServiceClient, self).__init__(parsed_url, 'queue', credential, **kwargs)
self._client = AzureQueueStorage(self.url, pipeline=self._pipeline)

def _format_url(self, hostname):
"""Format the endpoint URL according to the current location
mode hostname.
"""
return "{}://{}/{}".format(self.scheme, hostname, self._query_str)

@classmethod
def from_connection_string(
cls, conn_str, # type: str
credential=None, # type: Optional[Any]
**kwargs # type: Any
):
"""Create QueueServiceClient from a Connection String.

:param str conn_str:
A connection string to an Azure Storage account.
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token, or the connection string already has shared
access key values. The value can be a SAS token string, and account shared access
key, or an instance of a TokenCredentials class from azure.identity.

Example:
.. literalinclude:: ../tests/test_queue_samples_authentication.py
:start-after: [START auth_from_connection_string]
:end-before: [END auth_from_connection_string]
:language: python
:dedent: 8
:caption: Creating the QueueServiceClient with a connection string.
"""
account_url, secondary, credential = parse_connection_str(
conn_str, credential, 'queue')
if 'secondary_hostname' not in kwargs:
kwargs['secondary_hostname'] = secondary
return cls(account_url, credential=credential, **kwargs)

def generate_shared_access_signature(
self, resource_types, # type: Union[ResourceTypes, str]
permission, # type: Union[AccountPermissions, str]
expiry, # type: Optional[Union[datetime, str]]
start=None, # type: Optional[Union[datetime, str]]
ip=None, # type: Optional[str]
protocol=None # type: Optional[str]
):
"""Generates a shared access signature for the queue service.

Use the returned signature with the credential parameter of any Queue Service.

:param ~azure.storage.queue._shared.models.ResourceTypes resource_types:
Specifies the resource types that are accessible with the account SAS.
:param ~azure.storage.queue._shared.models.AccountPermissions permission:
The permissions associated with the shared access signature. The
user is restricted to operations allowed by the permissions.
:param expiry:
The time at which the shared access signature becomes invalid.
Required unless an id is given referencing a stored access policy
which contains this field. This field must be omitted if it has
been specified in an associated stored access policy. Azure will always
convert values to UTC. If a date is passed in without timezone info, it
is assumed to be UTC.
:type expiry: datetime or str
:param start:
The time at which the shared access signature becomes valid. If
omitted, start time for this call is assumed to be the time when the
storage service receives the request. Azure will always convert values
to UTC. If a date is passed in without timezone info, it is assumed to
be UTC.
:type start: datetime or str
:param str ip:
Specifies an IP address or a range of IP addresses from which to accept requests.
If the IP address from which the request originates does not match the IP address
or address range specified on the SAS token, the request is not authenticated.
For example, specifying sip=168.1.5.65 or sip=168.1.5.60-168.1.5.70 on the SAS
restricts the request to those IP addresses.
:param str protocol:
Specifies the protocol permitted for a request made. The default value
is https,http.
:return: A Shared Access Signature (sas) token.
:rtype: str
"""
if not hasattr(self.credential, 'account_key') and not self.credential.account_key:
raise ValueError("No account SAS key available.")

sas = SharedAccessSignature(self.credential.account_name, self.credential.account_key)
return sas.generate_account(
Services.QUEUE, resource_types, permission, expiry, start=start, ip=ip, protocol=protocol) # type: ignore
super(QueueServiceClient, self).__init__(
account_url,
credential=credential,
loop=loop,
**kwargs)
self._client = AzureQueueStorage(self.url, pipeline=self._pipeline, loop=loop)
self._loop = loop

async def get_service_stats(self, timeout=None, **kwargs): # type: ignore
# type: (Optional[int], Optional[Any]) -> Dict[str, Any]
Expand Down Expand Up @@ -443,4 +350,4 @@ def get_queue_client(self, queue, **kwargs):
self.url, queue=queue, credential=self.credential, key_resolver_function=self.key_resolver_function,
require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key,
_pipeline=self._pipeline, _configuration=self._config, _location_mode=self._location_mode,
_hosts=self._hosts, **kwargs)
_hosts=self._hosts, loop=self._loop, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
# - Playback: run against stored recordings
# - Record: run tests against live storage and update recordings
# - RunLiveNoRecord: run tests against live storage without altering recordings
TEST_MODE = 'Playback'
TEST_MODE = 'RunLiveNoRecord'

# Set to true to enable logging for the tests
# logging is not enabled by default because it pollutes the CI logs
Expand Down
Loading