Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 118 additions & 35 deletions sdk/core/azure-core/azure/core/async_paging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,127 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from collections.abc import AsyncIterator
import collections.abc
import logging
from typing import (
Iterable,
AsyncIterator,
TypeVar,
Callable,
Tuple,
Optional,
Awaitable,
)


_LOGGER = logging.getLogger(__name__)

class AsyncPagedMixin(AsyncIterator):
"""Bring async to Paging.

**Keyword argument:**

*async_command* - Mandatory keyword argument for this mixin to work.
"""
def __init__(self, *args, **kwargs): # pylint: disable=unused-argument
self._async_get_next = kwargs.get("async_command")
if not self._async_get_next:
_LOGGER.debug("Paging async iterator protocol is not available for %s",
self.__class__.__name__)

async def _async_advance_page(self):
if not self._async_get_next:
raise NotImplementedError(
"The class {} does not support async paging.".format(self.__class__.__name__)
)
if self.next_link is None:
raise StopAsyncIteration("End of paging")
self._current_page_iter_index = 0
self._response = await self._async_get_next(self.next_link)
self._deserializer(self, self._response)
return self.current_page
ReturnType = TypeVar("ReturnType")
ResponseType = TypeVar("ResponseType")


class AsyncList(AsyncIterator[ReturnType]):
def __init__(self, iterable: Iterable[ReturnType]) -> None:
"""Change an iterable into a fake async iterator.

Coul be useful to fill the async iterator contract when you get a list.

:param iterable: A sync iterable of T
"""
# Technically, if it's a real iterator, I don't need "iter"
# but that will cover iterable and list as well with no troubles created.
self._iterator = iter(iterable)

async def __anext__(self) -> ReturnType:
try:
return next(self._iterator)
except StopIteration as err:
raise StopAsyncIteration() from err


class AsyncPageIterator(AsyncIterator[AsyncIterator[ReturnType]]):
def __init__(
self,
get_next: Callable[[Optional[str]], Awaitable[ResponseType]],
extract_data: Callable[
[ResponseType], Awaitable[Tuple[str, AsyncIterator[ReturnType]]]
],
continuation_token: Optional[str] = None,
) -> None:
"""Return an async iterator of pages.

:param get_next: Callable that take the continuation token and return a HTTP response
:param extract_data: Callable that take an HTTP response and return a tuple continuation token,
list of ReturnType
:param str continuation_token: The continuation token needed by get_next
"""
self._get_next = get_next
self._extract_data = extract_data
self.continuation_token = continuation_token
self._did_a_call_already = False
self._response = None
self._current_page = None

async def __anext__(self):
"""Iterate through responses."""
# Storing the list iterator might work out better, but there's no
# guarantee that some code won't replace the list entirely with a copy,
# invalidating an list iterator that might be saved between iterations.
if self.current_page and self._current_page_iter_index < len(self.current_page):
response = self.current_page[self._current_page_iter_index]
self._current_page_iter_index += 1
return response
await self._async_advance_page()
return await self.__anext__()
if self.continuation_token is None and self._did_a_call_already:
raise StopAsyncIteration("End of paging")

self._response = await self._get_next(self.continuation_token)
self._did_a_call_already = True

self.continuation_token, self._current_page = await self._extract_data(
self._response
)

# If current_page was a sync list, wrap it async-like
if isinstance(self._current_page, collections.abc.Iterable):
self._current_page = AsyncList(self._current_page)

return self._current_page


class AsyncItemPaged(AsyncIterator[ReturnType]):
def __init__(self, *args, **kwargs) -> None:
"""Return an async iterator of items.

args and kwargs will be passed to the AsyncPageIterator constructor directly,
except page_iterator_class
"""
self._args = args
self._kwargs = kwargs
self._page_iterator = (
None
) # type: Optional[AsyncIterator[AsyncIterator[ReturnType]]]
self._page = None # type: Optional[AsyncIterator[ReturnType]]
self._page_iterator_class = self._kwargs.pop(
"page_iterator_class", AsyncPageIterator
)

def by_page(
self, continuation_token: Optional[str] = None
) -> AsyncIterator[AsyncIterator[ReturnType]]:
"""Get an async iterator of pages of objects, instead of an async iterator of objects.

:param str continuation_token:
An opaque continuation token. This value can be retrieved from the
continuation_token field of a previous generator object. If specified,
this generator will begin returning results from this point.
:returns: An async iterator of pages (themselves async iterator of objects)
"""
return self._page_iterator_class(
*self._args, **self._kwargs, continuation_token=continuation_token
)

async def __anext__(self) -> ReturnType:
if self._page_iterator is None:
self._page_iterator = self.by_page()
return await self.__anext__()
if self._page is None:
# Let it raise StopAsyncIteration
self._page = await self._page_iterator.__anext__()
return await self.__anext__()
try:
return await self._page.__anext__()
except StopAsyncIteration:
self._page = None
return await self.__anext__()
153 changes: 83 additions & 70 deletions sdk/core/azure-core/azure/core/paging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,86 +23,99 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
import sys
try:
from collections.abc import Iterator
xrange = range
except ImportError:
from collections import Iterator

