|
26 | 26 | class _AsyncRequestQueueGenerator: |
27 | 27 | """An async helper for sending requests to a gRPC stream from a Queue. |
28 | 28 |
|
29 | | - This generator takes requests off a given queue and yields them to gRPC. |
30 | | -
|
31 | | - This helper is useful when you have an indeterminate, indefinite, or |
32 | | - otherwise open-ended set of requests to send through a request-streaming |
33 | | - (or bidirectional) RPC. |
34 | | -
|
35 | | - The reason this is necessary is because gRPC takes an async iterator as the |
36 | | - request for request-streaming RPCs. gRPC consumes this iterator to allow |
37 | | - it to block while generating requests for the stream. However, if the |
38 | | - generator blocks indefinitely gRPC will not be able to clean up the task |
39 | | - as it'll be blocked on `anext(iterator)` and not be able to check the |
40 | | - channel status to stop iterating. This helper mitigates that by waiting |
41 | | - on the queue with a timeout and checking the RPC state before yielding. |
42 | | -
|
43 | | - Finally, it allows for retrying without swapping queues because if it does |
44 | | - pull an item off the queue when the RPC is inactive, it'll immediately put |
45 | | - it back and then exit. This is necessary because yielding the item in this |
46 | | - case will cause gRPC to discard it. In practice, this means that the order |
| 29 | + This generator takes requests off a given queue and yields them to gRPC. |
| 30 | +
|
| 31 | + This helper is useful when you have an indeterminate, indefinite, or |
| 32 | + otherwise open-ended set of requests to send through a request-streaming |
| 33 | + (or bidirectional) RPC. |
| 34 | +
|
| 35 | + The reason this is necessary |
| 36 | +
|
| 37 | + is because it's let's user have control on the when they would want to |
| 38 | + send requests proto messages instead of sending all of them initilally. |
| 39 | +
|
| 40 | + This is achieved via asynchronous queue (asyncio.Queue), |
| 41 | + gRPC awaits until there's a message in the queue. |
| 42 | +
|
| 43 | + Finally, it allows for retrying without swapping queues because if it does |
| 44 | + pull an item off the queue when the RPC is inactive, it'll immediately put |
| 45 | + it back and then exit. This is necessary because yielding the item in this |
| 46 | + case will cause gRPC to discard it. In practice, this means that the order |
47 | 47 | of messages is not guaranteed. If such a thing is necessary it would be |
48 | | - easy to use a priority queue. |
| 48 | + easy to use a priority queue. |
49 | 49 |
|
50 | | - Example:: |
| 50 | + Example:: |
51 | 51 |
|
52 | | - requests = _AsyncRequestQueueGenerator(q) |
53 | | - call = await stub.StreamingRequest(requests) |
54 | | - requests.call = call |
| 52 | + requests = _AsyncRequestQueueGenerator(q) |
| 53 | + call = await stub.StreamingRequest(requests) |
| 54 | + requests.call = call |
55 | 55 |
|
56 | | - async for response in call: |
57 | | - print(response) |
58 | | - await q.put(...) |
| 56 | + async for response in call: |
| 57 | + print(response) |
| 58 | + await q.put(...) |
59 | 59 |
|
60 | | - Args: |
61 | | - queue (asyncio.Queue): The request queue. |
62 | | - initial_request (Union[protobuf.Message, |
63 | | - Callable[[], protobuf.Message]]): The initial request to |
64 | | - yield. This is done independently of the request queue to allow for |
65 | | - easily restarting streams that require some initial configuration |
66 | | - request. |
| 60 | + Args: |
| 61 | + queue (asyncio.Queue): The request queue. |
| 62 | + initial_request (Union[protobuf.Message, |
| 63 | + Callable[[], protobuf.Message]]): The initial request to |
| 64 | + yield. This is done independently of the request queue to allow for |
| 65 | + easily restarting streams that require some initial configuration |
| 66 | + request. |
67 | 67 | """ |
68 | 68 |
|
69 | 69 | def __init__(self, queue: asyncio.Queue, initial_request=None): |
|
0 commit comments