'Cannot find stack of coroutine' on wait_empty
Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
I struggle to reproduce this consistently, as I suspect it is the result of a very tight race condition. I can however say that it tends to occur around rebalances, and generally with agents which return naturally.
The problem stems from the `wait_empty` method on a consumer, which waits for streams to be consumed before exiting (assuming `stream_wait_empty` is not overridden).
This method logs human-readable tracebacks of the agents, which in turn relies on `mode` to produce those tracebacks, and `mode` is where the error is raised.
This error is raised if the agent coroutine has `coro.cr_frame == None`, which is the valid state for a closed coroutine (i.e. one which has finished processing and returned). Presumably the same could happen for an async generator via `agen.ag_frame`, and so on.
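To illustrate the state described above, here is a minimal standalone sketch using only the standard library (no Faust or `mode`). The name `my_agent` is just a placeholder borrowed from the traceback; it is a plain coroutine, not a real agent.

```python
async def my_agent():
    # Stand-in for an agent that returns naturally.
    return "done"


coro = my_agent()

# A freshly created coroutine still has a frame object attached.
had_frame_before = coro.cr_frame is not None

# Drive the coroutine to completion by hand. It has no awaits, so the
# first send() finishes it and raises StopIteration.
try:
    coro.send(None)
except StopIteration:
    pass

# Once the coroutine has returned, its frame is gone.
frame_is_none_after = coro.cr_frame is None
```

Any code that needs the frame to build a stack trace has nothing to work with at this point, which appears to be the situation `wait_empty` ends up in.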
I think it's valid that `mode` raises here (although a custom exception would be useful), because the traceback cannot be found.
My current workaround for this is as follows:
    from typing import List

    from faust import App
    from faust.agents import Agent


    class CustomAgent(Agent):
        def actor_tracebacks(self) -> List[str]:
            tracebacks: List[str] = []
            for actor in self._actors:
                try:
                    tracebacks.append(actor.traceback())
                except RuntimeError as exc:
                    # Actors whose coroutine has already finished have no stack;
                    # record a placeholder instead of failing.
                    if "cannot find stack of coroutine" in str(exc):
                        tracebacks.append(f"Could not find stack of coroutine for actor: {actor}")
                        continue
                    raise
            return tracebacks


    app = App(
        ...,
        Agent=CustomAgent,
        ...,
    )
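An alternative to matching on the exception message would be to check the coroutine's state before asking for a traceback. The sketch below is standalone and uses only `inspect` from the standard library. `describe_coroutine` and its `render` argument are hypothetical names for illustration, not part of Faust or `mode`, and it only handles plain coroutines (an async generator would need a similar check on `ag_frame`).

```python
import inspect
from typing import Any, Callable


def describe_coroutine(coro: Any, render: Callable[[Any], str]) -> str:
    """Return render(coro), or a placeholder if the coroutine has finished.

    `render` is a hypothetical stand-in for whatever formats the stack.
    """
    if inspect.getcoroutinestate(coro) == inspect.CORO_CLOSED:
        return f"<no stack: coroutine {coro!r} has already finished>"
    return render(coro)


async def my_agent():
    return None


# One coroutine that has run to completion...
finished = my_agent()
try:
    finished.send(None)
except StopIteration:
    pass

# ...and one that has not been started yet.
pending = my_agent()

closed_msg = describe_coroutine(finished, render=lambda c: "stack")
open_msg = describe_coroutine(pending, render=lambda c: "stack")

# Close the unstarted coroutine to avoid a "never awaited" warning.
pending.close()
```

This avoids depending on the wording of the error message, at the cost of needing access to the underlying coroutine object.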
Expected behavior
No exception to be raised when collecting actor traces for the purpose of logging.
Actual behavior
A `RuntimeError("cannot find stack of coroutine")` is raised.
Full traceback
Error trace
    File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 52, in print_coro_stack
      tb = Traceback.from_coroutine(coro, limit=limit)
           |         |              |           -> 125
           |         |              -> <coroutine object my_agent at 0x7f6d09d361c0>
           |         -> <classmethod object at 0x7f6d0d48ce20>
           -> <class 'mode.utils.tracebacks.Traceback'>
    File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 231, in from_coroutine
      raise RuntimeError('cannot find stack of coroutine')
    RuntimeError: cannot find stack of coroutine
Versions
- Python version: 3.8
- Faust version: 1.10.4
- Operating system: Ubuntu 20.10
- Kafka version: `confluentinc/cp-kafka:5.5.1`
- RocksDB version (if applicable): N/A
I'm not entirely sure what the correct fix is here. Possibly it could just be a matter of clarifying that agents which return (e.g. ones using `takewhile`) are not supported.
Primarily I've posted this here to share the workaround, as I've not found any reference to this exception in the project's issues (either here or on the original repository).
Is this still a problem with the latest release? There was an update to aiokafka that might have addressed this.
@jacksmith15 @taybin anyone figure this one out?
I also faced this problem. Has anyone found a solution?