'Cannot find stack of coroutine' on wait_empty

Open jacksmith15 opened this issue 3 years ago • 4 comments

Checklist

  • [x] I have included information about relevant versions
  • [ ] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I struggle to reproduce this consistently, as I suspect it is the result of a very tight race condition. I can however say that it tends to occur around rebalances, and generally with agents which return naturally.

The problem stems from the wait_empty method on a consumer, which waits for streams to be consumed before exiting (assuming stream_wait_empty is not overridden).

This method logs human-readable tracebacks of the agents. It relies on mode to produce those tracebacks, and that is where the error is raised.

This error is raised when coro.cr_frame is None for the agent coroutine, which is a valid state for a closed coroutine (i.e. one which has finished processing and returned). Presumably the same could happen for async generators via agen.ag_frame.
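To illustrate the state described above, here is a minimal standalone sketch using only the standard library (no Faust or mode involved). The coroutine name short_lived_agent is made up for the example; it stands in for an agent that returns naturally.

```python
import asyncio
import inspect


async def short_lived_agent() -> str:
    # Stand-in for an agent body that finishes and returns on its own.
    return "done"


async def main():
    coro = short_lived_agent()
    had_frame_before = coro.cr_frame is not None  # frame exists before it runs
    result = await coro
    frame_gone_after = coro.cr_frame is None  # frame is released once it returns
    state = inspect.getcoroutinestate(coro)
    return had_frame_before, frame_gone_after, state, result


had_frame_before, frame_gone_after, state, result = asyncio.run(main())
print(had_frame_before, frame_gone_after, state, result)
```

Once the coroutine has returned, there is no frame left to walk, so any code that builds a traceback from cr_frame has nothing to work with.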

I think it's valid that mode raises here (although a custom exception would be useful), because the traceback cannot be found.

My current workaround for this is as follows:

from typing import List

from faust import App
from faust.agents import Agent

class CustomAgent(Agent):
    def actor_tracebacks(self) -> List[str]:
        tracebacks: List[str] = []
        for actor in self._actors:
            try:
                tracebacks.append(actor.traceback())
            except RuntimeError as exc:
                if "cannot find stack of coroutine" in str(exc):
                    tracebacks.append(f"Could not find stack of coroutine for actor: {actor}")
                    continue
                raise  # re-raise unrelated errors with their original traceback
        return tracebacks


app = App(
    ...,
    Agent=CustomAgent,
    ...,
)
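A possible alternative to matching on the exception message is to check whether the coroutine still has a frame before asking for its stack. The sketch below is standalone and hedged: safe_stack and its format_stack argument are hypothetical names, and how to reach the underlying coroutine from a Faust actor is an assumption not covered in this issue.

```python
import inspect
from typing import Any, Callable


def safe_stack(obj: Any, format_stack: Callable[[Any], str]) -> str:
    """Return a formatted stack, or a placeholder if obj has already finished."""
    if inspect.iscoroutine(obj) and obj.cr_frame is None:
        return f"<finished coroutine, no stack available: {obj!r}>"
    if inspect.isasyncgen(obj) and obj.ag_frame is None:
        return f"<finished async generator, no stack available: {obj!r}>"
    return format_stack(obj)


async def returns_immediately() -> None:
    return None


# Drive one coroutine to completion by hand so its frame is released.
finished = returns_immediately()
try:
    finished.send(None)
except StopIteration:
    pass

# A second coroutine that has not started yet still has a frame.
pending = returns_immediately()

finished_report = safe_stack(finished, lambda o: "formatted stack")
pending_report = safe_stack(pending, lambda o: "formatted stack")
pending.close()  # avoid a "coroutine was never awaited" warning

print(finished_report)
print(pending_report)
```

Checking the frame up front avoids depending on the exact wording of the RuntimeError, though a small race remains if the coroutine finishes between the check and the formatting call.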

Expected behavior

No exception to be raised when collecting actor traces for the purpose of logging.

Actual behavior

A RuntimeError("cannot find stack of coroutine") is raised.

Full traceback

Error trace
  File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 52, in print_coro_stack
    tb = Traceback.from_coroutine(coro, limit=limit)
         |         |              |           -> 125
         |         |              -> <coroutine object my_agent at 0x7f6d09d361c0>
         |         -> <classmethod object at 0x7f6d0d48ce20>
         -> <class 'mode.utils.tracebacks.Traceback'>
  File "/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py", line 231, in from_coroutine
    raise RuntimeError('cannot find stack of coroutine')

RuntimeError: cannot find stack of coroutine

Versions

  • Python version: 3.8
  • Faust version: 1.10.4
  • Operating system: Ubuntu 20.10
  • Kafka version: confluentinc/cp-kafka:5.5.1
  • RocksDB version (if applicable): N/A

Feb 16 '21 13:02 jacksmith15

I'm not entirely sure what the correct fix is here - possibly it could just be clarifying that agents which return (e.g. takewhile) are not supported.

Primarily I've posted this here to share the workaround, as I've not found any reference to this exception in the project's issues (either here or on the original repository).

Feb 16 '21 14:02 jacksmith15

Is this still a problem with the latest release? There was an update to aiokafka that might have addressed this.

Jun 08 '21 00:06 taybin

@jacksmith15 @taybin anyone figure this one out?

May 04 '23 02:05 erikreppel

I also faced this problem. Has anyone found a solution?

Mar 04 '24 07:03 redb0