AsyncChannel produces unexpected exception on cancel
Hey there,
I'm using AsyncChannel to send messages to a gRPC server. When the client receives a gRPC error, it prints some uncaught errors from the sending task to the console (related to: https://github.com/danielgtaylor/python-betterproto/issues/188).
```
future: <Task finished name='Task-2' coro=<ServiceStub._send_messages() done, defined at /home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/grpclib_client.py:163> exception=ValueError('task_done() called too many times')>
Traceback (most recent call last):
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/util/async_channel.py", line 87, in __anext__
    result = await self._queue.get()
  File "/usr/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/grpclib_client.py", line 166, in _send_messages
    async for message in messages:
  File "/home/PycharmProjects/Worker/venv/lib/python3.8/site-packages/betterproto/grpc/util/async_channel.py", line 93, in __anext__
    self._queue.task_done()
  File "/usr/lib/python3.8/asyncio/queues.py", line 206, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
```
When the server sends a gRPC error, it is handled in this `except` block, which cancels the sending task: https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/grpclib_client.py#L159-L161
Calling `cancel()` raises a `CancelledError` at the sending task's current `await`, which is the `get()` call on the AsyncChannel's queue:
https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/util/async_channel.py#L87
In this case `get()` raises a `CancelledError` while the queue is still empty, so no item is retrieved. `task_done()` is then called anyway, even though nothing was taken from the queue, and the error above is raised.
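The failure can be reproduced with a plain `asyncio.Queue`, without betterproto or gRPC. This is a minimal sketch: `buggy_receive` is a hypothetical helper that only mimics calling `task_done()` in a `finally` block after `get()`, as `AsyncChannel.__anext__` appears to do according to the traceback.

```python
import asyncio


async def buggy_receive(queue: asyncio.Queue):
    # Hypothetical helper: task_done() sits in `finally`, so it also runs
    # when get() is cancelled before any item was retrieved.
    try:
        return await queue.get()
    finally:
        queue.task_done()


async def main() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(buggy_receive(queue))
    await asyncio.sleep(0)  # let the task block inside queue.get()
    task.cancel()  # like the client cancelling the sending task
    try:
        await task
    except ValueError as exc:
        # The CancelledError was replaced by task_done()'s ValueError.
        return str(exc)
    return "no error"


print(asyncio.run(main()))  # task_done() called too many times
```

Cancelling the task while it waits in `get()` throws `CancelledError` into it; the `finally` block then calls `task_done()` with no unfinished items, so the `ValueError` replaces the `CancelledError`, matching the "During handling of the above exception" chain in the traceback.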
A quick fix would be the following, though I haven't checked whether it has any side effects:
```python
try:
    ...
finally:
    self._waiting_receivers -= 1
    if not self._queue.empty():
        self._queue.task_done()
```
Thanks for the detailed report.
So the problem is that we call `task_done()` on the assumption that we got an item from the queue, but in the scenario you describe there might not have been an item to get at all?
I don't have time to dive into this properly right now, but off the top of my head I'm wondering whether this means there are two problems to solve:
- [L93](https://github.com/danielgtaylor/python-betterproto/blob/master/src/betterproto/grpc/util/async_channel.py#L93) calls `task_done()` even when no item was taken from the queue
- `asyncio.exceptions.CancelledError` isn't handled gracefully anywhere (should it be?)
I assume you tried your suggested fix and it worked, which would suggest the second point isn't an issue?
At any rate, I'm not sure your suggested solution is quite correct. If I understand the error scenario correctly, the problem isn't that the queue is empty now (that is expected after you successfully get the last item), but that nothing may ever have been added to it (or nothing since the last successful get)!
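The concern about the empty-queue check can be illustrated with a plain `asyncio.Queue`. In this minimal sketch, `receive_with_empty_check` is a hypothetical helper applying the quick fix from the report: a successful `get()` of the last item leaves the queue empty, so `task_done()` is skipped and the unfinished-task count never returns to zero, which would matter to anything relying on `Queue.join()`.

```python
import asyncio


async def receive_with_empty_check(queue: asyncio.Queue):
    # Hypothetical helper using the quick fix: skip task_done() if the queue is empty.
    try:
        return await queue.get()
    finally:
        if not queue.empty():
            queue.task_done()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put("last message")
    item = await receive_with_empty_check(queue)
    # get() succeeded but emptied the queue, so task_done() was skipped and
    # join() waits for an item that will never be marked as done.
    try:
        await asyncio.wait_for(queue.join(), timeout=0.1)
        joined = True
    except asyncio.TimeoutError:
        joined = False
    return item, joined


print(asyncio.run(main()))  # ('last message', False)
```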
So maybe the correct solution is to refactor along the lines of:
```python
self._waiting_receivers += 1
result = await self._queue.get()
self._waiting_receivers -= 1
self._queue.task_done()
if result is self.__flush:
    return None
return result
```
though I'd have to verify that this doesn't break anything. It might be better to explicitly check whether an item was actually retrieved from the queue.
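One way to explicitly check whether an item was retrieved is a flag that is set only after `get()` returns. The sketch below uses a hypothetical, stripped-down `SketchChannel`, not betterproto's actual AsyncChannel; only `_queue`, `_waiting_receivers` and the flush sentinel (here `_flush`) mirror names from the snippets above. Unlike the refactor above, it keeps the `_waiting_receivers` decrement in `finally`, so the counter is still restored when `get()` is cancelled. It also omits AsyncChannel's `done()` handling, so a receiver started after the stream is closed simply blocks, which the demo uses to simulate a cancelled sender.

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class SketchChannel(Generic[T]):
    """Hypothetical stand-in for AsyncChannel, for illustration only."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._waiting_receivers = 0
        self._flush = object()  # sentinel marking the end of the stream

    async def send(self, item: T) -> None:
        await self._queue.put(item)

    def close(self) -> None:
        self._queue.put_nowait(self._flush)

    def __aiter__(self) -> "SketchChannel[T]":
        return self

    async def __anext__(self) -> T:
        self._waiting_receivers += 1
        got_item = False
        try:
            result = await self._queue.get()
            got_item = True  # only reached if get() actually returned an item
            if result is self._flush:
                raise StopAsyncIteration
            return result
        finally:
            # Runs on success, end of stream, and cancellation alike.
            self._waiting_receivers -= 1
            if got_item:
                self._queue.task_done()


async def demo():
    channel: SketchChannel[str] = SketchChannel()
    await channel.send("a")
    await channel.send("b")
    channel.close()
    received = [message async for message in channel]

    # Cancel a receiver that is blocked on the now-empty queue.
    blocked = asyncio.create_task(channel.__anext__())
    await asyncio.sleep(0)
    blocked.cancel()
    try:
        await blocked
        cancelled_cleanly = False
    except asyncio.CancelledError:
        cancelled_cleanly = True  # no ValueError replaced the cancellation
    # Every retrieved item was marked done, so join() returns immediately.
    await asyncio.wait_for(channel._queue.join(), timeout=1)
    return received, cancelled_cleanly, channel._waiting_receivers


print(asyncio.run(demo()))  # (['a', 'b'], True, 0)
```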
It would be great to have a PR with a test case for this, if you're up for it.