Merged
31 commits
d751f63
feat: support for async bidi streaming apis
chandra-siri Aug 14, 2025
fdcd903
fix linting
chandra-siri Aug 14, 2025
5835178
Add bidiBase, refactor bidiRpc and add AsyncBidi
chandra-siri Aug 27, 2025
df8fba5
remove changes from noxfile
chandra-siri Aug 29, 2025
19cf273
use queue.get() instead of queue.get_nowait()
chandra-siri Aug 30, 2025
5b6d287
use asyncio.wait_for instead of asyncio.timeout
chandra-siri Aug 30, 2025
2599f96
aiter() and anext() are not defined in python<=3.9
chandra-siri Aug 30, 2025
9c1a621
remove support for python 3.7 and below
chandra-siri Aug 30, 2025
cc19089
fix mypy errors
chandra-siri Aug 30, 2025
a86a9b1
fix typo, version -> version_info
chandra-siri Aug 30, 2025
5464dc5
AsyncMock not present in python3.7
chandra-siri Aug 30, 2025
e96a409
update doc strings
chandra-siri Aug 30, 2025
52686b8
use pytest.approx to round off floating point errors.
chandra-siri Sep 1, 2025
d01421b
add tolerance
chandra-siri Sep 1, 2025
98eae06
add test_open_error_call_error unit test
chandra-siri Sep 1, 2025
af2121c
Merge branch 'main' of https://github.com/googleapis/python-api-core …
chandra-siri Sep 8, 2025
5a7550f
minor changes based on PR comments
chandra-siri Sep 8, 2025
c486c49
formatting fixes
chandra-siri Sep 8, 2025
14342b1
remove changes from test_method.py
chandra-siri Sep 8, 2025
6fabb1f
fix typos in doc string and error msg
chandra-siri Sep 17, 2025
2df8cc4
minor fixes in comments & doc strings
chandra-siri Sep 18, 2025
b963922
address review comments. mostly fix doc-strings
chandra-siri Oct 1, 2025
470aeb4
Merge branch 'main' of https://github.com/googleapis/python-api-core …
chandra-siri Oct 1, 2025
7475d99
remove unused variable to fix lint
chandra-siri Oct 1, 2025
8bf8396
fix & improve docstrings & comments. Reduce test timeout duration
chandra-siri Oct 8, 2025
827d3c1
write floating point numbers as per pylint
chandra-siri Oct 8, 2025
69f061b
correct exception type in _async files
chandra-siri Oct 9, 2025
d528dd0
move comments from class level to method level
chandra-siri Oct 9, 2025
4ee9826
add .close() in Example code
chandra-siri Oct 9, 2025
5f3e5ba
throw better error msg when stream is not opened
chandra-siri Oct 9, 2025
82cea4f
Merge branch 'main' into feat/834-bidi-async-support
chandra-siri Oct 11, 2025
102 changes: 39 additions & 63 deletions google/api_core/bidi.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bi-directional streaming RPC helpers."""
"""Helpers for synchronous bidirectional streaming RPCs."""

import collections
import datetime
@@ -22,6 +22,7 @@
import time

from google.api_core import exceptions
from google.api_core.bidi_base import BidiRpcBase

_LOGGER = logging.getLogger(__name__)
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"
@@ -36,21 +37,6 @@ class _RequestQueueGenerator(object):
otherwise open-ended set of requests to send through a request-streaming
(or bidirectional) RPC.

The reason this is necessary is because gRPC takes an iterator as the
request for request-streaming RPCs. gRPC consumes this iterator in another
thread to allow it to block while generating requests for the stream.
However, if the generator blocks indefinitely gRPC will not be able to
clean up the thread as it'll be blocked on `next(iterator)` and not be able
to check the channel status to stop iterating. This helper mitigates that
by waiting on the queue with a timeout and checking the RPC state before
yielding.

Finally, it allows for retrying without swapping queues because if it does
pull an item off the queue when the RPC is inactive, it'll immediately put
it back and then exit. This is necessary because yielding the item in this
case will cause gRPC to discard it. In practice, this means that the order
of messages is not guaranteed. If such a thing is necessary it would be
easy to use a priority queue.

Example::

@@ -62,12 +48,6 @@ class _RequestQueueGenerator(object):
print(response)
q.put(...)

Note that it is possible to accomplish this behavior without "spinning"
(using a queue timeout). One possible way would be to use more threads to
multiplex the grpc end event with the queue, another possible way is to
use selectors and a custom event/queue object. Both of these approaches
are significant from an engineering perspective for small benefit - the
CPU consumed by spinning is pretty minuscule.

Args:
queue (queue_module.Queue): The request queue.
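For context (an editor's sketch, not part of the diff): the generator is wired to a request-streaming stub roughly like this, where `stub`, `StreamingRpc`, and `next_request` are hypothetical stand-ins::

    import queue as queue_module

    q = queue_module.Queue()
    requests = _RequestQueueGenerator(q)
    call = stub.StreamingRpc(iter(requests))  # gRPC consumes the iterator in its own thread
    requests.call = call  # let the generator watch the RPC's state

    for response in call:
        print(response)
        q.put(next_request(response))  # hypothetical follow-up request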
@@ -96,6 +76,31 @@ def _is_active(self):
return self.call is None or self.call.is_active()

def __iter__(self):
# The reason this is necessary is because gRPC takes an iterator as the
# request for request-streaming RPCs. gRPC consumes this iterator in
# another thread to allow it to block while generating requests for
# the stream. However, if the generator blocks indefinitely gRPC will
# not be able to clean up the thread as it'll be blocked on
# `next(iterator)` and not be able to check the channel status to stop
# iterating. This helper mitigates that by waiting on the queue with
# a timeout and checking the RPC state before yielding.
#
# Finally, it allows for retrying without swapping queues because if
# it does pull an item off the queue when the RPC is inactive, it'll
# immediately put it back and then exit. This is necessary because
# yielding the item in this case will cause gRPC to discard it. In
# practice, this means that the order of messages is not guaranteed.
# If such a thing is necessary it would be easy to use a priority
# queue.
#
# Note that it is possible to accomplish this behavior without
# "spinning" (using a queue timeout). One possible way would be to use
# more threads to multiplex the grpc end event with the queue, another
# possible way is to use selectors and a custom event/queue object.
# Both of these approaches are significant from an engineering
# perspective for small benefit - the CPU consumed by spinning is
# pretty minuscule.

if self._initial_request is not None:
if callable(self._initial_request):
yield self._initial_request()
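The rest of the method is truncated here. After yielding any initial request, the loop follows roughly this shape, a sketch of the spinning pattern described in the comments above (the `_queue` and `_period` attribute names are assumed; `_is_active` appears earlier in this diff)::

    while True:
        try:
            item = self._queue.get(timeout=self._period)
        except queue_module.Empty:
            # Nothing arrived within the timeout; check that the RPC is
            # still alive before going back to waiting.
            if not self._is_active():
                return
            continue

        if item is None:
            # The caller signalled a clean shutdown.
            return

        if not self._is_active():
            # The RPC became inactive while we waited. Re-queue the item so
            # a retry on a fresh call can resend it, then stop iterating.
            self._queue.put(item)
            return

        yield item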
@@ -201,7 +206,7 @@ def __repr__(self):
)


class BidiRpc(object):
class BidiRpc(BidiRpcBase):
"""A helper for consuming a bi-directional streaming RPC.

This maps gRPC's built-in interface which uses a request iterator and a
@@ -227,6 +232,8 @@ class BidiRpc(object):
rpc.send(example_pb2.StreamingRpcRequest(
data='example'))

rpc.close()

This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.

Args:
@@ -240,40 +247,14 @@
the request.
"""

def __init__(self, start_rpc, initial_request=None, metadata=None):
self._start_rpc = start_rpc
self._initial_request = initial_request
self._rpc_metadata = metadata
self._request_queue = queue_module.Queue()
self._request_generator = None
self._is_active = False
self._callbacks = []
self.call = None

def add_done_callback(self, callback):
"""Adds a callback that will be called when the RPC terminates.

This occurs when the RPC errors or is successfully terminated.

Args:
callback (Callable[[grpc.Future], None]): The callback to execute.
It will be provided with the same gRPC future as the underlying
stream which will also be a :class:`grpc.Call`.
"""
self._callbacks.append(callback)

def _on_call_done(self, future):
# This occurs when the RPC errors or is successfully terminated.
# Note that grpc's "future" here can also be a grpc.RpcError.
# See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
# that `grpc.RpcError` is also `grpc.call`.
for callback in self._callbacks:
callback(future)
def _create_queue(self):
"""Create a queue for requests."""
return queue_module.Queue()

def open(self):
"""Opens the stream."""
if self.is_active:
raise ValueError("Can not open an already open stream.")
raise ValueError("Cannot open an already open stream.")

request_generator = _RequestQueueGenerator(
self._request_queue, initial_request=self._initial_request
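`_create_queue` is the hook through which `BidiRpcBase` (added by this PR, but not shown in this file's diff) lets each subclass choose its queue implementation. A sketch of the presumed async counterpart, with the class name taken from the commit messages rather than this diff::

    import asyncio

    class AsyncBidiRpc(BidiRpcBase):
        def _create_queue(self):
            # Assumption: the async variant queues requests on an
            # asyncio.Queue rather than a thread-safe queue_module.Queue.
            return asyncio.Queue()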
@@ -322,7 +303,7 @@ def send(self, request):
request (protobuf.Message): The request to send.
"""
if self.call is None:
raise ValueError("Can not send() on an RPC that has never been open()ed.")
raise ValueError("Cannot send on an RPC stream that has never been opened.")

# Don't use self.is_active(), as ResumableBidiRpc will overload it
# to mean something semantically different.
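In practice, `send()` is called from application code while responses are consumed elsewhere; `bidi.py` also provides `BackgroundConsumer` (untouched by this PR) to run the receive loop on a helper thread. A sketch, with `stub` and `make_request` as hypothetical stand-ins::

    from google.api_core import bidi

    def on_response(response):
        print(response)

    rpc = bidi.BidiRpc(stub.StreamingRpc)
    consumer = bidi.BackgroundConsumer(rpc, on_response)
    consumer.start()          # opens the RPC and consumes responses on a thread
    rpc.send(make_request())  # safe to call while the consumer is running
    consumer.stop()           # closes the stream and joins the thread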
@@ -343,20 +324,15 @@ def recv(self):
protobuf.Message: The received message.
"""
if self.call is None:
raise ValueError("Can not recv() on an RPC that has never been open()ed.")
raise ValueError("Cannot recv on an RPC stream that has never been opened.")

return next(self.call)

@property
def is_active(self):
"""bool: True if this stream is currently open and active."""
"""True if this stream is currently open and active."""
return self.call is not None and self.call.is_active()

@property
def pending_requests(self):
"""int: Returns an estimate of the number of queued requests."""
return self._request_queue.qsize()


def _never_terminate(future_or_error):
"""By default, no errors cause BiDi termination."""
@@ -544,7 +520,7 @@ def _send(self, request):
call = self.call

if call is None:
raise ValueError("Can not send() on an RPC that has never been open()ed.")
raise ValueError("Cannot send on an RPC that has never been opened.")

# Don't use self.is_active(), as ResumableBidiRpc will overload it
# to mean something semantically different.
@@ -563,7 +539,7 @@ def _recv(self):
call = self.call

if call is None:
raise ValueError("Can not recv() on an RPC that has never been open()ed.")
raise ValueError("Cannot recv on an RPC that has never been opened.")

return next(call)