from typing import Dict, Any, List, Callable, Optional, TYPE_CHECKING # pylint: disable=unused-import

if TYPE_CHECKING:
from .pipeline.transport.base import HttpResponse
from msrest.serialization import Deserializer, Model # type: ignore # pylint: disable=unused-import

if sys.version_info >= (3, 5, 2):
# Not executed on old Python, no syntax error
from .async_paging import AsyncPagedMixin # type: ignore
else:
class AsyncPagedMixin(object): # type: ignore
pass

class Paged(AsyncPagedMixin, Iterator):
"""A container for paged REST responses.

:param response: server response object.
:type response: ~azure.core.pipeline.transport.HttpResponse
:param callable command: Function to retrieve the next page of items.
:param Deserializer deserializer: a Deserializer instance to use
"""
_validation = {} # type: Dict[str, Dict[str, Any]]
_attribute_map = {} # type: Dict[str, Dict[str, Any]]

def __init__(self, command, deserializer, **kwargs):
# type: (Callable[[str], HttpResponse], Deserializer, Any) -> None
super(Paged, self).__init__(**kwargs) # type: ignore
# Sets next_link, current_page, and _current_page_iter_index.
self.next_link = ""
self.current_page = [] # type: List[Model]
self._current_page_iter_index = 0
self._deserializer = deserializer
self._get_next = command
self._response = None # type: Optional[HttpResponse]
import itertools
from typing import (
Callable,
Optional,
TypeVar,
Iterator,
Iterable,
Tuple,
) # pylint: disable=unused-import
import logging


_LOGGER = logging.getLogger(__name__)

ReturnType = TypeVar("ReturnType")
ResponseType = TypeVar("ResponseType")


class PageIterator(Iterator[Iterator[ReturnType]]):
def __init__(
self,
get_next, # type: Callable[[Optional[str]], ResponseType]
extract_data, # type: Callable[[ResponseType], Tuple[str, Iterable[ReturnType]]]
continuation_token=None, # type: Optional[str]
):
"""Return an iterator of pages.

:param get_next: Callable that take the continuation token and return a HTTP response
:param extract_data: Callable that take an HTTP response and return a tuple continuation token,
list of ReturnType
:param str continuation_token: The continuation token needed by get_next
"""
self._get_next = get_next
self._extract_data = extract_data
self.continuation_token = continuation_token
self._did_a_call_already = False
self._response = None # type: Optional[ResponseType]
self._current_page = None # type: Optional[Iterable[ReturnType]]

def __iter__(self):
"""Return 'self'."""
# Since iteration mutates this object, consider it an iterator in-and-of
# itself.
return self

@classmethod
def _get_subtype_map(cls):
"""Required for parity to Model object for deserialization."""
return {}
def __next__(self):
# type: () -> Iterator[ReturnType]
if self.continuation_token is None and self._did_a_call_already:
raise StopIteration("End of paging")

self._response = self._get_next(self.continuation_token)
self._did_a_call_already = True

self.continuation_token, self._current_page = self._extract_data(self._response)

def _advance_page(self):
# type: () -> List[Model]
"""Force moving the cursor to the next azure call.
return iter(self._current_page)

This method is for advanced usage, iterator protocol is prefered.
next = __next__ # Python 2 compatibility.


class ItemPaged(Iterator[ReturnType]):
def __init__(self, *args, **kwargs):
"""Return an iterator of items.

:raises: StopIteration if no further page
:return: The current page list
:rtype: list
args and kwargs will be passed to the PageIterator constructor directly,
except page_iterator_class
"""
if self.next_link is None:
raise StopIteration("End of paging")
self._current_page_iter_index = 0
self._response = self._get_next(self.next_link)
self._deserializer(self, self._response)
return self.current_page
self._args = args
self._kwargs = kwargs
self._page_iterator = None
self._page_iterator_class = self._kwargs.pop(
"page_iterator_class", PageIterator
)

def by_page(self, continuation_token=None):
# type: (Optional[str]) -> Iterator[Iterator[ReturnType]]
"""Get an iterator of pages of objects, instead of an iterator of objects.

:param str continuation_token:
An opaque continuation token. This value can be retrieved from the
continuation_token field of a previous generator object. If specified,
this generator will begin returning results from this point.
:returns: An iterator of pages (themselves iterator of objects)
"""
return self._page_iterator_class(
continuation_token=continuation_token, *self._args, **self._kwargs
)

def __iter__(self):
"""Return 'self'."""
return self

def __next__(self):
"""Iterate through responses."""
# Storing the list iterator might work out better, but there's no
# guarantee that some code won't replace the list entirely with a copy,
# invalidating an list iterator that might be saved between iterations.
if self.current_page and self._current_page_iter_index < len(self.current_page):
response = self.current_page[self._current_page_iter_index]
self._current_page_iter_index += 1
return response
self._advance_page()
return self.__next__()
if self._page_iterator is None:
self._page_iterator = itertools.chain.from_iterable(self.by_page())
return next(self._page_iterator)

next = __next__ # Python 2 compatibility.
Loading