
CLI fails with RuntimeError: cannot schedule new futures after interpreter shutdown

torkashvand opened this issue 4 months ago • 4 comments

Contact Details

No response

What happened?

When we run an import using the CLI, the process fails with this error:

RuntimeError: cannot schedule new futures after interpreter shutdown

Here is the actual code:

@app.command()
def import_edge_port(filepath: str = common_filepath_option) -> None:
    """Import Edge Port into GSO."""
    successfully_imported_data = []
    data = _read_data(Path(filepath))
    for edge_port in data:
        typer.echo(f"Importing Edge Port {edge_port['name']} on {edge_port['node']}.")
        try:
            edge_port["node"] = get_active_router_subscription_id(edge_port["node"])
            initial_data = EdgePortImportModel(**edge_port)
            start_process("create_imported_edge_port", [initial_data.model_dump()])
            successfully_imported_data.append(edge_port["name"])
            typer.echo(f"Successfully imported Edge Port {edge_port['name']} on {edge_port['node']}.")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    edge_port_ids = get_subscriptions(
        product_types=[ProductType.IMPORTED_EDGE_PORT],
        lifecycles=[SubscriptionLifecycle.ACTIVE],
        includes=["subscription_id"],
    )
    for subscription_id in edge_port_ids:
        typer.echo(f"Migrating Edge Port {subscription_id}")
        start_process("import_edge_port", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully imported Edge Ports:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")

and the workflow is:

@workflow(
    "Import Edge Port",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_edge_port() -> StepList:
    """Import an Edge Port without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )

What’s happening, step by step:

  1. The CLI command (e.g., import_edge_port) runs one or more workflows using start_process(...).

  2. These workflows include steps like resync or unsync, which are responsible for setting the subscription's insync status.

  3. Inside those steps, the system tries to invalidate the subscription cache so that any WebSocket clients (like the UI) get updated.

  4. This is done using a function called sync_invalidate_subscription_cache.

  5. That function was introduced by Tjeerd on **24 June 2025** (40 days ago) in #977. It runs an async Redis broadcast using a helper library called anyio, which temporarily starts an async event loop to talk to Redis.

  6. The event loop is created, the Redis broadcast is sent, and then the loop is shut down.

  7. However, the Redis connection object (used behind the scenes) is not explicitly closed — it waits for Python to clean it up automatically.

  8. After the workflow finishes, the CLI process ends and Python starts shutting down the interpreter.

  9. It shuts down the thread pool and async machinery.

  10. At this point, Python tries to run cleanup (`__del__`) for the Redis connection, which still has unfinished work.

  11. But it’s too late — the event loop and thread pool are already gone.

  12. This results in:

RuntimeError: cannot schedule new futures after interpreter shutdown

In the CLI, the process is short-lived, so cleanup happens after the system is already half shut down.
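The failure mode above can be reproduced without Redis at all. This is a minimal, self-contained sketch in which plain `asyncio.run` stands in for `anyio.run` and a fake connection class stands in for redis-py's `AbstractConnection` (the names `FakeConn` and `broadcast` are illustrative, not from the real code):

```python
import asyncio

errors = []

class FakeConn:
    """Stands in for redis.asyncio's AbstractConnection: it remembers the
    loop it was created on and tries to schedule cleanup work in __del__."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()

    def __del__(self):
        try:
            # like transport.close(): schedules work on the (now closed) loop
            self.loop.call_soon(lambda: None)
        except RuntimeError as exc:
            errors.append(str(exc))

async def broadcast():
    return FakeConn()  # connection is created but never explicitly closed

conn = asyncio.run(broadcast())  # asyncio.run tears the loop down, like anyio.run
del conn                         # the finalizer now runs against a dead loop
print(errors)                    # → ['Event loop is closed']
```

This reproduces the `Exception ignored in: <function AbstractConnection.__del__ ...>` noise from the log; the fatal `cannot schedule new futures` variant is the same pattern hitting the already-shut-down thread pool during DNS resolution.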

I believe the root problem is not our import logic. It's that the workflows indirectly use async Redis calls, and the cleanup of those calls happens too late. Because sync_invalidate_subscription_cache wasn't designed for short-lived CLI usage, we only see this failure there, not in FastAPI or, say, the GUI.
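One way the library could avoid this (a sketch of the general pattern, not the actual orchestrator-core code) is to close the connection deterministically inside the same event loop that created it, so nothing is left for the garbage collector to do after the loop is torn down:

```python
import asyncio

class FakeConn:
    """Toy connection: cleanup must happen while its event loop is alive."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.closed = False

    async def aclose(self):
        self.closed = True  # nothing left for __del__ to do

    def __del__(self):
        if not self.closed:
            self.loop.call_soon(lambda: None)  # would blow up once the loop is gone

async def broadcast():
    conn = FakeConn()
    try:
        pass  # ... publish the invalidateCache event ...
    finally:
        await conn.aclose()  # deterministic cleanup inside the live loop
    return conn

conn = asyncio.run(broadcast())
print(conn.closed)  # → True; __del__ is now a no-op at interpreter shutdown
```

redis-py's async client exposes this shape of cleanup (`aclose()` in recent versions), so the fix would likely be in how the broadcast helper manages the client's lifetime, not in the CLI.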

Version

4.1.0

What python version are you seeing the problem on?

Python 3.12

Relevant log output

~ $ PYTHONPATH=. python gso/main.py import-cli import-l3-core-service --filepath=/var/tmp/VIE/ae21/ias.json
2025-08-01 13:59:50 [info     ] Could not get git commit hash, not setting version reason="Command '['/usr/bin/env', 'git', 'rev-parse', 'HEAD']' returned non-zero exit status 127."
2025-08-01 13:59:51 [info     ] Could not get git commit hash, not setting version reason="Command '['/usr/bin/env', 'git', 'rev-parse', 'HEAD']' returned non-zero exit status 127."
2025-08-01 13:59:51 [debug    ] Using orjson
2025-08-01 13:59:52 [info     ] WebSocketManager object configured, all methods referencing `websocket_manager` should work. [orchestrator.websocket]
2025-08-01 13:59:52 [info     ] DistLockManager object configured, all methods referencing `distlock_manager` should work. [orchestrator.distlock]
2025-08-01 13:59:52 [info     ] Database object configured, all methods referencing `db` should work. [orchestrator.db]
Starting import from /var/tmp/VIE/ae21/ias.json
Creating imported IAS for ROEDUNET
Successfully created imported IAS for ROEDUNET
Waiting for the dust to settle before importing new products...
2025-08-01 13:59:54 [info     ] Lifecycle validation check ok  [orchestrator.domain.base] func=set_status.<locals>._set_status process_id=85c7e8ad-bc44-47ec-a11e-cad042c5ff68 status=active subscription_description='Initial subscription of A pre-existing Internet Access Service that is imported into the service database' subscription_id=UUID('6b9f258f-cdb9-4e49-bd3b-9ea47c64e981') workflow_name=create_imported_ias
Exception ignored in: <function AbstractConnection.__del__ at 0x7f1748ae1760>
Traceback (most recent call last):
  File "/app/redis/asyncio/connection.py", line 217, in __del__
    self._close()
  File "/app/redis/asyncio/connection.py", line 224, in _close
    self._writer.close()
  File "/usr/local/lib/python3.12/asyncio/streams.py", line 358, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 1210, in close
    super().close()
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 875, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <function AbstractConnection.__del__ at 0x7f1748ae1760>
Traceback (most recent call last):
  File "/app/redis/asyncio/connection.py", line 217, in __del__
    self._close()
  File "/app/redis/asyncio/connection.py", line 224, in _close
    self._writer.close()
  File "/usr/local/lib/python3.12/asyncio/streams.py", line 358, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 1210, in close
    super().close()
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 875, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <function AbstractConnection.__del__ at 0x7f1748ae1760>
Traceback (most recent call last):
  File "/app/redis/asyncio/connection.py", line 217, in __del__
    self._close()
  File "/app/redis/asyncio/connection.py", line 224, in _close
    self._writer.close()
  File "/usr/local/lib/python3.12/asyncio/streams.py", line 358, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 1210, in close
    super().close()
  File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 875, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Importing Imported IAS with 42779c8f-1a7f-43d6-b963-611c951e8aa4
Successfully created imported L3 Core Services:
- ['a63bc68e-73cf-4bb0-bbe0-a060b862be1f']
2025-08-01 13:59:54 [info     ] Creating backup of subscription details in the state [orchestrator.workflows.steps] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 subscription_id=6b9f258f-cdb9-4e49-bd3b-9ea47c64e981 workflow_name=import_ias
2025-08-01 13:59:54 [warning  ] Rolling back transaction.      [orchestrator.workflow] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 workflow_name=import_ias
/app/structlog/stdlib.py:1160: UserWarning: Remove `format_exc_info` from your processor chain if you want pretty exceptions.
  ed = p(logger, meth_name, ed)  # type: ignore[arg-type]
2025-08-01 13:59:54 [warning  ] Step failed                    [orchestrator.workflow] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 workflow_name=import_ias
Traceback (most recent call last):
  File "/app/redis/asyncio/connection.py", line 275, in connect
    await self.retry.call_with_retry(
  File "/app/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/app/redis/asyncio/connection.py", line 691, in _connect
    reader, writer = await asyncio.open_connection(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/streams.py", line 48, in open_connection
    transport, _ = await loop.create_connection(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1079, in create_connection
    infos = await self._ensure_resolved(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1455, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 901, in getaddrinfo
    return await self.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 863, in run_in_executor
    executor.submit(func, *args), loop=self)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 172, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/app/orchestrator/workflow.py", line 251, in wrapper
    result = step_in_inject_args(state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/orchestrator/utils/state.py", line 315, in wrapper
    new_state = func(*args)
                ^^^^^^^^^^^
  File "/app/orchestrator/workflows/steps.py", line 98, in unsync
    sync_invalidate_subscription_cache(subscription.subscription_id)
  File "/app/orchestrator/websocket/__init__.py", line 98, in sync_invalidate_subscription_cache
    anyio.run(invalidate_subscription_cache, subscription_id, invalidate_all)
  File "/app/anyio/_core/_eventloop.py", line 74, in run
    return async_backend.run(func, args, {}, backend_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/anyio/_backends/_asyncio.py", line 2310, in run
    return runner.run(wrapper())
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper
    return await func(*args)
           ^^^^^^^^^^^^^^^^^
  File "/app/orchestrator/websocket/__init__.py", line 103, in invalidate_subscription_cache
    await broadcast_invalidate_cache({"type": "subscriptions"})
  File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache
    await _broadcast_event("invalidateCache", cache_object)
  File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event
    await websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)
  File "/app/orchestrator/websocket/websocket_manager.py", line 70, in broadcast_data
    await self._backend.broadcast_data(channels, data)
  File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data
    async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/app/orchestrator/utils/redis.py", line 90, in pipeline
    await pipe.execute()
  File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute
    return await old_execute(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/redis/asyncio/client.py", line 1530, in execute
    conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/redis/asyncio/connection.py", line 1058, in get_connection
    await self.ensure_connection(connection)
  File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection
    await connection.connect()
  File "/app/redis/asyncio/connection.py", line 285, in connect
    raise ConnectionError(exc) from exc
redis.exceptions.ConnectionError: cannot schedule new futures after interpreter shutdown
2025-08-01 13:59:54 [error    ] Workflow returned an error.    [orchestrator.workflow] class=ConnectionError error='cannot schedule new futures after interpreter shutdown' traceback='ConnectionError: cannot schedule new futures after interpreter shutdown\n  File "/app/orchestrator/workflow.py", line 251, in wrapper\n    result = step_in_inject_args(state)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/app/orchestrator/utils/state.py", line 315, in wrapper\n    new_state = func(*args)\n                ^^^^^^^^^^^\n  File "/app/orchestrator/workflows/steps.py", line 98, in unsync\n    sync_invalidate_subscription_cache(subscription.subscription_id)\n  File "/app/orchestrator/websocket/__init__.py", line 98, in sync_invalidate_subscription_cache\n    anyio.run(invalidate_subscription_cache, subscription_id, invalidate_all)\n  File "/app/anyio/_core/_eventloop.py", line 74, in run\n    return async_backend.run(func, args, {}, backend_options)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/app/anyio/_backends/_asyncio.py", line 2310, in run\n    return runner.run(wrapper())\n           ^^^^^^^^^^^^^^^^^^^^^\n  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run\n    return self._loop.run_until_complete(task)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete\n    return future.result()\n           ^^^^^^^^^^^^^^^\n  File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper\n    return await func(*args)\n           ^^^^^^^^^^^^^^^^^\n  File "/app/orchestrator/websocket/__init__.py", line 103, in invalidate_subscription_cache\n    await broadcast_invalidate_cache({"type": "subscriptions"})\n  File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache\n    await _broadcast_event("invalidateCache", cache_object)\n  File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event\n    await 
websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)\n  File "/app/orchestrator/websocket/websocket_manager.py", line 70, in broadcast_data\n    await self._backend.broadcast_data(channels, data)\n  File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data\n    async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:\n               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__\n    await anext(self.gen)\n  File "/app/orchestrator/utils/redis.py", line 90, in pipeline\n    await pipe.execute()\n  File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute\n    return await old_execute(self, *args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/app/redis/asyncio/client.py", line 1530, in execute\n    conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/app/redis/asyncio/connection.py", line 1058, in get_connection\n    await self.ensure_connection(connection)\n  File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection\n    await connection.connect()\n  File "/app/redis/asyncio/connection.py", line 285, in connect\n    raise ConnectionError(exc) from exc\n'
2025-08-01 13:59:55 [warning  ] Writing only process state to DB as step couldn't be found [orchestrator.services.processes] process_id=UUID('474c66f9-ac71-4c9a-8d82-d20e254dcd55')
2025-08-01 13:59:55 [error    ] Unknown workflow failure       [orchestrator.services.processes] process_id=UUID('474c66f9-ac71-4c9a-8d82-d20e254dcd55')
Traceback (most recent call last):
  File "/app/redis/asyncio/connection.py", line 275, in connect
    await self.retry.call_with_retry(
  File "/app/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/app/redis/asyncio/connection.py", line 691, in _connect
    reader, writer = await asyncio.open_connection(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/streams.py", line 48, in open_connection
    transport, _ = await loop.create_connection(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1079, in create_connection
    infos = await self._ensure_resolved(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1455, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 901, in getaddrinfo
    return await self.run_in_executor(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 863, in run_in_executor
    executor.submit(func, *args), loop=self)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 172, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/app/orchestrator/services/processes.py", line 381, in run
    result = f()
             ^^^
  File "/app/orchestrator/services/processes.py", line 473, in <lambda>
    return _run_process_async(pstat.process_id, lambda: runwf(pstat, _safe_logstep_with_func))
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/orchestrator/workflow.py", line 1502, in runwf
    invalidate_status_counts()
  File "/app/orchestrator/workflow.py", line 1431, in invalidate_status_counts
    broadcast_invalidate_status_counts()
  File "/app/orchestrator/websocket/__init__.py", line 114, in broadcast_invalidate_status_counts
    sync_broadcast_invalidate_cache({"type": "processStatusCounts"})
  File "/app/orchestrator/websocket/__init__.py", line 90, in sync_broadcast_invalidate_cache
    anyio.run(broadcast_invalidate_cache, cache_object)
  File "/app/anyio/_core/_eventloop.py", line 74, in run
    return async_backend.run(func, args, {}, backend_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/anyio/_backends/_asyncio.py", line 2310, in run
    return runner.run(wrapper())
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper
    return await func(*args)
           ^^^^^^^^^^^^^^^^^
  File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache
    await _broadcast_event("invalidateCache", cache_object)
  File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event
    await websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)
  File "/app/orchestrator/websocket/websocket_manager.py", line 70, in broadcast_data
    await self._backend.broadcast_data(channels, data)
  File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data
    async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
  File "/app/orchestrator/utils/redis.py", line 90, in pipeline
    await pipe.execute()
  File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute
    return await old_execute(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/redis/asyncio/client.py", line 1530, in execute
    conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/redis/asyncio/connection.py", line 1058, in get_connection
    await self.ensure_connection(connection)
  File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection
    await connection.connect()
  File "/app/redis/asyncio/connection.py", line 285, in connect
    raise ConnectionError(exc) from exc
redis.exceptions.ConnectionError: cannot schedule new futures after interpreter shutdown

torkashvand avatar Aug 04 '25 12:08 torkashvand

Thanks for the detailed bug report!

It indeed looks like the problem is that the CLI does not terminate "gracefully".

Preventing this error may be challenging. While it is problematic that the exit status is not 0, there isn't any significant impact - apart from missing a few cache invalidation messages in the frontend - so we'll assign this a low priority for now. But it will get picked up eventually.

Mark90 avatar Aug 04 '25 12:08 Mark90

Thanks, @Mark90, for looking into this. However, I don't think this is a low-priority bug: it prevents us from importing our existing data into our core database when using the CLI to run workflows, and we are now effectively blocked on this.

torkashvand avatar Aug 05 '25 15:08 torkashvand

I had to do this in our code: 💃🏻

import orchestrator.websocket

orchestrator.websocket.sync_invalidate_subscription_cache = lambda *args, **kwargs: None

torkashvand avatar Aug 05 '25 15:08 torkashvand

After the workflow finishes, the CLI process ends and Python starts shutting down the interpreter.

I gathered from this that the process ended, and that you're merely getting a RuntimeError on script exit?

Either way, it is not recommended to run workflows directly in a one-off CLI script. The correct way is to start it through the API so that it can be picked up by a dedicated threadpool/celery executor.
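For reference, a sketch of that approach, assuming the standard orchestrator-core route `POST /api/processes/{workflow_key}` (verify the exact path, payload shape, and auth headers against your deployment's OpenAPI docs; `build_start_process_request` is an illustrative helper):

```python
import json
from urllib import request

def build_start_process_request(base_url: str, workflow_key: str,
                                user_inputs: list[dict]) -> request.Request:
    """Builds (but does not send) the POST that asks the server to run the
    workflow, so execution happens in the long-lived API process instead of
    the short-lived CLI interpreter."""
    return request.Request(
        url=f"{base_url}/api/processes/{workflow_key}",
        data=json.dumps(user_inputs).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_start_process_request(
    "http://localhost:8080", "create_imported_edge_port",
    [{"name": "ep1", "node": "router-1"}],
)
# request.urlopen(req) would submit it; omitted here as it needs a live server
print(req.get_method(), req.full_url)
```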

Mark90 avatar Aug 06 '25 07:08 Mark90