Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Move Storage Queue to new Paging
  • Loading branch information
lmazuel committed Jul 24, 2019
commit 32e49418e225a95420c876fac0d20f8e0a6a9315
72 changes: 35 additions & 37 deletions sdk/storage/azure-storage-queue/azure/storage/queue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# pylint: disable=super-init-not-called

from typing import List # pylint: disable=unused-import
from azure.core.paging import Paged
from azure.core.paging import PageIterator
from ._shared.utils import (
return_context_and_deserialized,
process_storage_error)
Expand Down Expand Up @@ -220,7 +220,7 @@ def _from_generated(cls, generated):
return message


class MessagesPaged(Paged):
class MessagesPaged(PageIterator):
"""An iterable of Queue Messages.

:ivar int results_per_page: The maximum number of results retrieved per API call.
Expand All @@ -231,28 +231,27 @@ class MessagesPaged(Paged):
:param int results_per_page: The maximum number of messages to retrieve per
call.
"""
def __init__(self, command, results_per_page=None):
super(MessagesPaged, self).__init__(command, None)
def __init__(self, command, results_per_page=None, continuation_token=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

super(MessagesPaged, self).__init__(
self._get_next_cb,
self._extract_data_cb,
)
self._command = command
self.results_per_page = results_per_page

def _advance_page(self):
"""Force moving the cursor to the next azure call.

This method is for advanced usage, iterator protocol is prefered.

:raises: StopIteration if no further page
:return: The current page list
:rtype: list
"""
self._current_page_iter_index = 0
def _get_next_cb(self, continuation_token):
try:
messages = self._get_next(number_of_messages=self.results_per_page)
if not messages:
raise StopIteration()
return self._command(number_of_messages=self.results_per_page)
except StorageErrorException as error:
process_storage_error(error)
self.current_page = [QueueMessage._from_generated(q) for q in messages] # pylint: disable=protected-access
return self.current_page

def _extract_data_cb(self, messages):
if not messages:
return None, None
return "TOKEN_IGNORED", iter([QueueMessage._from_generated(q) for q in messages]) # pylint: disable=protected-access


class QueueProperties(DictMixin):
Expand Down Expand Up @@ -280,7 +279,7 @@ def _from_generated(cls, generated):
return props


class QueuePropertiesPaged(Paged):
class QueuePropertiesPaged(PageIterator):
"""An iterable of Queue properties.

:ivar str service_endpoint: The service URL.
Expand All @@ -300,42 +299,41 @@ class QueuePropertiesPaged(Paged):
call.
:param str marker: An opaque continuation token.
"""
def __init__(self, command, prefix=None, results_per_page=None, marker=None):
super(QueuePropertiesPaged, self).__init__(command, None)
def __init__(self, command, prefix=None, results_per_page=None, marker=None, continuation_token=None):
if continuation_token is not None:
raise ValueError("This operation does not support continuation token")

super(QueuePropertiesPaged, self).__init__(
self._get_next_cb,
self._extract_data_cb,
)
self._command = command
self.service_endpoint = None
self.prefix = prefix
self.current_marker = None
self.results_per_page = results_per_page
self.next_marker = marker or ""
self.location_mode = None

def _advance_page(self):
"""Force moving the cursor to the next azure call.

This method is for advanced usage, iterator protocol is prefered.

:raises: StopIteration if no further page
:return: The current page list
:rtype: list
"""
if self.next_marker is None:
raise StopIteration("End of paging")
self._current_page_iter_index = 0
def _get_next_cb(self, continuation_token):
try:
self.location_mode, self._response = self._get_next(
return self._get_next(
marker=self.next_marker or None,
maxresults=self.results_per_page,
cls=return_context_and_deserialized,
use_location=self.location_mode)
except StorageErrorException as error:
process_storage_error(error)

def _extract_data_cb(self, get_next_return):
self.location_mode, self._response = get_next_return
self.service_endpoint = self._response.service_endpoint
self.prefix = self._response.prefix
self.current_marker = self._response.marker
self.results_per_page = self._response.max_results
self.current_page = [QueueProperties._from_generated(q) for q in self._response.queue_items] # pylint: disable=protected-access
self.next_marker = self._response.next_marker or None
return self.current_page

return self.next_marker, iter([QueueProperties._from_generated(q) for q in self._response.queue_items]) # pylint: disable=protected-access


class QueuePermissions(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import six

from azure.core.paging import ItemPaged

from ._shared.shared_access_signature import QueueSharedAccessSignature
from ._shared.utils import (
StorageAccountHostsMixin,
Expand Down Expand Up @@ -562,7 +564,7 @@ def receive_messages(self, messages_per_page=None, visibility_timeout=None, time
cls=self._config.message_decode_policy,
**kwargs
)
return MessagesPaged(command, results_per_page=messages_per_page)
return ItemPaged(command, results_per_page=messages_per_page, page_iterator_class=MessagesPaged)
except StorageErrorException as error:
process_storage_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
except ImportError:
from urlparse import urlparse # type: ignore

from azure.core.paging import ItemPaged

from ._shared.shared_access_signature import SharedAccessSignature
from ._shared.models import LocationMode, Services
from ._shared.utils import (
Expand Down Expand Up @@ -347,8 +349,10 @@ def list_queues(
include=include,
timeout=timeout,
**kwargs)
return QueuePropertiesPaged(
command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker)
return ItemPaged(
command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker,
page_iterator_class=QueuePropertiesPaged
)

def create_queue(
self, name, # type: str
Expand All @@ -357,7 +361,7 @@ def create_queue(
**kwargs
):
# type: (...) -> QueueClient
"""Creates a new queue under the specified account.
"""Creates a new queue under the specified account.

If a queue with the same name already exists, the operation fails.
Returns a client with which to interact with the newly created queue.
Expand Down