Bug: FASTSTREAM_SUPERVISOR_DISABLED=0 causing infinite loop when nats service unavailable
Describe the bug
When I have a subscription to a NATS stream and the service goes down, I get an infinite loop similar to the following:
nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.385+00:00 - interactem.core.nats.broker - INFO - callback for Task-241369 is being executed...
2025-10-31T18:15:28.388+00:00 - interactem.core.nats.broker - ERROR - Task-241369 raised an exception, retrying...
If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.
Traceback (most recent call last):
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/nats/subscriber/usecases/stream_pull_subscriber.py", line 80, in _consume_pull
messages = await self.subscription.fetch(
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1098, in fetch
msg = await self._fetch_one(expires, timeout, heartbeat)
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1164, in _fetch_one
raise nats.js.errors.APIError.from_msg(msg)
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/errors.py", line 74, in from_msg
raise ServiceUnavailableError
nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.389+00:00 - interactem.core.nats.broker - INFO - callback for Task-241372 is being executed...
2025-10-31T18:15:28.389+00:00 - interactem.core.nats.broker - ERROR - Task-241372 raised an exception, retrying...
If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.
Traceback (most recent call last):
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/nats/subscriber/usecases/stream_pull_subscriber.py", line 80, in _consume_pull
messages = await self.subscription.fetch(
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1098, in fetch
msg = await self._fetch_one(expires, timeout, heartbeat)
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 1164, in _fetch_one
raise nats.js.errors.APIError.from_msg(msg)
File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/errors.py", line 74, in from_msg
raise ServiceUnavailableError
nats.js.errors.ServiceUnavailableError: nats: ServiceUnavailableError: code=None err_code=None description='None'
2025-10-31T18:15:28.390+00:00 - interactem.agent.broker - ERROR - Unexpected error: unhandled errors in a TaskGroup (1 sub-exception)
+ Exception Group Traceback (most recent call last):
| File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/entrypoint.py", line 8, in main
| await app.run()
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/app.py", line 100, in run
| raise ex from None
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/app.py", line 96, in run
| await self._shutdown(log_level)
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 250, in _shutdown
| await self.stop()
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 256, in stop
| async with self._shutdown_hooks_context():
| File "/Users/swelborn/.local/share/uv/python/cpython-3.10.16-macos-aarch64-none/lib/python3.10/contextlib.py", line 199, in __aenter__
| return await anext(self.gen)
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/faststream/_internal/application.py", line 263, in _shutdown_hooks_context
| await func()
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/use.py", line 170, in injected_wrapper
| return await real_model.asolve( # type: ignore[no-any-return]
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/core/model.py", line 339, in asolve
| response = await run_async(self.call, *final_args, **final_kwargs)
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/fast_depends/utils.py", line 48, in run_async
| return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
| File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/broker.py", line 38, in on_shutdown
| await agent.shutdown()
| File "/Users/swelborn/gits/interactEM/backend/agent/interactem/agent/agent.py", line 345, in shutdown
| await self.agent_kv.stop()
| File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 272, in stop
| await self._main_task
| File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 125, in _loop
| await self._update_values()
| File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 162, in _update_values
| async with anyio.create_task_group() as tg:
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 201, in publish
| msg = await self._nc.request(
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/aio/client.py", line 1066, in request
| raise errors.NoRespondersError
| nats.errors.NoRespondersError: nats: no responders available for request
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/swelborn/gits/interactEM/backend/core/interactem/core/nats/kv.py", line 174, in _safe_put_value
| await self._bucket.put(key_str, validated.model_dump_json().encode())
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/kv.py", line 209, in put
| pa = await self._js.publish(f"{self._pre}{key}", value)
| File "/Users/swelborn/gits/interactEM/backend/agent/.venv/lib/python3.10/site-packages/nats/js/client.py", line 208, in publish
| raise nats.js.errors.NoStreamResponseError
| nats.js.errors.NoStreamResponseError: nats: no response from stream
+------------------------------------
How to reproduce
I can develop a reproducer if needed, but first I would like to understand the intended behavior of this supervisor. Restarting failed tasks could be beneficial, but the logs will become huge if it reports this type of exception in a tight loop.
To give you an idea, this happens in my code here: https://github.com/NERSC/interactEM/blob/78b4f07747440d51064f0b204ebddf779ca71a39/backend/orchestrator/interactem/orchestrator/app.py
Hi @swelborn, this is normal behavior. Some of the long-running asyncio.Tasks can fail. The supervisor is implemented as a task callback that prints the error trace and restarts the task. You can disable this behavior by setting FASTSTREAM_SUPERVISOR_DISABLED=1.
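For readers unfamiliar with the pattern, here is a minimal sketch of a callback-based supervisor using only asyncio. It is not FastStream's actual code: `Supervisor`, `flaky_consumer`, and the `max_restarts` bound are hypothetical names added so the demo terminates. It shows why a task that fails immediately gets restarted (and logged) in a tight loop: nothing delays the restart.

```python
import asyncio
import logging

logger = logging.getLogger("supervisor_sketch")


class Supervisor:
    """Hypothetical helper: restarts a task whenever it ends with an exception."""

    def __init__(self, coro_factory, max_restarts):
        self.coro_factory = coro_factory
        self.max_restarts = max_restarts  # assumption: bound added for the demo only
        self.restarts = 0
        self.task = None

    def start(self):
        self.task = asyncio.get_running_loop().create_task(self.coro_factory())
        # The callback runs once the task finishes, fails, or is cancelled.
        self.task.add_done_callback(self._on_done)

    def _on_done(self, task):
        if task.cancelled():
            return
        exc = task.exception()
        if exc is None:
            return  # finished normally, nothing to restart
        logger.error(
            "%s raised an exception, retrying...", task.get_name(), exc_info=exc
        )
        if self.restarts < self.max_restarts:
            self.restarts += 1
            self.start()  # restarted immediately, with no backoff


attempts = []


async def flaky_consumer():
    """Stand-in for a fetch loop: fails three times, then succeeds."""
    attempts.append(1)
    if len(attempts) < 4:
        raise ConnectionError("service unavailable")


async def main():
    supervisor = Supervisor(flaky_consumer, max_restarts=10)
    supervisor.start()
    await asyncio.sleep(0.1)  # give the restarts time to run
    return supervisor.restarts


restarts = asyncio.run(main())
```

In this sketch every failure produces one log record with a traceback, so a service that stays down yields one record per restart for as long as the outage lasts.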
Hey @fil1n, thanks for getting back. That makes sense, but I'm wondering if there is a way to suppress these exceptions when they repeat endlessly.
I still want the reconnect behavior, but I want to avoid the endless logging while the NATS service is down or undergoing a leader election.
At the moment, avoiding the endless logging without disabling the supervisor is impossible. I will submit a PR to add this option this week or next.
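Until such an option exists, one possible application-side stopgap is to throttle the repeated record with a standard-library `logging.Filter`. This is only a sketch: `ThrottleFilter`, `ListHandler`, and the `throttle_demo` logger are hypothetical names, and it assumes the supervisor's message contains the text "raised an exception, retrying" (as in the log above) and that the traceback travels in the same log record. It does not change the retry behavior, only how often the message is emitted.

```python
import logging
import time


class ThrottleFilter(logging.Filter):
    """Hypothetical filter: pass matching records at most once per interval."""

    def __init__(self, match, interval_s=30.0, clock=time.monotonic):
        super().__init__()
        self.match = match            # substring identifying the noisy record
        self.interval_s = interval_s
        self.clock = clock            # injectable so the demo needs no sleeping
        self._last_emitted = None
        self.suppressed = 0

    def filter(self, record):
        if self.match not in record.getMessage():
            return True  # unrelated records are never throttled
        now = self.clock()
        if self._last_emitted is not None and now - self._last_emitted < self.interval_s:
            self.suppressed += 1
            return False
        self._last_emitted = now
        return True


class ListHandler(logging.Handler):
    """Collects formatted messages in memory for the demo."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


fake_now = [0.0]
throttle = ThrottleFilter(
    "raised an exception, retrying", interval_s=30.0, clock=lambda: fake_now[0]
)
handler = ListHandler()
handler.addFilter(throttle)

demo_logger = logging.getLogger("throttle_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

# Five failures one second apart: only the first is emitted.
for i in range(5):
    demo_logger.error("Task-%d raised an exception, retrying...", i)
    fake_now[0] += 1.0

demo_logger.info("unrelated message")

# After the interval has elapsed, the next failure is emitted again.
fake_now[0] = 60.0
demo_logger.error("Task-99 raised an exception, retrying...")
```

The filter is attached to the handler rather than to a logger because logger-level filters are not applied to records propagated from descendant loggers, while a handler filter sees everything that handler would output.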