
Bug: FASTSTREAM_SUPERVISOR_DISABLED=0 causes an infinite loop when the NATS service is unavailable

Status: Open. swelborn opened this issue 1 month ago • 3 comments

Describe the bug

When I have a subscription to a NATS stream and the NATS service goes down, we get an infinite loop similar to the following:

nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.385+00:00 - interactem.core.nats.broker - INFO - callback for Task-241369 is being executed...
2025-10-31T18:15:28.388+00:00 - interactem.core.nats.broker - ERROR - Task-241369 raised an exception, retrying...
If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.
Traceback (most recent call last):
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/nats/subscriber/usecases/stream_pull_subscriber.py", line 80, in _consume_pull
    messages = await self.subscription.fetch(
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1098, in fetch
    msg = await self._fetch_one(expires, timeout, heartbeat)
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1164, in _fetch_one
    raise nats.js.errors.APIError.from_msg(msg)
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/errors.py", line 74, in from_msg
    raise ServiceUnavailableError
nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.389+00:00 - interactem.core.nats.broker - INFO - callback for Task-241372 is being executed...
2025-10-31T18:15:28.389+00:00 - interactem.core.nats.broker - ERROR - Task-241372 raised an exception, retrying...
If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.
Traceback (most recent call last):
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/nats/subscriber/usecases/stream_pull_subscriber.py", line 80, in _consume_pull
    messages = await self.subscription.fetch(
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1098, in fetch
    msg = await self._fetch_one(expires, timeout, heartbeat)
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1164, in _fetch_one
    raise nats.js.errors.APIError.from_msg(msg)
  File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/errors.py", line 74, in from_msg
    raise ServiceUnavailableError
nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.390+00:00 - interactem.agent.broker - ERROR - Unexpected error: unhandled errors in a TaskGroup (1 sub-exception)
  + Exception Group Traceback (most recent call last):
  |   File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/entrypoint.py", line 8, in main
  |     await app.run()
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/app.py", line 100, in run
  |     raise ex from None
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/app.py", line 96, in run
  |     await self._shutdown(log_level)
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 250, in _shutdown
  |     await self.stop()
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 256, in stop
  |     async with self._shutdown_hooks_context():
  |   File "/Users/swelborn/.local/share/uv/python/cpython-3.10.16-macos-aarch64-none/lib/python3.10/contextlib.py", line 199, in __aenter__
  |     return await anext(self.gen)
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 263, in _shutdown_hooks_context
  |     await func()
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/use.py", line 170, in injected_wrapper
  |     return await real_model.asolve(  # type: ignore[no-any-return]
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/core/model.py", line 339, in asolve
  |     response = await run_async(self.call, *final_args, **final_kwargs)
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/utils.py", line 48, in run_async
  |     return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
  |   File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/broker.py", line 38, in on_shutdown
  |     await agent.shutdown()
  |   File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/agent.py", line 345, in shutdown
  |     await self.agent_kv.stop()
  |   File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 272, in stop
  |     await self._main_task
  |   File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 125, in _loop
  |     await self._update_values()
  |   File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 162, in _update_values
  |     async with anyio.create_task_group() as tg:
  |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 201, in publish
    |     msg = await self._nc.request(
    |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/aio/client.py", line 1066, in request
    |     raise errors.NoRespondersError
    | nats.errors.NoRespondersError: nats: no responders available for request
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 174, in _safe_put_value
    |     await self._bucket.put(key_str, validated.model_dump_json().encode())
    |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/kv.py", line 209, in put
    |     pa = await self._js.publish(f"{self._pre}{key}", value)
    |   File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 208, in publish
    |     raise nats.js.errors.NoStreamResponseError
    | nats.js.errors.NoStreamResponseError: nats: no response from stream
    +------------------------------------

How to reproduce

I can develop a reproducer if you need one, but first I would like to understand the intended behavior of this supervisor. It could be beneficial, but the logs will grow huge if it reports this type of exception in a tight loop.

To give you an idea, this happens in my code here: https://github.com/NERSC/interactEM/blob/78b4f07747440d51064f0b204ebddf779ca71a39/backend/orchestrator/interactem/orchestrator/app.py

swelborn, Oct 31 '25 18:10

Hi @swelborn, this is expected behavior. Some long-running (endless) asyncio.Tasks can fail. The supervisor is implemented as a task callback that logs the error traceback and restarts the task. You can disable this behavior by setting FASTSTREAM_SUPERVISOR_DISABLED=1.
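To show why this produces the tight loop in the log above, here is a minimal sketch of a supervisor built on an asyncio done-callback that logs the traceback and restarts the task immediately. It is not FastStream's actual implementation: `TaskSupervisor`, `fetch_forever`, and the `max_restarts` cap are hypothetical, and the cap exists only so the demo terminates. Because the failing coroutine raises at once and the restart has no delay, every restart produces another traceback as fast as the event loop can schedule it.

```python
import asyncio
import logging
from typing import Any, Callable, Coroutine, Optional

logger = logging.getLogger("supervisor_sketch")


class TaskSupervisor:
    """Hypothetical supervisor: restarts a failed task from its done-callback."""

    def __init__(self, factory: Callable[[], Coroutine[Any, Any, None]], max_restarts: int) -> None:
        self._factory = factory
        self._max_restarts = max_restarts  # cap exists only so this demo terminates
        self._task: Optional[asyncio.Task] = None  # keep a reference so the task is not GC'd
        self.restarts = 0
        self.finished = asyncio.Event()

    def start(self) -> None:
        self._task = asyncio.create_task(self._factory())
        self._task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        # cancelled() is checked first because exception() raises on a cancelled task.
        if task.cancelled() or task.exception() is None:
            self.finished.set()  # clean exit or cancellation: nothing to restart
            return
        # Every failure is logged with its full traceback, as in the issue's log.
        logger.error("%s raised an exception, retrying...", task.get_name(), exc_info=task.exception())
        if self.restarts >= self._max_restarts:
            self.finished.set()
            return
        self.restarts += 1
        self.start()  # immediate restart, no delay between attempts


async def fetch_forever() -> None:
    # Stand-in for a pull-subscriber loop whose fetch fails at once while NATS is down.
    raise ConnectionError("service unavailable")


async def main() -> int:
    supervisor = TaskSupervisor(fetch_forever, max_restarts=5)
    supervisor.start()
    await supervisor.finished.wait()
    return supervisor.restarts


if __name__ == "__main__":
    print(asyncio.run(main()))  # 5 restarts, 6 logged tracebacks
```

Without the artificial cap, the same callback would keep rescheduling the task for as long as the server stays unavailable, which matches the ever-increasing task IDs in the log.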

fil1n, Nov 01 '25 14:11

Hey @fil1n, thanks for getting back to me. That makes sense, but I'm wondering whether there is some way to suppress these exceptions when they repeat endlessly.

I still want the reconnect attempts, but I want to avoid the endless logging while the NATS service is down or undergoing a leader election.
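One general way to keep reconnect attempts while bounding the log volume is to retry with capped exponential backoff, log the first failure of an outage in full, and then emit only a one-line summary every N attempts. This is a sketch under the assumption that you wrap your own consumer or fetch loop; `retry_with_backoff` and its parameters are hypothetical and are not part of FastStream or nats-py.

```python
import asyncio
import logging
from typing import Any, Callable, Coroutine, Optional

logger = logging.getLogger("retry_sketch")


async def retry_with_backoff(
    attempt: Callable[[], Coroutine[Any, Any, None]],
    *,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    log_every: int = 10,
    max_attempts: Optional[int] = None,
) -> int:
    """Await attempt() until it succeeds and return the number of failed attempts.

    Hypothetical helper (not a FastStream or nats-py API). Only the first
    failure of an outage is logged with a traceback; later failures produce
    one summary line every `log_every` attempts.
    """
    failures = 0
    delay = min(base_delay, max_delay)
    while True:
        try:
            await attempt()
        except Exception as exc:  # CancelledError is a BaseException, so it still propagates
            failures += 1
            if failures == 1:
                logger.error("attempt failed, retrying with backoff", exc_info=exc)
            elif failures % log_every == 0:
                logger.warning("still failing after %d attempts: %r", failures, exc)
            if max_attempts is not None and failures >= max_attempts:
                raise
            await asyncio.sleep(delay)
            # Double the delay, capped so retries continue at a steady rate during long outages.
            delay = min(delay * 2, max_delay)
        else:
            if failures:
                logger.info("recovered after %d failed attempts", failures)
            return failures


async def _demo() -> None:
    state = {"calls": 0}

    async def flaky_fetch() -> None:
        # Stand-in for a NATS fetch that fails while the server is unavailable.
        state["calls"] += 1
        if state["calls"] <= 3:
            raise ConnectionError("service unavailable")

    failed = await retry_with_backoff(flaky_fetch, base_delay=0.01)
    print(f"succeeded after {failed} failed attempts")  # succeeded after 3 failed attempts


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())
```

Capping the delay at `max_delay` keeps retries frequent enough to reconnect soon after a leader election, while the doubling removes the tight loop and the periodic summary keeps the log readable during a long outage.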

swelborn, Nov 02 '25 19:11

At the moment, it is not possible to avoid the endless logging without disabling the supervisor. I will submit a PR to add this option this week or next.

fil1n, Nov 03 '25 10:11