From d751f63c8a5ad507c8d83d3ad98738cf70541cfc Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 14 Aug 2025 06:40:38 +0000 Subject: [PATCH 01/28] feat: support for async bidi streaming apis --- google/api_core/bidi_async.py | 276 +++++++++++++++++++++++++++++++ noxfile.py | 55 +++--- tests/asyncio/test_bidi_async.py | 276 +++++++++++++++++++++++++++++++ 3 files changed, 579 insertions(+), 28 deletions(-) create mode 100644 google/api_core/bidi_async.py create mode 100644 tests/asyncio/test_bidi_async.py diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py new file mode 100644 index 00000000..786cfe9a --- /dev/null +++ b/google/api_core/bidi_async.py @@ -0,0 +1,276 @@ +# Copyright 2020, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Asynchronous bi-directional streaming RPC helpers.""" + +import asyncio +import logging + +from google.api_core import exceptions + +_LOGGER = logging.getLogger(__name__) + + +class _AsyncRequestQueueGenerator: + """An async helper for sending requests to a gRPC stream from a Queue. + + This generator takes requests off a given queue and yields them to gRPC. + + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. + + The reason this is necessary is because gRPC takes an async iterator as the + request for request-streaming RPCs. gRPC consumes this iterator to allow + it to block while generating requests for the stream. However, if the + generator blocks indefinitely gRPC will not be able to clean up the task + as it'll be blocked on `anext(iterator)` and not be able to check the + channel status to stop iterating. This helper mitigates that by waiting + on the queue with a timeout and checking the RPC state before yielding. + + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order +of messages is not guaranteed. If such a thing is necessary it would be + easy to use a priority queue. + + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + period (float): The number of seconds to wait for items from the queue + before checking if the RPC is cancelled. In practice, this + determines the maximum amount of time the request consumption + task will live after the RPC is cancelled. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. 
+ """ + + def __init__(self, queue: asyncio.Queue, period: float = 1, initial_request=None): + self._queue = queue + self._period = period + self._initial_request = initial_request + self.call = None + + def _is_active(self): + # Note: there is a possibility that this starts *before* the call + # property is set. So we have to check if self.call is set before + # seeing if it's active. We need to return True if self.call is None. + # See https://github.com/googleapis/python-api-core/issues/560. + return self.call is None or not self.call.done() + + async def __aiter__(self): + if self._initial_request is not None: + if callable(self._initial_request): + yield self._initial_request() + else: + yield self._initial_request + + while True: + try: + item = self._queue.get_nowait() + except asyncio.QueueEmpty: + if not self._is_active(): + _LOGGER.debug( + "Empty queue and inactive call, exiting request generator." + ) + return + else: + # call is still active, keep waiting for queue items. + await asyncio.sleep(self._period) + continue + + # The consumer explicitly sent "None", indicating that the request + # should end. + if item is None: + _LOGGER.debug("Cleanly exiting request generator.") + return + + if not self._is_active(): + # We have an item, but the call is closed. We should put the + # item back on the queue so that the next call can consume it. + await self._queue.put(item) + _LOGGER.debug( + "Inactive call, replacing item on queue and exiting " + "request generator." + ) + return + + yield item + + + +class AsyncBidiRpc: + """A helper for consuming a async bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + Example:: + + initial_request = example_pb2.StreamingRpcRequest( + setting='example') + rpc = AsyncBidiRpc( + stub.StreamingRpc, + initial_request=initial_request, + metadata=[('name', 'value')] + ) + + await rpc.open() + + while rpc.is_active: + print(await rpc.recv()) + await rpc.send(example_pb2.StreamingRpcRequest( + data='example')) + + This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. + + Args: + start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = asyncio.Queue() + self._request_generator = None + self._callbacks = [] + self.call = None + self._loop = asyncio.get_event_loop() + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Callable[[grpc.Future], None]): The callback to execute. + It will be provided with the same gRPC future as the underlying + stream which will also be a :class:`grpc.aio.Call`. 
+ """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.aio.Call`. + for callback in self._callbacks: + callback(future) + + async def open(self): + """Opens the stream.""" + if self.is_active: + raise ValueError("Can not open an already open stream.") + + request_generator = _AsyncRequestQueueGenerator( + self._request_queue, initial_request=self._initial_request + ) + try: + call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) + except exceptions.GoogleAPICallError as exc: + # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is + # available from the ``response`` property on the mapped exception. + self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self): + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request): + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (protobuf.Message): The request to send. + """ + if self.call is None: + raise ValueError("Can not send() on an RPC that has never been open()ed.") + + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self): + """Wait for a message to be returned from the stream. + + If the underlying RPC has been closed, this will raise. + + Returns: + protobuf.Message: The received message. + """ + if self.call is None: + raise ValueError("Can not recv() on an RPC that has never been open()ed.") + + return await self.call.read() + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + return self.call is not None and not self.call.done() + + @property + def pending_requests(self): + """int: Returns an estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/noxfile.py b/noxfile.py index ac21330e..a96f1e91 100644 --- a/noxfile.py +++ b/noxfile.py @@ -180,37 +180,36 @@ def default(session, install_grpc=True, prerelease=False, install_async_rest=Fal session.run("python", "-c", "import grpc; print(grpc.__version__)") session.run("python", "-c", "import google.auth; print(google.auth.__version__)") - pytest_args = [ - "python", - "-m", - "pytest", - *( - # Helpful for running a single test or testfile. 
- session.posargs - or [ - "--quiet", - "--cov=google.api_core", - "--cov=tests.unit", - "--cov-append", - "--cov-config=.coveragerc", - "--cov-report=", - "--cov-fail-under=0", - # Running individual tests with parallelism enabled is usually not helpful. - "-n=auto", - os.path.join("tests", "unit"), - ] - ), - ] + pytest_command = ["python", "-m", "pytest"] - session.install("asyncmock", "pytest-asyncio") + if session.posargs: + pytest_options = list(session.posargs) + else: + pytest_options = [ + "--quiet", + "--cov=google.api_core", + "--cov=tests.unit", + "--cov-append", + "--cov-config=.coveragerc", + "--cov-report=", + "--cov-fail-under=0", + "-n=auto", + os.path.join("tests", "unit"), + "--cov=tests.asyncio", + os.path.join("tests", "asyncio"), + ] - # Having positional arguments means the user wants to run specific tests. - # Best not to add additional tests to that list. - if not session.posargs: - pytest_args.append("--cov=tests.asyncio") - pytest_args.append(os.path.join("tests", "asyncio")) + # Temporarily disable coverage for Python 3.12 due to a missing + # _sqlite3 module in some environments. This is a workaround. The proper + # fix is to reinstall Python 3.12 with SQLite support. + # We only do this when not running specific tests (no posargs). + if session.python == "3.12" and not session.posargs: + pytest_options = [opt for opt in pytest_options if not opt.startswith("--cov")] + pytest_command.extend(["-p", "no:cov"]) + + session.install("asyncmock", "pytest-asyncio") - session.run(*pytest_args) + session.run(*pytest_command, *pytest_options) @nox.session(python=PYTHON_VERSIONS) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py new file mode 100644 index 00000000..3da7a1ee --- /dev/null +++ b/tests/asyncio/test_bidi_async.py @@ -0,0 +1,276 @@ +# Copyright 2020, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +try: + from unittest import mock +except ImportError: # pragma: NO COVER + import mock # type: ignore + +import pytest + +try: + import grpc + from grpc import aio +except ImportError: # pragma: NO COVER + pytest.skip("No GRPC", allow_module_level=True) + +from google.api_core import bidi_async +from google.api_core import exceptions + + +async def consume_async_generator(gen): + items = [] + async for item in gen: + items.append(item) + return items + + +@pytest.mark.asyncio +class Test_AsyncRequestQueueGenerator: + async def test_bounded_consume(self): + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + q = asyncio.Queue() + await q.put(mock.sentinel.A) + await q.put(mock.sentinel.B) + + generator = bidi_async._AsyncRequestQueueGenerator(q, period=0.01) + generator.call = call + + items = [] + gen_aiter = generator.__aiter__() + + items.append(await gen_aiter.__anext__()) + items.append(await gen_aiter.__anext__()) + + # At this point, the queue is empty. The next call to anext will sleep. + # We make the call inactive. 
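+        # With the queue now empty, the generator's next pass hits
+        # asyncio.QueueEmpty and checks call.done(); marking the call done
+        # below makes it return, so __anext__ raises StopAsyncIteration.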
+ call.done.return_value = True + + with pytest.raises(StopAsyncIteration): + await gen_aiter.__anext__() + + assert items == [mock.sentinel.A, mock.sentinel.B] + + async def test_yield_initial_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=mock.sentinel.A + ) + generator.call = call + + items = await consume_async_generator(generator) + + assert items == [mock.sentinel.A] + + async def test_yield_initial_callable_and_exit(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator( + q, initial_request=lambda: mock.sentinel.A + ) + generator.call = call + + items = await consume_async_generator(generator) + + assert items == [mock.sentinel.A] + + async def test_exit_when_inactive_with_item(self): + q = asyncio.Queue() + await q.put(mock.sentinel.A) + + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + items = await consume_async_generator(generator) + + assert items == [] + # Make sure it put the item back. + assert not q.empty() + assert await q.get() == mock.sentinel.A + + async def test_exit_when_inactive_empty(self): + q = asyncio.Queue() + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = True + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + items = await consume_async_generator(generator) + + assert items == [] + + async def test_exit_with_stop(self): + q = asyncio.Queue() + await q.put(None) + call = mock.create_autospec(aio.Call, instance=True) + call.done.return_value = False + + generator = bidi_async._AsyncRequestQueueGenerator(q) + generator.call = call + + items = await consume_async_generator(generator) + + assert items == [] + + +def make_async_rpc(): + """Makes a mock async RPC used to test Bidi classes.""" + call = mock.create_autospec(aio.StreamStreamCall, instance=True) + rpc = mock.AsyncMock(spec=aio.StreamStreamMultiCallable) + + async def rpc_side_effect(request, metadata=None): + call.done.return_value = False + return call + + rpc.side_effect = rpc_side_effect + + def cancel_side_effect(): + call.done.return_value = True + return True + + call.cancel.side_effect = cancel_side_effect + call.read = mock.AsyncMock() + + return rpc, call + + +class AsyncClosedCall: + def __init__(self, exception): + self.exception = exception + + def done(self): + return True + + async def read(self): + raise self.exception + + +@pytest.mark.asyncio +class TestAsyncBidiRpc: + def test_initial_state(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + assert bidi_rpc.is_active is False + + def test_done_callbacks(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + callback = mock.Mock(spec=["__call__"]) + + bidi_rpc.add_done_callback(callback) + bidi_rpc._on_call_done(mock.sentinel.future) + + callback.assert_called_once_with(mock.sentinel.future) + + async def test_metadata(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A) + assert bidi_rpc._rpc_metadata == mock.sentinel.A + + await bidi_rpc.open() + assert bidi_rpc.call == call + rpc.assert_awaited_once() + assert rpc.call_args.kwargs["metadata"] == mock.sentinel.A + + async def test_open(self): + rpc, call = make_async_rpc() + 
bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + assert bidi_rpc.call == call + assert bidi_rpc.is_active + call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done) + + async def test_open_error_already_open(self): + rpc, _ = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + + await bidi_rpc.open() + + with pytest.raises(ValueError): + await bidi_rpc.open() + + async def test_close(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.close() + + call.cancel.assert_called_once() + assert bidi_rpc.call is call + assert bidi_rpc.is_active is False + # ensure the request queue was signaled to stop. + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is None + # ensure request and callbacks are cleaned up + assert bidi_rpc._initial_request is None + assert not bidi_rpc._callbacks + + async def test_close_no_rpc(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + await bidi_rpc.close() + + async def test_send(self): + rpc, call = make_async_rpc() + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + await bidi_rpc.open() + + await bidi_rpc.send(mock.sentinel.request) + + assert bidi_rpc.pending_requests == 1 + assert await bidi_rpc._request_queue.get() is mock.sentinel.request + + async def test_send_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.send(mock.sentinel.request) + + async def test_send_dead_rpc(self): + error = ValueError() + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = AsyncClosedCall(error) + + with pytest.raises(ValueError) as exc_info: + await bidi_rpc.send(mock.sentinel.request) + + assert exc_info.value == error + + async def test_recv(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + bidi_rpc.call = mock.create_autospec(aio.Call, instance=True) + bidi_rpc.call.read = mock.AsyncMock(return_value=mock.sentinel.response) + + response = await bidi_rpc.recv() + + assert response == mock.sentinel.response + + async def test_recv_not_open(self): + bidi_rpc = bidi_async.AsyncBidiRpc(None) + + with pytest.raises(ValueError): + await bidi_rpc.recv() From fdcd903214620d409fb245225afda5545a28edc1 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 14 Aug 2025 06:57:15 +0000 Subject: [PATCH 02/28] fix linting --- google/api_core/bidi_async.py | 77 ++++++++++++++++---------------- tests/asyncio/test_bidi_async.py | 2 - 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 786cfe9a..6c74a09c 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -25,48 +25,48 @@ class _AsyncRequestQueueGenerator: """An async helper for sending requests to a gRPC stream from a Queue. - This generator takes requests off a given queue and yields them to gRPC. - - This helper is useful when you have an indeterminate, indefinite, or - otherwise open-ended set of requests to send through a request-streaming - (or bidirectional) RPC. - - The reason this is necessary is because gRPC takes an async iterator as the - request for request-streaming RPCs. gRPC consumes this iterator to allow - it to block while generating requests for the stream. However, if the - generator blocks indefinitely gRPC will not be able to clean up the task - as it'll be blocked on `anext(iterator)` and not be able to check the - channel status to stop iterating. 
This helper mitigates that by waiting - on the queue with a timeout and checking the RPC state before yielding. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order -of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. + This generator takes requests off a given queue and yields them to gRPC. - Example:: + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. - requests = _AsyncRequestQueueGenerator(q) - call = await stub.StreamingRequest(requests) - requests.call = call + The reason this is necessary is because gRPC takes an async iterator as the + request for request-streaming RPCs. gRPC consumes this iterator to allow + it to block while generating requests for the stream. However, if the + generator blocks indefinitely gRPC will not be able to clean up the task + as it'll be blocked on `anext(iterator)` and not be able to check the + channel status to stop iterating. This helper mitigates that by waiting + on the queue with a timeout and checking the RPC state before yielding. - async for response in call: - print(response) - await q.put(...) + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order + of messages is not guaranteed. If such a thing is necessary it would be + easy to use a priority queue. - Args: - queue (asyncio.Queue): The request queue. - period (float): The number of seconds to wait for items from the queue - before checking if the RPC is cancelled. In practice, this - determines the maximum amount of time the request consumption - task will live after the RPC is cancelled. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to - yield. This is done independently of the request queue to allow for - easily restarting streams that require some initial configuration - request. + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + period (float): The number of seconds to wait for items from the queue + before checking if the RPC is cancelled. In practice, this + determines the maximum amount of time the request consumption + task will live after the RPC is cancelled. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. """ def __init__(self, queue: asyncio.Queue, period: float = 1, initial_request=None): @@ -122,7 +122,6 @@ async def __aiter__(self): yield item - class AsyncBidiRpc: """A helper for consuming a async bi-directional streaming RPC. 
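The socket-like send()/recv() split above is what lets callers pump requests and
responses from independent tasks. A minimal sketch of that usage, where
``stub.StreamingRpc`` and ``make_request`` are placeholders for a real async
gRPC stub method and a request-message factory (not part of this change)::

    import asyncio

    from google.api_core.bidi_async import AsyncBidiRpc

    async def run(stub, make_request):
        rpc = AsyncBidiRpc(
            stub.StreamingRpc,
            initial_request=make_request(setting="example"),
            metadata=[("name", "value")],
        )
        await rpc.open()

        async def producer():
            for i in range(3):
                # send() only enqueues; _AsyncRequestQueueGenerator feeds gRPC.
                await rpc.send(make_request(data=str(i)))

        async def consumer():
            while rpc.is_active:
                print(await rpc.recv())

        try:
            await asyncio.gather(producer(), consumer())
        finally:
            await rpc.close()
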
diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 3da7a1ee..6423b749 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -22,13 +22,11 @@ import pytest try: - import grpc from grpc import aio except ImportError: # pragma: NO COVER pytest.skip("No GRPC", allow_module_level=True) from google.api_core import bidi_async -from google.api_core import exceptions async def consume_async_generator(gen): From 58351785901def5d388e9f3ac8144b454a76454b Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 27 Aug 2025 08:12:23 +0000 Subject: [PATCH 03/28] Add bidiBase, refactor bidiRpc and add AsyncBidi --- google/api_core/bidi.py | 40 ++------------ google/api_core/bidi_async.py | 44 +++------------ google/api_core/bidi_base.py | 95 ++++++++++++++++++++++++++++++++ tests/asyncio/test_bidi_async.py | 25 ++++++--- 4 files changed, 125 insertions(+), 79 deletions(-) create mode 100644 google/api_core/bidi_base.py diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index bed4c70e..31930551 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -22,6 +22,7 @@ import time from google.api_core import exceptions +from google.api_core.bidi_base import BidiRpcBase _LOGGER = logging.getLogger(__name__) _BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" @@ -201,7 +202,7 @@ def __repr__(self): ) -class BidiRpc(object): +class BidiRpc(BidiRpcBase): """A helper for consuming a bi-directional streaming RPC. This maps gRPC's built-in interface which uses a request iterator and a @@ -240,35 +241,9 @@ class BidiRpc(object): the request. """ - def __init__(self, start_rpc, initial_request=None, metadata=None): - self._start_rpc = start_rpc - self._initial_request = initial_request - self._rpc_metadata = metadata - self._request_queue = queue_module.Queue() - self._request_generator = None - self._is_active = False - self._callbacks = [] - self.call = None - - def add_done_callback(self, callback): - """Adds a callback that will be called when the RPC terminates. - - This occurs when the RPC errors or is successfully terminated. - - Args: - callback (Callable[[grpc.Future], None]): The callback to execute. - It will be provided with the same gRPC future as the underlying - stream which will also be a :class:`grpc.Call`. - """ - self._callbacks.append(callback) - - def _on_call_done(self, future): - # This occurs when the RPC errors or is successfully terminated. - # Note that grpc's "future" here can also be a grpc.RpcError. - # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 - # that `grpc.RpcError` is also `grpc.call`. 
- for callback in self._callbacks: - callback(future) + def _create_queue(self): + """Create a queue for requests.""" + return queue_module.Queue() def open(self): """Opens the stream.""" @@ -352,11 +327,6 @@ def is_active(self): """bool: True if this stream is currently open and active.""" return self.call is not None and self.call.is_active() - @property - def pending_requests(self): - """int: Returns an estimate of the number of queued requests.""" - return self._request_queue.qsize() - def _never_terminate(future_or_error): """By default, no errors cause BiDi termination.""" diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 6c74a09c..c072f563 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -1,4 +1,4 @@ -# Copyright 2020, Google LLC +# Copyright 2024, Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import logging from google.api_core import exceptions +from google.api_core.bidi_base import BidiRpcBase _LOGGER = logging.getLogger(__name__) @@ -69,7 +70,7 @@ class _AsyncRequestQueueGenerator: request. """ - def __init__(self, queue: asyncio.Queue, period: float = 1, initial_request=None): + def __init__(self, queue: asyncio.Queue, period: float = 0.1, initial_request=None): self._queue = queue self._period = period self._initial_request = initial_request @@ -122,7 +123,7 @@ async def __aiter__(self): yield item -class AsyncBidiRpc: +class AsyncBidiRpc(BidiRpcBase): """A helper for consuming a async bi-directional streaming RPC. This maps gRPC's built-in interface which uses a request iterator and a @@ -161,35 +162,9 @@ class AsyncBidiRpc: the request. """ - def __init__(self, start_rpc, initial_request=None, metadata=None): - self._start_rpc = start_rpc - self._initial_request = initial_request - self._rpc_metadata = metadata - self._request_queue = asyncio.Queue() - self._request_generator = None - self._callbacks = [] - self.call = None - self._loop = asyncio.get_event_loop() - - def add_done_callback(self, callback): - """Adds a callback that will be called when the RPC terminates. - - This occurs when the RPC errors or is successfully terminated. - - Args: - callback (Callable[[grpc.Future], None]): The callback to execute. - It will be provided with the same gRPC future as the underlying - stream which will also be a :class:`grpc.aio.Call`. - """ - self._callbacks.append(callback) - - def _on_call_done(self, future): - # This occurs when the RPC errors or is successfully terminated. - # Note that grpc's "future" here can also be a grpc.RpcError. - # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 - # that `grpc.RpcError` is also `grpc.aio.Call`. 
- for callback in self._callbacks: - callback(future) + def _create_queue(self): + """Create a queue for requests.""" + return asyncio.Queue() async def open(self): """Opens the stream.""" @@ -268,8 +243,3 @@ async def recv(self): def is_active(self): """bool: True if this stream is currently open and active.""" return self.call is not None and not self.call.done() - - @property - def pending_requests(self): - """int: Returns an estimate of the number of queued requests.""" - return self._request_queue.qsize() diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py new file mode 100644 index 00000000..809ca1f5 --- /dev/null +++ b/google/api_core/bidi_base.py @@ -0,0 +1,95 @@ +# Copyright 2024, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may obtain a copy of the License at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for Bi-directional streaming RPC helpers.""" + + +class BidiRpcBase: + """A base helper for consuming a bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + This does *not* retry the stream on errors. + + Args: + start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = self._create_queue() + self._request_generator = None + self._callbacks = [] + self.call = None + + def _create_queue(self): + """Create a queue for requests.""" + raise NotImplementedError("Not implemented in base class") + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Callable[[grpc.Future], None]): The callback to execute. + It will be provided with the same gRPC future as the underlying + stream which will also be a :class:`grpc.aio.Call`. + """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.aio.Call`. 
+ for callback in self._callbacks: + callback(future) + + def open(self): + """Opens the stream.""" + raise NotImplementedError("Not implemented in base class") + + def close(self): + """Closes the stream.""" + raise NotImplementedError("Not implemented in base class") + + def send(self, request): + """Queue a message to be sent on the stream.""" + raise NotImplementedError("Not implemented in base class") + + def recv(self): + """Wait for a message to be returned from the stream.""" + raise NotImplementedError("Not implemented in base class") + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + raise NotImplementedError("Not implemented in base class") + + @property + def pending_requests(self): + """int: Returns an estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 6423b749..9ed4c44f 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -14,10 +14,12 @@ import asyncio +from unittest import mock + try: - from unittest import mock + from unittest.mock import AsyncMock except ImportError: # pragma: NO COVER - import mock # type: ignore + from asyncmock import AsyncMock import pytest @@ -138,9 +140,9 @@ async def test_exit_with_stop(self): def make_async_rpc(): """Makes a mock async RPC used to test Bidi classes.""" call = mock.create_autospec(aio.StreamStreamCall, instance=True) - rpc = mock.AsyncMock(spec=aio.StreamStreamMultiCallable) + rpc = AsyncMock() - async def rpc_side_effect(request, metadata=None): + def rpc_side_effect(request, metadata=None): call.done.return_value = False return call @@ -151,7 +153,7 @@ def cancel_side_effect(): return True call.cancel.side_effect = cancel_side_effect - call.read = mock.AsyncMock() + call.read = AsyncMock() return rpc, call @@ -167,7 +169,6 @@ async def read(self): raise self.exception -@pytest.mark.asyncio class TestAsyncBidiRpc: def test_initial_state(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) @@ -182,6 +183,7 @@ def test_done_callbacks(self): callback.assert_called_once_with(mock.sentinel.future) + @pytest.mark.asyncio async def test_metadata(self): rpc, call = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A) @@ -192,6 +194,7 @@ async def test_metadata(self): rpc.assert_awaited_once() assert rpc.call_args.kwargs["metadata"] == mock.sentinel.A + @pytest.mark.asyncio async def test_open(self): rpc, call = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc) @@ -202,6 +205,7 @@ async def test_open(self): assert bidi_rpc.is_active call.add_done_callback.assert_called_once_with(bidi_rpc._on_call_done) + @pytest.mark.asyncio async def test_open_error_already_open(self): rpc, _ = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc) @@ -211,6 +215,7 @@ async def test_open_error_already_open(self): with pytest.raises(ValueError): await bidi_rpc.open() + @pytest.mark.asyncio async def test_close(self): rpc, call = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc) @@ -228,10 +233,12 @@ async def test_close(self): assert bidi_rpc._initial_request is None assert not bidi_rpc._callbacks + @pytest.mark.asyncio async def test_close_no_rpc(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) await bidi_rpc.close() + @pytest.mark.asyncio async def test_send(self): rpc, call = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc) @@ -242,12 +249,14 @@ async def test_send(self): assert bidi_rpc.pending_requests 
== 1 assert await bidi_rpc._request_queue.get() is mock.sentinel.request + @pytest.mark.asyncio async def test_send_not_open(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) with pytest.raises(ValueError): await bidi_rpc.send(mock.sentinel.request) + @pytest.mark.asyncio async def test_send_dead_rpc(self): error = ValueError() bidi_rpc = bidi_async.AsyncBidiRpc(None) @@ -258,15 +267,17 @@ async def test_send_dead_rpc(self): assert exc_info.value == error + @pytest.mark.asyncio async def test_recv(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) bidi_rpc.call = mock.create_autospec(aio.Call, instance=True) - bidi_rpc.call.read = mock.AsyncMock(return_value=mock.sentinel.response) + bidi_rpc.call.read = AsyncMock(return_value=mock.sentinel.response) response = await bidi_rpc.recv() assert response == mock.sentinel.response + @pytest.mark.asyncio async def test_recv_not_open(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) From df8fba573cdc689a4f15f73b1796fd15ae53be3e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 29 Aug 2025 08:14:13 +0000 Subject: [PATCH 04/28] remove changes from noxfile --- noxfile.py | 55 +++++++++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/noxfile.py b/noxfile.py index a96f1e91..ac21330e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -180,36 +180,37 @@ def default(session, install_grpc=True, prerelease=False, install_async_rest=Fal session.run("python", "-c", "import grpc; print(grpc.__version__)") session.run("python", "-c", "import google.auth; print(google.auth.__version__)") - pytest_command = ["python", "-m", "pytest"] - - if session.posargs: - pytest_options = list(session.posargs) - else: - pytest_options = [ - "--quiet", - "--cov=google.api_core", - "--cov=tests.unit", - "--cov-append", - "--cov-config=.coveragerc", - "--cov-report=", - "--cov-fail-under=0", - "-n=auto", - os.path.join("tests", "unit"), - "--cov=tests.asyncio", - os.path.join("tests", "asyncio"), - ] - - # Temporarily disable coverage for Python 3.12 due to a missing - # _sqlite3 module in some environments. This is a workaround. The proper - # fix is to reinstall Python 3.12 with SQLite support. - # We only do this when not running specific tests (no posargs). - if session.python == "3.12" and not session.posargs: - pytest_options = [opt for opt in pytest_options if not opt.startswith("--cov")] - pytest_command.extend(["-p", "no:cov"]) + pytest_args = [ + "python", + "-m", + "pytest", + *( + # Helpful for running a single test or testfile. + session.posargs + or [ + "--quiet", + "--cov=google.api_core", + "--cov=tests.unit", + "--cov-append", + "--cov-config=.coveragerc", + "--cov-report=", + "--cov-fail-under=0", + # Running individual tests with parallelism enabled is usually not helpful. + "-n=auto", + os.path.join("tests", "unit"), + ] + ), + ] session.install("asyncmock", "pytest-asyncio") - session.run(*pytest_command, *pytest_options) + # Having positional arguments means the user wants to run specific tests. + # Best not to add additional tests to that list. 
+ if not session.posargs: + pytest_args.append("--cov=tests.asyncio") + pytest_args.append(os.path.join("tests", "asyncio")) + + session.run(*pytest_args) @nox.session(python=PYTHON_VERSIONS) From 19cf2737019e5392340e3b6056c01c957c2a5ff6 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 12:05:59 +0000 Subject: [PATCH 05/28] use queue.get() instead of queue.get_nowait() --- google/api_core/bidi.py | 2 +- google/api_core/bidi_async.py | 20 ++----------- google/api_core/bidi_base.py | 26 ++++++++--------- tests/asyncio/test_bidi_async.py | 50 ++++++++++++++------------------ 4 files changed, 38 insertions(+), 60 deletions(-) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index 31930551..12a55435 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -297,7 +297,7 @@ def send(self, request): request (protobuf.Message): The request to send. """ if self.call is None: - raise ValueError("Can not send() on an RPC that has never been open()ed.") + raise ValueError("Can not send() on an RPC that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index c072f563..cd5ecf36 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -59,10 +59,6 @@ class _AsyncRequestQueueGenerator: Args: queue (asyncio.Queue): The request queue. - period (float): The number of seconds to wait for items from the queue - before checking if the RPC is cancelled. In practice, this - determines the maximum amount of time the request consumption - task will live after the RPC is cancelled. initial_request (Union[protobuf.Message, Callable[[], protobuf.Message]]): The initial request to yield. This is done independently of the request queue to allow for @@ -70,9 +66,8 @@ class _AsyncRequestQueueGenerator: request. """ - def __init__(self, queue: asyncio.Queue, period: float = 0.1, initial_request=None): + def __init__(self, queue: asyncio.Queue, initial_request=None): self._queue = queue - self._period = period self._initial_request = initial_request self.call = None @@ -91,18 +86,7 @@ async def __aiter__(self): yield self._initial_request while True: - try: - item = self._queue.get_nowait() - except asyncio.QueueEmpty: - if not self._is_active(): - _LOGGER.debug( - "Empty queue and inactive call, exiting request generator." - ) - return - else: - # call is still active, keep waiting for queue items. - await asyncio.sleep(self._period) - continue + item = await self._queue.get() # The consumer explicitly sent "None", indicating that the request # should end. 
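With the polling loop gone, the generator parks on ``await self._queue.get()``
and is only woken by a queued request or by the explicit ``None`` sentinel that
``close()`` enqueues. A stand-alone illustration of that sentinel pattern
(plain asyncio, not library code)::

    import asyncio

    async def consume(queue):
        sent = []
        while True:
            item = await queue.get()   # blocks until an item or the sentinel
            if item is None:           # sentinel: the stream is being closed
                return sent
            sent.append(item)

    async def main():
        q = asyncio.Queue()
        task = asyncio.create_task(consume(q))
        await q.put("request-1")
        await q.put(None)              # mirrors what AsyncBidiRpc.close() does
        print(await task)              # ['request-1']

    asyncio.run(main())
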
diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py index 809ca1f5..4e637475 100644 --- a/google/api_core/bidi_base.py +++ b/google/api_core/bidi_base.py @@ -1,4 +1,4 @@ -# Copyright 2024, Google LLC +# Copyright 2025, Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # You may obtain a copy of the License at @@ -68,21 +68,21 @@ def _on_call_done(self, future): for callback in self._callbacks: callback(future) - def open(self): - """Opens the stream.""" - raise NotImplementedError("Not implemented in base class") + # def open(self): + # """Opens the stream.""" + # raise NotImplementedError("Not implemented in base class") - def close(self): - """Closes the stream.""" - raise NotImplementedError("Not implemented in base class") + # def close(self): + # """Closes the stream.""" + # raise NotImplementedError("Not implemented in base class") - def send(self, request): - """Queue a message to be sent on the stream.""" - raise NotImplementedError("Not implemented in base class") + # def send(self, request): + # """Queue a message to be sent on the stream.""" + # raise NotImplementedError("Not implemented in base class") - def recv(self): - """Wait for a message to be returned from the stream.""" - raise NotImplementedError("Not implemented in base class") + # def recv(self): + # """Wait for a message to be returned from the stream.""" + # raise NotImplementedError("Not implemented in base class") @property def is_active(self): diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 9ed4c44f..bbb76c63 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -1,4 +1,4 @@ -# Copyright 2020, Google LLC All rights reserved. +# Copyright 2025, Google LLC All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,13 +31,6 @@ from google.api_core import bidi_async -async def consume_async_generator(gen): - items = [] - async for item in gen: - items.append(item) - return items - - @pytest.mark.asyncio class Test_AsyncRequestQueueGenerator: async def test_bounded_consume(self): @@ -48,21 +41,22 @@ async def test_bounded_consume(self): await q.put(mock.sentinel.A) await q.put(mock.sentinel.B) - generator = bidi_async._AsyncRequestQueueGenerator(q, period=0.01) + generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call items = [] - gen_aiter = generator.__aiter__() + gen_aiter = aiter(generator) - items.append(await gen_aiter.__anext__()) - items.append(await gen_aiter.__anext__()) + items.append(await anext(gen_aiter)) + items.append(await anext(gen_aiter)) # At this point, the queue is empty. The next call to anext will sleep. # We make the call inactive. 
call.done.return_value = True - with pytest.raises(StopAsyncIteration): - await gen_aiter.__anext__() + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(1): + await anext(gen_aiter) assert items == [mock.sentinel.A, mock.sentinel.B] @@ -76,9 +70,7 @@ async def test_yield_initial_and_exit(self): ) generator.call = call - items = await consume_async_generator(generator) - - assert items == [mock.sentinel.A] + assert await anext(aiter(generator)) == mock.sentinel.A async def test_yield_initial_callable_and_exit(self): q = asyncio.Queue() @@ -90,9 +82,7 @@ async def test_yield_initial_callable_and_exit(self): ) generator.call = call - items = await consume_async_generator(generator) - - assert items == [mock.sentinel.A] + assert await anext(aiter(generator)) == mock.sentinel.A async def test_exit_when_inactive_with_item(self): q = asyncio.Queue() @@ -104,9 +94,13 @@ async def test_exit_when_inactive_with_item(self): generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call - items = await consume_async_generator(generator) + with pytest.raises(StopAsyncIteration) as exc_info: + assert await anext(aiter(generator)) + assert ( + exc_info.value.args[0] + == "Inactive call, replacing item on queue and exiting request generator." + ) - assert items == [] # Make sure it put the item back. assert not q.empty() assert await q.get() == mock.sentinel.A @@ -119,9 +113,9 @@ async def test_exit_when_inactive_empty(self): generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call - items = await consume_async_generator(generator) - - assert items == [] + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(1): + await anext(aiter(generator)) async def test_exit_with_stop(self): q = asyncio.Queue() @@ -132,9 +126,9 @@ async def test_exit_with_stop(self): generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call - items = await consume_async_generator(generator) - - assert items == [] + with pytest.raises(StopAsyncIteration) as exc_info: + assert await anext(aiter(generator)) + assert exc_info.value.args[0] == "Cleanly exiting request generator." 
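The TimeoutError assertions above are one way to check that an await now parks
forever once the queue is empty: asyncio.wait_for cancels the pending anext()
after the timeout instead of letting the test hang. In isolation::

    import asyncio

    async def never():
        await asyncio.Event().wait()   # parks forever

    async def main():
        try:
            await asyncio.wait_for(never(), timeout=0.1)
        except asyncio.TimeoutError:
            print("still pending, as expected")

    asyncio.run(main())
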
def make_async_rpc(): From 5b6d28784087a0f92af5b0233acc2a6f518c5963 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 15:25:15 +0000 Subject: [PATCH 06/28] use asyncio.wait_for instead of asyncio.timeout python versions <= 3.10 dont support `async with asyncio.timeout` --- tests/asyncio/test_bidi_async.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index bbb76c63..18d83082 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -55,8 +55,7 @@ async def test_bounded_consume(self): call.done.return_value = True with pytest.raises(asyncio.TimeoutError): - async with asyncio.timeout(1): - await anext(gen_aiter) + await asyncio.wait_for(anext(gen_aiter), timeout=1) assert items == [mock.sentinel.A, mock.sentinel.B] @@ -114,8 +113,7 @@ async def test_exit_when_inactive_empty(self): generator.call = call with pytest.raises(asyncio.TimeoutError): - async with asyncio.timeout(1): - await anext(aiter(generator)) + await asyncio.wait_for(anext(aiter(generator)), timeout=1) async def test_exit_with_stop(self): q = asyncio.Queue() From 2599f9632a957b574e5b3f18e9976d4f04bc2246 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 15:31:28 +0000 Subject: [PATCH 07/28] aiter() and anext() are not defined in python<=3.9 provide support for the same. --- tests/asyncio/test_bidi_async.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 18d83082..e90b972b 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -30,6 +30,21 @@ from google.api_core import bidi_async +try: + aiter +except NameError: + + def aiter(obj): + return obj.__aiter__() + + +try: + anext +except NameError: + + async def anext(obj): + return await obj.__anext__() + @pytest.mark.asyncio class Test_AsyncRequestQueueGenerator: From 9c1a62186f2d50c7ef66c82cb588955e250fe07c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 15:54:51 +0000 Subject: [PATCH 08/28] remove support for python 3.7 and below --- tests/asyncio/test_bidi_async.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index e90b972b..0f21540e 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import sys import asyncio from unittest import mock @@ -191,6 +192,10 @@ def test_done_callbacks(self): callback.assert_called_once_with(mock.sentinel.future) @pytest.mark.asyncio + @pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Python 3.8 below doesnt provide support for assert_awaited_once", + ) async def test_metadata(self): rpc, call = make_async_rpc() bidi_rpc = bidi_async.AsyncBidiRpc(rpc, metadata=mock.sentinel.A) From cc19089454036378aa267283861bab6af2303e58 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 16:11:08 +0000 Subject: [PATCH 09/28] fix mypy errors --- tests/asyncio/test_bidi_async.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 0f21540e..bb40ed03 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -17,10 +17,7 @@ from unittest import mock -try: - from unittest.mock import AsyncMock -except ImportError: # pragma: NO COVER - from asyncmock import AsyncMock +from unittest.mock import AsyncMock import pytest @@ -31,18 +28,12 @@ from google.api_core import bidi_async -try: - aiter -except NameError: + +if sys.version < (3, 10): # type: ignore[operator] def aiter(obj): return obj.__aiter__() - -try: - anext -except NameError: - async def anext(obj): return await obj.__anext__() @@ -193,7 +184,7 @@ def test_done_callbacks(self): @pytest.mark.asyncio @pytest.mark.skipif( - sys.version_info < (3, 8), + sys.version_info < (3, 8), # type: ignore[operator] reason="Python 3.8 below doesnt provide support for assert_awaited_once", ) async def test_metadata(self): From a86a9b11cc8afece58e6346c1403fa476a83ab7f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 16:15:40 +0000 Subject: [PATCH 10/28] fix typo, version -> version_info --- tests/asyncio/test_bidi_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index bb40ed03..117b393a 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -29,7 +29,7 @@ from google.api_core import bidi_async -if sys.version < (3, 10): # type: ignore[operator] +if sys.version_info < (3, 10): # type: ignore[operator] def aiter(obj): return obj.__aiter__() From 5464dc5a623022bf5e112c73437495345e711547 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 16:20:01 +0000 Subject: [PATCH 11/28] AsyncMock not present in python3.7 --- tests/asyncio/test_bidi_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 117b393a..b87fa913 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -17,7 +17,11 @@ from unittest import mock -from unittest.mock import AsyncMock +try: + from unittest.mock import AsyncMock +except ImportError: # pragma: NO COVER + from mock import AsyncMock # type: ignore + import pytest From e96a4092e266889815c0667a1a5e604f4cbc4e5c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Sat, 30 Aug 2025 17:07:44 +0000 Subject: [PATCH 12/28] update doc strings --- google/api_core/bidi_async.py | 66 +++++++++++++++++------------------ google/api_core/bidi_base.py | 25 +++---------- 2 files changed, 38 insertions(+), 53 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index cd5ecf36..4287e66d 100644 --- a/google/api_core/bidi_async.py +++ 
b/google/api_core/bidi_async.py @@ -26,44 +26,44 @@ class _AsyncRequestQueueGenerator: """An async helper for sending requests to a gRPC stream from a Queue. - This generator takes requests off a given queue and yields them to gRPC. - - This helper is useful when you have an indeterminate, indefinite, or - otherwise open-ended set of requests to send through a request-streaming - (or bidirectional) RPC. - - The reason this is necessary is because gRPC takes an async iterator as the - request for request-streaming RPCs. gRPC consumes this iterator to allow - it to block while generating requests for the stream. However, if the - generator blocks indefinitely gRPC will not be able to clean up the task - as it'll be blocked on `anext(iterator)` and not be able to check the - channel status to stop iterating. This helper mitigates that by waiting - on the queue with a timeout and checking the RPC state before yielding. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order + This generator takes requests off a given queue and yields them to gRPC. + + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. + + The reason this is necessary + + is because it's let's user have control on the when they would want to + send requests proto messages instead of sending all of them initilally. + + This is achieved via asynchronous queue (asyncio.Queue), + gRPC awaits until there's a message in the queue. + + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. + easy to use a priority queue. - Example:: + Example:: - requests = _AsyncRequestQueueGenerator(q) - call = await stub.StreamingRequest(requests) - requests.call = call + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call - async for response in call: - print(response) - await q.put(...) + async for response in call: + print(response) + await q.put(...) - Args: - queue (asyncio.Queue): The request queue. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to - yield. This is done independently of the request queue to allow for - easily restarting streams that require some initial configuration - request. + Args: + queue (asyncio.Queue): The request queue. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. 
""" def __init__(self, queue: asyncio.Queue, initial_request=None): diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py index 4e637475..8d9ec84b 100644 --- a/google/api_core/bidi_base.py +++ b/google/api_core/bidi_base.py @@ -10,11 +10,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Base class for Bi-directional streaming RPC helpers.""" +"""Base class for bi-directional streaming RPC helpers.""" class BidiRpcBase: - """A base helper for consuming a bi-directional streaming RPC. + """A base class for consuming a bi-directional streaming RPC. This maps gRPC's built-in interface which uses a request iterator and a response iterator into a socket-like :func:`send` and :func:`recv`. This @@ -25,8 +25,9 @@ class BidiRpcBase: This does *not* retry the stream on errors. Args: - start_rpc (grpc.StreamStreamMultiCallable): The gRPC method used to - start the RPC. + start_rpc (Union[grpc.StreamStreamMultiCallable, + grpc.aio.StreamStreamMultiCallable]): The gRPC method used + to start the RPC. initial_request (Union[protobuf.Message, Callable[[], protobuf.Message]]): The initial request to yield. This is useful if an initial request is needed to start the @@ -68,22 +69,6 @@ def _on_call_done(self, future): for callback in self._callbacks: callback(future) - # def open(self): - # """Opens the stream.""" - # raise NotImplementedError("Not implemented in base class") - - # def close(self): - # """Closes the stream.""" - # raise NotImplementedError("Not implemented in base class") - - # def send(self, request): - # """Queue a message to be sent on the stream.""" - # raise NotImplementedError("Not implemented in base class") - - # def recv(self): - # """Wait for a message to be returned from the stream.""" - # raise NotImplementedError("Not implemented in base class") - @property def is_active(self): """bool: True if this stream is currently open and active.""" From 52686b8f3899decb17ae0ec5e189c9657271187c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 1 Sep 2025 11:39:58 +0000 Subject: [PATCH 13/28] use pytest.approx to round of floating point errors. 
--- tests/unit/gapic/test_method.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/gapic/test_method.py b/tests/unit/gapic/test_method.py index 8896429c..32c28f73 100644 --- a/tests/unit/gapic/test_method.py +++ b/tests/unit/gapic/test_method.py @@ -201,7 +201,10 @@ def test_wrap_method_with_overriding_timeout_as_a_number(): result = wrapped_method(timeout=22) assert result == 42 - method.assert_called_once_with(timeout=22, metadata=mock.ANY) + actual_timeout = method.call_args[1]["timeout"] + metadata = method.call_args[1]["metadata"] + assert metadata == mock.ANY + assert actual_timeout == pytest.approx(22) def test_wrap_method_with_call(): From d01421be191d023a19e4740303a95934845e8f8c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 1 Sep 2025 12:25:46 +0000 Subject: [PATCH 14/28] add tolerance --- tests/unit/gapic/test_method.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/gapic/test_method.py b/tests/unit/gapic/test_method.py index 32c28f73..7111539d 100644 --- a/tests/unit/gapic/test_method.py +++ b/tests/unit/gapic/test_method.py @@ -204,7 +204,7 @@ def test_wrap_method_with_overriding_timeout_as_a_number(): actual_timeout = method.call_args[1]["timeout"] metadata = method.call_args[1]["metadata"] assert metadata == mock.ANY - assert actual_timeout == pytest.approx(22) + assert actual_timeout == pytest.approx(22, abs=0.01) def test_wrap_method_with_call(): From 98eae06758120e7e7cbdeab0a15fc32fd984cdb4 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 1 Sep 2025 13:26:29 +0000 Subject: [PATCH 15/28] add test_open_error_call_error unit test --- tests/asyncio/test_bidi_async.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index b87fa913..9625c806 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -31,6 +31,7 @@ pytest.skip("No GRPC", allow_module_level=True) from google.api_core import bidi_async +from google.api_core import exceptions if sys.version_info < (3, 10): # type: ignore[operator] @@ -222,6 +223,23 @@ async def test_open_error_already_open(self): with pytest.raises(ValueError): await bidi_rpc.open() + @pytest.mark.asyncio + async def test_open_error_call_error(self): + rpc, _ = make_async_rpc() + expected_exception = exceptions.GoogleAPICallError( + "test", response=mock.sentinel.response + ) + rpc.side_effect = expected_exception + bidi_rpc = bidi_async.AsyncBidiRpc(rpc) + callback = mock.Mock(spec=["__call__"]) + bidi_rpc.add_done_callback(callback) + + with pytest.raises(exceptions.GoogleAPICallError) as exc_info: + await bidi_rpc.open() + + assert exc_info.value == expected_exception + callback.assert_called_once_with(mock.sentinel.response) + @pytest.mark.asyncio async def test_close(self): rpc, call = make_async_rpc() From 5a7550f507a1aa9234a0f419fb0de2b77f9b228e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 8 Sep 2025 19:18:51 +0000 Subject: [PATCH 16/28] minor changes based on PR comments --- google/api_core/bidi_async.py | 15 +++++++-------- google/api_core/bidi_base.py | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 4287e66d..3aa0d31c 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -1,4 +1,4 @@ -# Copyright 2024, Google LLC +# Copyright 2025, Google LLC # # Licensed under the Apache License, Version 2.0 (the 
"License"); # you may not use this file except in compliance with the License. @@ -24,9 +24,9 @@ class _AsyncRequestQueueGenerator: - """An async helper for sending requests to a gRPC stream from a Queue. + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous requests to a gRPC stream from a Queue. - This generator takes requests off a given queue and yields them to gRPC. + This generator takes asynchronous requests off a given queue and yields them to gRPC. This helper is useful when you have an indeterminate, indefinite, or otherwise open-ended set of requests to send through a request-streaming @@ -72,10 +72,9 @@ def __init__(self, queue: asyncio.Queue, initial_request=None): self.call = None def _is_active(self): - # Note: there is a possibility that this starts *before* the call - # property is set. So we have to check if self.call is set before - # seeing if it's active. We need to return True if self.call is None. - # See https://github.com/googleapis/python-api-core/issues/560. + """ + Returns true if the call is not set or not completed. + """ return self.call is None or not self.call.done() async def __aiter__(self): @@ -200,7 +199,7 @@ async def send(self, request): request (protobuf.Message): The request to send. """ if self.call is None: - raise ValueError("Can not send() on an RPC that has never been open()ed.") + raise ValueError("Can not send() on an RPC that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py index 8d9ec84b..195e3575 100644 --- a/google/api_core/bidi_base.py +++ b/google/api_core/bidi_base.py @@ -47,7 +47,7 @@ def __init__(self, start_rpc, initial_request=None, metadata=None): def _create_queue(self): """Create a queue for requests.""" - raise NotImplementedError("Not implemented in base class") + raise NotImplementedError("`_create_queue` is not implemented.") def add_done_callback(self, callback): """Adds a callback that will be called when the RPC terminates. @@ -72,7 +72,7 @@ def _on_call_done(self, future): @property def is_active(self): """bool: True if this stream is currently open and active.""" - raise NotImplementedError("Not implemented in base class") + raise NotImplementedError("`is_active` is not implemented.") @property def pending_requests(self): From c486c49eb8c5310a68630bf375fa811c550c1bcc Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 8 Sep 2025 19:22:17 +0000 Subject: [PATCH 17/28] formatting fixes --- google/api_core/bidi_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 3aa0d31c..f88a3e48 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -24,9 +24,11 @@ class _AsyncRequestQueueGenerator: - """_AsyncRequestQueueGenerator is a helper class for sending asynchronous requests to a gRPC stream from a Queue. + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous + requests to a gRPC stream from a Queue. - This generator takes asynchronous requests off a given queue and yields them to gRPC. + This generator takes asynchronous requests off a given queue and yields them + to gRPC. 
This helper is useful when you have an indeterminate, indefinite, or otherwise open-ended set of requests to send through a request-streaming From 14342b14af92051a77f228e5f18f137a371fce26 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Mon, 8 Sep 2025 19:28:41 +0000 Subject: [PATCH 18/28] remove changes from test_method.py --- tests/unit/gapic/test_method.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/gapic/test_method.py b/tests/unit/gapic/test_method.py index 7111539d..c27de64e 100644 --- a/tests/unit/gapic/test_method.py +++ b/tests/unit/gapic/test_method.py @@ -201,6 +201,7 @@ def test_wrap_method_with_overriding_timeout_as_a_number(): result = wrapped_method(timeout=22) assert result == 42 + actual_timeout = method.call_args[1]["timeout"] metadata = method.call_args[1]["metadata"] assert metadata == mock.ANY From 6fabb1f347df0de6c3b520b147be82c2c8eabb20 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 17:01:44 +0000 Subject: [PATCH 19/28] fix typos in doc string and error msg --- google/api_core/bidi_async.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index f88a3e48..873e9301 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -34,10 +34,9 @@ class _AsyncRequestQueueGenerator: otherwise open-ended set of requests to send through a request-streaming (or bidirectional) RPC. - The reason this is necessary - - is because it's let's user have control on the when they would want to - send requests proto messages instead of sending all of them initilally. + The reason this is necessary is because it lets the user have control on + when they would want to send requests proto messages instead of sending all + of them initilally. This is achieved via asynchronous queue (asyncio.Queue), gRPC awaits until there's a message in the queue. @@ -220,7 +219,7 @@ async def recv(self): protobuf.Message: The received message. """ if self.call is None: - raise ValueError("Can not recv() on an RPC that has never been open()ed.") + raise ValueError("Can not recv() on an RPC that has never been opened.") return await self.call.read() From 2df8cc419753dc6f1baa84310f5e5ae796c0b73d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 17:19:04 +0000 Subject: [PATCH 20/28] minor fixes in comments & doc strings --- google/api_core/bidi_async.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 873e9301..9e0b8899 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -153,7 +153,7 @@ def _create_queue(self): async def open(self): """Opens the stream.""" if self.is_active: - raise ValueError("Can not open an already open stream.") + raise ValueError("Cannot open an already open stream.") request_generator = _AsyncRequestQueueGenerator( self._request_queue, initial_request=self._initial_request @@ -168,8 +168,6 @@ async def open(self): request_generator.call = call - # TODO: api_core should expose the future interface for wrapped - # callables as well. if hasattr(call, "_wrapped"): # pragma: NO COVER call._wrapped.add_done_callback(self._on_call_done) else: From b963922d564c3f658bd038c1b0ba4d8196ac6430 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 1 Oct 2025 13:45:29 +0000 Subject: [PATCH 21/28] address review comments. 
mostly fix doc-strings --- google/api_core/bidi.py | 57 +++++++++--------- google/api_core/bidi_async.py | 99 ++++++++++++++++++-------------- google/api_core/bidi_base.py | 4 +- tests/asyncio/test_bidi_async.py | 4 +- 4 files changed, 87 insertions(+), 77 deletions(-) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index 12a55435..c91884cb 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Bi-directional streaming RPC helpers.""" +"""Helpers for synchronous bidirectional streaming RPCs.""" import collections import datetime @@ -28,6 +28,28 @@ _BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" +# The reason this is necessary is because gRPC takes an iterator as the +# request for request-streaming RPCs. gRPC consumes this iterator in another +# thread to allow it to block while generating requests for the stream. +# However, if the generator blocks indefinitely gRPC will not be able to +# clean up the thread as it'll be blocked on `next(iterator)` and not be able +# to check the channel status to stop iterating. This helper mitigates that +# by waiting on the queue with a timeout and checking the RPC state before +# yielding. +# +# Finally, it allows for retrying without swapping queues because if it does +# pull an item off the queue when the RPC is inactive, it'll immediately put +# it back and then exit. This is necessary because yielding the item in this +# case will cause gRPC to discard it. In practice, this means that the order +# of messages is not guaranteed. If such a thing is necessary it would be +# easy to use a priority queue. +# +# Note that it is possible to accomplish this behavior without "spinning" +# (using a queue timeout). One possible way would be to use more threads to +# multiplex the grpc end event with the queue, another possible way is to +# use selectors and a custom event/queue object. Both of these approaches +# are significant from an engineering perspective for small benefit - the +# CPU consumed by spinning is pretty minuscule. class _RequestQueueGenerator(object): """A helper for sending requests to a gRPC stream from a Queue. @@ -37,21 +59,6 @@ class _RequestQueueGenerator(object): otherwise open-ended set of requests to send through a request-streaming (or bidirectional) RPC. - The reason this is necessary is because gRPC takes an iterator as the - request for request-streaming RPCs. gRPC consumes this iterator in another - thread to allow it to block while generating requests for the stream. - However, if the generator blocks indefinitely gRPC will not be able to - clean up the thread as it'll be blocked on `next(iterator)` and not be able - to check the channel status to stop iterating. This helper mitigates that - by waiting on the queue with a timeout and checking the RPC state before - yielding. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order - of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. Example:: @@ -63,12 +70,6 @@ class _RequestQueueGenerator(object): print(response) q.put(...) 
- Note that it is possible to accomplish this behavior without "spinning" - (using a queue timeout). One possible way would be to use more threads to - multiplex the grpc end event with the queue, another possible way is to - use selectors and a custom event/queue object. Both of these approaches - are significant from an engineering perspective for small benefit - the - CPU consumed by spinning is pretty minuscule. Args: queue (queue_module.Queue): The request queue. @@ -248,7 +249,7 @@ def _create_queue(self): def open(self): """Opens the stream.""" if self.is_active: - raise ValueError("Can not open an already open stream.") + raise ValueError("Cannot open an already open stream.") request_generator = _RequestQueueGenerator( self._request_queue, initial_request=self._initial_request @@ -297,7 +298,7 @@ def send(self, request): request (protobuf.Message): The request to send. """ if self.call is None: - raise ValueError("Can not send() on an RPC that has never been opened.") + raise ValueError("Cannot send on an RPC that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. @@ -318,13 +319,13 @@ def recv(self): protobuf.Message: The received message. """ if self.call is None: - raise ValueError("Can not recv() on an RPC that has never been open()ed.") + raise ValueError("Cannot recv on an RPC that has never been opened.") return next(self.call) @property def is_active(self): - """bool: True if this stream is currently open and active.""" + """True if this stream is currently open and active.""" return self.call is not None and self.call.is_active() @@ -514,7 +515,7 @@ def _send(self, request): call = self.call if call is None: - raise ValueError("Can not send() on an RPC that has never been open()ed.") + raise ValueError("Cannot send on an RPC that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. @@ -533,7 +534,7 @@ def _recv(self): call = self.call if call is None: - raise ValueError("Can not recv() on an RPC that has never been open()ed.") + raise ValueError("Cannot recv on an RPC that has never been opened.") return next(call) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 9e0b8899..674ea1e0 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -16,37 +16,42 @@ import asyncio import logging +from typing import Callable, Optional, Union + +from grpc import aio from google.api_core import exceptions from google.api_core.bidi_base import BidiRpcBase +from google.protobuf.message import Message as ProtobufMessage + + _LOGGER = logging.getLogger(__name__) +# The reason this is necessary is because it lets the user have control on +# when they would want to send requests proto messages instead of sending all +# of them initially. +# +# This is achieved via asynchronous queue (asyncio.Queue), +# gRPC awaits until there's a message in the queue. +# +# Finally, it allows for retrying without swapping queues because if it does +# pull an item off the queue when the RPC is inactive, it'll immediately put +# it back and then exit. This is necessary because yielding the item in this +# case will cause gRPC to discard it. In practice, this means that the order +# of messages is not guaranteed. If preserving order is necessary it would be +# easy to use a priority queue. 
class _AsyncRequestQueueGenerator: """_AsyncRequestQueueGenerator is a helper class for sending asynchronous requests to a gRPC stream from a Queue. - This generator takes asynchronous requests off a given queue and yields them - to gRPC. - - This helper is useful when you have an indeterminate, indefinite, or - otherwise open-ended set of requests to send through a request-streaming - (or bidirectional) RPC. + This generator takes asynchronous requests off a given `asyncio.Queue` and + yields them to gRPC. - The reason this is necessary is because it lets the user have control on - when they would want to send requests proto messages instead of sending all - of them initilally. - - This is achieved via asynchronous queue (asyncio.Queue), - gRPC awaits until there's a message in the queue. - - Finally, it allows for retrying without swapping queues because if it does - pull an item off the queue when the RPC is inactive, it'll immediately put - it back and then exit. This is necessary because yielding the item in this - case will cause gRPC to discard it. In practice, this means that the order - of messages is not guaranteed. If such a thing is necessary it would be - easy to use a priority queue. + It's useful when you have an indeterminate, indefinite, or otherwise + open-ended set of requests to send through a request-streaming (or + bidirectional) RPC. Example:: @@ -60,22 +65,30 @@ class _AsyncRequestQueueGenerator: Args: queue (asyncio.Queue): The request queue. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to + initial_request (Union[ProtobufMessage, + Callable[[], ProtobufMessage]]): The initial request to yield. This is done independently of the request queue to allow for easily restarting streams that require some initial configuration request. """ - def __init__(self, queue: asyncio.Queue, initial_request=None): + def __init__( + self, + queue: asyncio.Queue, + initial_request: Optional[ + Union[ProtobufMessage, Callable[[], ProtobufMessage]] + ] = None, + ) -> None: self._queue = queue self._initial_request = initial_request - self.call = None - - def _is_active(self): - """ - Returns true if the call is not set or not completed. - """ + self.call: Optional[aio.Call] = None + + def _is_active(self) -> bool: + """Returns true if the call is not set or not completed.""" + # Note: there is a possibility that this starts *before* the call + # property is set. So we have to check if self.call is set before + # seeing if it's active. We need to return True if self.call is None. + # See https://github.com/googleapis/python-api-core/issues/560. return self.call is None or not self.call.done() async def __aiter__(self): @@ -133,24 +146,24 @@ class AsyncBidiRpc(BidiRpcBase): await rpc.send(example_pb2.StreamingRpcRequest( data='example')) - This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. + This does *not* retry the stream on errors. Args: start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to start the RPC. - initial_request (Union[protobuf.Message, - Callable[[], protobuf.Message]]): The initial request to + initial_request (Union[ProtobufMessage, + Callable[[], ProtobufMessage]]): The initial request to yield. This is useful if an initial request is needed to start the stream. metadata (Sequence[Tuple(str, str)]): RPC metadata to include in the request. 
""" - def _create_queue(self): + def _create_queue(self) -> asyncio.Queue: """Create a queue for requests.""" return asyncio.Queue() - async def open(self): + async def open(self) -> None: """Opens the stream.""" if self.is_active: raise ValueError("Cannot open an already open stream.") @@ -176,7 +189,7 @@ async def open(self): self._request_generator = request_generator self.call = call - async def close(self): + async def close(self) -> None: """Closes the stream.""" if self.call is None: return @@ -189,39 +202,37 @@ async def close(self): # Don't set self.call to None. Keep it around so that send/recv can # raise the error. - async def send(self, request): + async def send(self, request: ProtobufMessage) -> None: """Queue a message to be sent on the stream. If the underlying RPC has been closed, this will raise. Args: - request (protobuf.Message): The request to send. + request (ProtobufMessage): The request to send. """ if self.call is None: - raise ValueError("Can not send() on an RPC that has never been opened.") + raise ValueError("Cannot send on an RPC that has never been opened.") - # Don't use self.is_active(), as ResumableBidiRpc will overload it - # to mean something semantically different. if not self.call.done(): await self._request_queue.put(request) else: # calling read should cause the call to raise. await self.call.read() - async def recv(self): + async def recv(self) -> ProtobufMessage: """Wait for a message to be returned from the stream. If the underlying RPC has been closed, this will raise. Returns: - protobuf.Message: The received message. + ProtobufMessage: The received message. """ if self.call is None: - raise ValueError("Can not recv() on an RPC that has never been opened.") + raise ValueError("Cannot recv on an RPC that has never been opened.") return await self.call.read() @property - def is_active(self): - """bool: True if this stream is currently open and active.""" + def is_active(self) -> bool: + """Whether the stream is currently open and active.""" return self.call is not None and not self.call.done() diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py index 195e3575..bae9a1c6 100644 --- a/google/api_core/bidi_base.py +++ b/google/api_core/bidi_base.py @@ -71,10 +71,10 @@ def _on_call_done(self, future): @property def is_active(self): - """bool: True if this stream is currently open and active.""" + """True if the gRPC call is not done yet.""" raise NotImplementedError("`is_active` is not implemented.") @property def pending_requests(self): - """int: Returns an estimate of the number of queued requests.""" + """Estimate of the number of queued requests.""" return self._request_queue.qsize() diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 9625c806..bd18fa6e 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -33,7 +33,7 @@ from google.api_core import bidi_async from google.api_core import exceptions - +# TODO: remove this when droppping support for "Python 3.10" and below. 
if sys.version_info < (3, 10): # type: ignore[operator] def aiter(obj): @@ -290,8 +290,6 @@ async def test_send_dead_rpc(self): with pytest.raises(ValueError) as exc_info: await bidi_rpc.send(mock.sentinel.request) - assert exc_info.value == error - @pytest.mark.asyncio async def test_recv(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) From 7475d99e44922774034daa2b6cb6ef49c37e663d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 1 Oct 2025 13:51:08 +0000 Subject: [PATCH 22/28] remove unused variable to fix lint --- tests/asyncio/test_bidi_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index bd18fa6e..b0547702 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -287,7 +287,7 @@ async def test_send_dead_rpc(self): bidi_rpc = bidi_async.AsyncBidiRpc(None) bidi_rpc.call = AsyncClosedCall(error) - with pytest.raises(ValueError) as exc_info: + with pytest.raises(ValueError): await bidi_rpc.send(mock.sentinel.request) @pytest.mark.asyncio From 8bf8396ab3739e34073e8aa19739e3f103b4219a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 8 Oct 2025 16:05:58 +0000 Subject: [PATCH 23/28] fix & imporove docstrings & comments. Reduce test timeout duration --- google/api_core/bidi.py | 47 +++++++++++++++++--------------- google/api_core/bidi_async.py | 4 +++ google/api_core/bidi_base.py | 16 ++++++++--- tests/asyncio/test_bidi_async.py | 17 +++++------- 4 files changed, 48 insertions(+), 36 deletions(-) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index c91884cb..7cbaf220 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -28,28 +28,6 @@ _BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" -# The reason this is necessary is because gRPC takes an iterator as the -# request for request-streaming RPCs. gRPC consumes this iterator in another -# thread to allow it to block while generating requests for the stream. -# However, if the generator blocks indefinitely gRPC will not be able to -# clean up the thread as it'll be blocked on `next(iterator)` and not be able -# to check the channel status to stop iterating. This helper mitigates that -# by waiting on the queue with a timeout and checking the RPC state before -# yielding. -# -# Finally, it allows for retrying without swapping queues because if it does -# pull an item off the queue when the RPC is inactive, it'll immediately put -# it back and then exit. This is necessary because yielding the item in this -# case will cause gRPC to discard it. In practice, this means that the order -# of messages is not guaranteed. If such a thing is necessary it would be -# easy to use a priority queue. -# -# Note that it is possible to accomplish this behavior without "spinning" -# (using a queue timeout). One possible way would be to use more threads to -# multiplex the grpc end event with the queue, another possible way is to -# use selectors and a custom event/queue object. Both of these approaches -# are significant from an engineering perspective for small benefit - the -# CPU consumed by spinning is pretty minuscule. class _RequestQueueGenerator(object): """A helper for sending requests to a gRPC stream from a Queue. @@ -98,6 +76,31 @@ def _is_active(self): return self.call is None or self.call.is_active() def __iter__(self): + # The reason this is necessary is because gRPC takes an iterator as the + # request for request-streaming RPCs. 
gRPC consumes this iterator in + # another thread to allow it to block while generating requests for + # the stream. However, if the generator blocks indefinitely gRPC will + # not be able to clean up the thread as it'll be blocked on + # `next(iterator)` and not be able to check the channel status to stop + # iterating. This helper mitigates that by waiting on the queue with + # a timeout and checking the RPC state before yielding. + # + # Finally, it allows for retrying without swapping queues because if + # it does pull an item off the queue when the RPC is inactive, it'll + # immediately put it back and then exit. This is necessary because + # yielding the item in this case will cause gRPC to discard it. In + # practice, this means that the order of messages is not guaranteed. + # If such a thing is necessary it would be easy to use a priority + # queue. + # + # Note that it is possible to accomplish this behavior without + # "spinning" (using a queue timeout). One possible way would be to use + # more threads to multiplex the grpc end event with the queue, another + # possible way is to use selectors and a custom event/queue object. + # Both of these approaches are significant from an engineering + # perspective for small benefit - the CPU consumed by spinning is + # pretty minuscule. + if self._initial_request is not None: if callable(self._initial_request): yield self._initial_request() diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 674ea1e0..c65b69f6 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -146,6 +146,8 @@ class AsyncBidiRpc(BidiRpcBase): await rpc.send(example_pb2.StreamingRpcRequest( data='example')) + await rpc.close() + This does *not* retry the stream on errors. Args: @@ -181,6 +183,8 @@ async def open(self) -> None: request_generator.call = call + # TODO: api_core should expose the future interface for wrapped + # callables as well. if hasattr(call, "_wrapped"): # pragma: NO COVER call._wrapped.add_done_callback(self._on_call_done) else: diff --git a/google/api_core/bidi_base.py b/google/api_core/bidi_base.py index bae9a1c6..9288fda4 100644 --- a/google/api_core/bidi_base.py +++ b/google/api_core/bidi_base.py @@ -55,9 +55,13 @@ def add_done_callback(self, callback): This occurs when the RPC errors or is successfully terminated. Args: - callback (Callable[[grpc.Future], None]): The callback to execute. - It will be provided with the same gRPC future as the underlying - stream which will also be a :class:`grpc.aio.Call`. + callback (Union[Callable[[grpc.Future], None], Callable[[Any], None]]): + The callback to execute after gRPC call completed (success or + failure). + + For sync streaming gRPC: Callable[[grpc.Future], None] + + For async streaming gRPC: Callable[[Any], None] """ self._callbacks.append(callback) @@ -65,7 +69,11 @@ def _on_call_done(self, future): # This occurs when the RPC errors or is successfully terminated. # Note that grpc's "future" here can also be a grpc.RpcError. # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 - # that `grpc.RpcError` is also `grpc.aio.Call`. + # that `grpc.RpcError` is also `grpc.Call`. + # for asynchronous gRPC call it would be `grpc.aio.AioRpcError` + + # Note: sync callbacks can be limiting for async code, because you can't + # await anything in a sync callback. 
for callback in self._callbacks: callback(future) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index b0547702..9e00cc61 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -67,7 +67,7 @@ async def test_bounded_consume(self): call.done.return_value = True with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(anext(gen_aiter), timeout=1) + await asyncio.wait_for(anext(gen_aiter), timeout=0.01) assert items == [mock.sentinel.A, mock.sentinel.B] @@ -105,12 +105,10 @@ async def test_exit_when_inactive_with_item(self): generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call - with pytest.raises(StopAsyncIteration) as exc_info: + with pytest.raises( + StopAsyncIteration, + ): assert await anext(aiter(generator)) - assert ( - exc_info.value.args[0] - == "Inactive call, replacing item on queue and exiting request generator." - ) # Make sure it put the item back. assert not q.empty() @@ -125,7 +123,7 @@ async def test_exit_when_inactive_empty(self): generator.call = call with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(anext(aiter(generator)), timeout=1) + await asyncio.wait_for(anext(aiter(generator)), timeout=.01) async def test_exit_with_stop(self): q = asyncio.Queue() @@ -136,9 +134,8 @@ async def test_exit_with_stop(self): generator = bidi_async._AsyncRequestQueueGenerator(q) generator.call = call - with pytest.raises(StopAsyncIteration) as exc_info: + with pytest.raises(StopAsyncIteration): assert await anext(aiter(generator)) - assert exc_info.value.args[0] == "Cleanly exiting request generator." def make_async_rpc(): @@ -190,7 +187,7 @@ def test_done_callbacks(self): @pytest.mark.asyncio @pytest.mark.skipif( sys.version_info < (3, 8), # type: ignore[operator] - reason="Python 3.8 below doesnt provide support for assert_awaited_once", + reason="Versions of Python below 3.8 don't provide support for assert_awaited_once", ) async def test_metadata(self): rpc, call = make_async_rpc() From 827d3c1b05028bfaa621b0801c1fdb1205467fe8 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 8 Oct 2025 16:13:02 +0000 Subject: [PATCH 24/28] write floating point numbers are per pylint --- tests/asyncio/test_bidi_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncio/test_bidi_async.py b/tests/asyncio/test_bidi_async.py index 9e00cc61..696113db 100644 --- a/tests/asyncio/test_bidi_async.py +++ b/tests/asyncio/test_bidi_async.py @@ -123,7 +123,7 @@ async def test_exit_when_inactive_empty(self): generator.call = call with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(anext(aiter(generator)), timeout=.01) + await asyncio.wait_for(anext(aiter(generator)), timeout=0.01) async def test_exit_with_stop(self): q = asyncio.Queue() From 69f061b16fa503de568a0e82038b6ef6bad7c572 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 9 Oct 2025 09:20:30 +0000 Subject: [PATCH 25/28] correct exception type in _async files --- google/api_core/bidi_async.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index c65b69f6..2fe22d01 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -176,8 +176,9 @@ async def open(self) -> None: try: call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) except exceptions.GoogleAPICallError as exc: - # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is - # 
available from the ``response`` property on the mapped exception. + # The original `grpc.aio.AioRpcError` (which is usually also a + # `grpc.aio.Call`) is available from the ``response`` property on + # the mapped exception. self._on_call_done(exc.response) raise From d528dd0f1f56a35bd81016710e2f45d572a4468e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 9 Oct 2025 10:18:39 +0000 Subject: [PATCH 26/28] move comments from class level to method level --- google/api_core/bidi_async.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index 2fe22d01..c4bdc629 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -29,19 +29,7 @@ _LOGGER = logging.getLogger(__name__) -# The reason this is necessary is because it lets the user have control on -# when they would want to send requests proto messages instead of sending all -# of them initially. -# -# This is achieved via asynchronous queue (asyncio.Queue), -# gRPC awaits until there's a message in the queue. -# -# Finally, it allows for retrying without swapping queues because if it does -# pull an item off the queue when the RPC is inactive, it'll immediately put -# it back and then exit. This is necessary because yielding the item in this -# case will cause gRPC to discard it. In practice, this means that the order -# of messages is not guaranteed. If preserving order is necessary it would be -# easy to use a priority queue. + class _AsyncRequestQueueGenerator: """_AsyncRequestQueueGenerator is a helper class for sending asynchronous requests to a gRPC stream from a Queue. @@ -92,6 +80,20 @@ def _is_active(self) -> bool: return self.call is None or not self.call.done() async def __aiter__(self): + # The reason this is necessary is because it lets the user have + # control on when they would want to send requests proto messages + # instead of sending all of them initially. + # + # This is achieved via asynchronous queue (asyncio.Queue), + # gRPC awaits until there's a message in the queue. + # + # Finally, it allows for retrying without swapping queues because if + # it does pull an item off the queue when the RPC is inactive, it'll + # immediately put it back and then exit. This is necessary because + # yielding the item in this case will cause gRPC to discard it. In + # practice, this means that the order of messages is not guaranteed. + # If preserving order is necessary it would be easy to use a priority + # queue. if self._initial_request is not None: if callable(self._initial_request): yield self._initial_request() From 4ee982654014107cc844d8745d4484ef7a8777ab Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 9 Oct 2025 10:22:36 +0000 Subject: [PATCH 27/28] add .close() in Example code --- google/api_core/bidi.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index 7cbaf220..0ad67aba 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -232,6 +232,8 @@ class BidiRpc(BidiRpcBase): rpc.send(example_pb2.StreamingRpcRequest( data='example')) + rpc.close() + This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`. 
Args: From 5f3e5ba277ac11250dbbee296363431b74a28bc9 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 9 Oct 2025 10:43:15 +0000 Subject: [PATCH 28/28] throw better error msg when stream is not opened --- google/api_core/bidi.py | 4 ++-- google/api_core/bidi_async.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/google/api_core/bidi.py b/google/api_core/bidi.py index 0ad67aba..270ad091 100644 --- a/google/api_core/bidi.py +++ b/google/api_core/bidi.py @@ -303,7 +303,7 @@ def send(self, request): request (protobuf.Message): The request to send. """ if self.call is None: - raise ValueError("Cannot send on an RPC that has never been opened.") + raise ValueError("Cannot send on an RPC stream that has never been opened.") # Don't use self.is_active(), as ResumableBidiRpc will overload it # to mean something semantically different. @@ -324,7 +324,7 @@ def recv(self): protobuf.Message: The received message. """ if self.call is None: - raise ValueError("Cannot recv on an RPC that has never been opened.") + raise ValueError("Cannot recv on an RPC stream that has never been opened.") return next(self.call) diff --git a/google/api_core/bidi_async.py b/google/api_core/bidi_async.py index c4bdc629..d73b4c98 100644 --- a/google/api_core/bidi_async.py +++ b/google/api_core/bidi_async.py @@ -29,7 +29,6 @@ _LOGGER = logging.getLogger(__name__) - class _AsyncRequestQueueGenerator: """_AsyncRequestQueueGenerator is a helper class for sending asynchronous requests to a gRPC stream from a Queue. @@ -218,7 +217,7 @@ async def send(self, request: ProtobufMessage) -> None: request (ProtobufMessage): The request to send. """ if self.call is None: - raise ValueError("Cannot send on an RPC that has never been opened.") + raise ValueError("Cannot send on an RPC stream that has never been opened.") if not self.call.done(): await self._request_queue.put(request) @@ -235,7 +234,7 @@ async def recv(self) -> ProtobufMessage: ProtobufMessage: The received message. """ if self.call is None: - raise ValueError("Cannot recv on an RPC that has never been opened.") + raise ValueError("Cannot recv on an RPC stream that has never been opened.") return await self.call.read()
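
For reference, a minimal end-to-end usage sketch of AsyncBidiRpc as introduced by this
series. It only combines the calls shown in the docstring examples above; stub.StreamingRpc
and example_pb2 are placeholder names from those examples, and error handling is elided::

    from google.api_core import bidi_async

    async def consume(stub):
        rpc = bidi_async.AsyncBidiRpc(
            stub.StreamingRpc,
            initial_request=example_pb2.StreamingRpcRequest(setting='example'),
            metadata=[('name', 'value')],
        )
        # Called when the RPC terminates, with the completed call or the error.
        rpc.add_done_callback(lambda call: print('stream terminated:', call))

        await rpc.open()
        try:
            while rpc.is_active:
                response = await rpc.recv()
                print(response)
                await rpc.send(example_pb2.StreamingRpcRequest(data='example'))
        finally:
            # Close the stream when done with it.
            await rpc.close()

Because send() only puts requests on the asyncio.Queue consumed by the request generator,
requests can be queued from any coroutine while the receive loop runs elsewhere.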