CLI fails with RuntimeError: cannot schedule new futures after interpreter shutdown
What happened?
When we run an import using the CLI, the process fails with this error:

```
RuntimeError: cannot schedule new futures after interpreter shutdown
```
Here is the actual code:
```python
@app.command()
def import_edge_port(filepath: str = common_filepath_option) -> None:
    """Import Edge Port into GSO."""
    successfully_imported_data = []
    data = _read_data(Path(filepath))
    for edge_port in data:
        typer.echo(f"Importing Edge Port {edge_port['name']} on {edge_port['node']}.")
        try:
            edge_port["node"] = get_active_router_subscription_id(edge_port["node"])
            initial_data = EdgePortImportModel(**edge_port)
            start_process("create_imported_edge_port", [initial_data.model_dump()])
            successfully_imported_data.append(edge_port["name"])
            typer.echo(f"Successfully imported Edge Port {edge_port['name']} on {edge_port['node']}.")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")
    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)
    edge_port_ids = get_subscriptions(
        product_types=[ProductType.IMPORTED_EDGE_PORT],
        lifecycles=[SubscriptionLifecycle.ACTIVE],
        includes=["subscription_id"],
    )
    for subscription_id in edge_port_ids:
        typer.echo(f"Migrating Edge Port {subscription_id}")
        start_process("import_edge_port", [subscription_id])
    if successfully_imported_data:
        typer.echo("Successfully imported Edge Ports:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")
```
And the workflow is:
```python
@workflow(
    "Import Edge Port",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_edge_port() -> StepList:
    """Import an Edge Port without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )
```
What's happening, step by step:

- The CLI command (e.g., `import_edge_port`) runs one or more workflows using `start_process(...)`.
- These workflows include steps like `resync` or `unsync`, which are responsible for setting the subscription's insync status.
- Inside those steps, the system tries to invalidate the subscription cache so that any WebSocket clients (like the UI) get updated.
- This is done using a function called `sync_invalidate_subscription_cache`.
- That function was introduced by Tjeerd on **24 June 2025 (40 days ago)** in #977. It runs an async Redis broadcast using a helper library called `anyio`, which temporarily starts an async event loop to talk to Redis.
- The event loop is created, the Redis broadcast is sent, and then the loop is shut down.
- However, the Redis connection object (used behind the scenes) is not explicitly closed; it waits for Python to clean it up automatically.
- After the workflow finishes, the CLI process ends and Python starts shutting down the interpreter.
- The interpreter shuts down the thread pool and async machinery.
- At this point, Python tries to run cleanup (`__del__`) for the Redis connection, which still has unfinished work.
- But by then it is too late: the event loop and thread pool are already gone.
- This results in:

```
RuntimeError: cannot schedule new futures after interpreter shutdown
```

In the CLI, the process is short-lived, so cleanup happens after the system is already half shut down.
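The two shutdown failures above can be reproduced in isolation with just the standard library. This is a minimal stand-alone sketch, not GSO code: it shows why `__del__`-time cleanup against a closed event loop fails, and why a shut-down thread pool (as at interpreter exit, where `getaddrinfo` runs) refuses new work.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# 1. "Event loop is closed": redis's AbstractConnection.__del__ ends up in
#    transport.close(), which needs loop.call_soon(); on a closed loop that
#    raises RuntimeError.
loop = asyncio.new_event_loop()
loop.close()
try:
    loop.call_soon(lambda: None)
except RuntimeError as exc:
    print(exc)  # Event loop is closed

# 2. "cannot schedule new futures": DNS resolution (getaddrinfo) is run in a
#    ThreadPoolExecutor; once the executor is shut down, submit() refuses
#    new work with the same RuntimeError seen in the log.
pool = ThreadPoolExecutor()
pool.shutdown()
try:
    pool.submit(print, "too late")
except RuntimeError as exc:
    print(exc)  # cannot schedule new futures after shutdown
```

During real interpreter shutdown the message gains the words "interpreter shutdown", but the mechanism is the same.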
I believe the root problem is not our import logic. It’s that the workflows indirectly uses async Redis calls, and the cleanup of those calls happens too late. Because sync_invalidate_subscription_cache wasn’t designed for short-lived CLI usage, we see this failure only there — not in FastAPI or lets say GUI.
Version
4.1.0
What python version are you seeing the problem on?
Python 3.12
Relevant log output
~ $ PYTHONPATH=. python gso/main.py import-cli import-l3-core-service --filepath=/var/tmp/VIE/ae21/ias.json
2025-08-01 13:59:50 [info ] Could not get git commit hash, not setting version reason="Command '['/usr/bin/env', 'git', 'rev-parse', 'HEAD']' returned non-zero exit status 127."
2025-08-01 13:59:51 [info ] Could not get git commit hash, not setting version reason="Command '['/usr/bin/env', 'git', 'rev-parse', 'HEAD']' returned non-zero exit status 127."
2025-08-01 13:59:51 [debug ] Using orjson
2025-08-01 13:59:52 [info ] WebSocketManager object configured, all methods referencing `websocket_manager` should work. [orchestrator.websocket]
2025-08-01 13:59:52 [info ] DistLockManager object configured, all methods referencing `distlock_manager` should work. [orchestrator.distlock]
2025-08-01 13:59:52 [info ] Database object configured, all methods referencing `db` should work. [orchestrator.db]
Starting import from /var/tmp/VIE/ae21/ias.json
Creating imported IAS for ROEDUNET
Successfully created imported IAS for ROEDUNET
Waiting for the dust to settle before importing new products...
2025-08-01 13:59:54 [info ] Lifecycle validation check ok [orchestrator.domain.base] func=set_status.<locals>._set_status process_id=85c7e8ad-bc44-47ec-a11e-cad042c5ff68 status=active subscription_description='Initial subscription of A pre-existing Internet Access Service that is imported into the service database' subscription_id=UUID('6b9f258f-cdb9-4e49-bd3b-9ea47c64e981') workflow_name=create_imported_ias
Exception ignored in: <function AbstractConnection.__del__ at 0x7f1748ae1760>
Traceback (most recent call last):
File "/app/redis/asyncio/connection.py", line 217, in __del__
self._close()
File "/app/redis/asyncio/connection.py", line 224, in _close
self._writer.close()
File "/usr/local/lib/python3.12/asyncio/streams.py", line 358, in close
return self._transport.close()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 1210, in close
super().close()
File "/usr/local/lib/python3.12/asyncio/selector_events.py", line 875, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 795, in call_soon
self._check_closed()
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[identical "Event loop is closed" traceback repeated twice more]
Importing Imported IAS with 42779c8f-1a7f-43d6-b963-611c951e8aa4
Successfully created imported L3 Core Services:
- ['a63bc68e-73cf-4bb0-bbe0-a060b862be1f']
2025-08-01 13:59:54 [info ] Creating backup of subscription details in the state [orchestrator.workflows.steps] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 subscription_id=6b9f258f-cdb9-4e49-bd3b-9ea47c64e981 workflow_name=import_ias
2025-08-01 13:59:54 [warning ] Rolling back transaction. [orchestrator.workflow] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 workflow_name=import_ias
/app/structlog/stdlib.py:1160: UserWarning: Remove `format_exc_info` from your processor chain if you want pretty exceptions.
ed = p(logger, meth_name, ed) # type: ignore[arg-type]
2025-08-01 13:59:54 [warning ] Step failed [orchestrator.workflow] func=unsync process_id=474c66f9-ac71-4c9a-8d82-d20e254dcd55 workflow_name=import_ias
Traceback (most recent call last):
File "/app/redis/asyncio/connection.py", line 275, in connect
await self.retry.call_with_retry(
File "/app/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
^^^^^^^^^^
File "/app/redis/asyncio/connection.py", line 691, in _connect
reader, writer = await asyncio.open_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/streams.py", line 48, in open_connection
transport, _ = await loop.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1079, in create_connection
infos = await self._ensure_resolved(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1455, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 901, in getaddrinfo
return await self.run_in_executor(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 863, in run_in_executor
executor.submit(func, *args), loop=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 172, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/orchestrator/workflow.py", line 251, in wrapper
result = step_in_inject_args(state)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/orchestrator/utils/state.py", line 315, in wrapper
new_state = func(*args)
^^^^^^^^^^^
File "/app/orchestrator/workflows/steps.py", line 98, in unsync
sync_invalidate_subscription_cache(subscription.subscription_id)
File "/app/orchestrator/websocket/__init__.py", line 98, in sync_invalidate_subscription_cache
anyio.run(invalidate_subscription_cache, subscription_id, invalidate_all)
File "/app/anyio/_core/_eventloop.py", line 74, in run
return async_backend.run(func, args, {}, backend_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/anyio/_backends/_asyncio.py", line 2310, in run
return runner.run(wrapper())
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper
return await func(*args)
^^^^^^^^^^^^^^^^^
File "/app/orchestrator/websocket/__init__.py", line 103, in invalidate_subscription_cache
await broadcast_invalidate_cache({"type": "subscriptions"})
File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache
await _broadcast_event("invalidateCache", cache_object)
File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event
await websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)
File "/app/orchestrator/websocket/websocket_manager.py", line 70, in broadcast_data
await self._backend.broadcast_data(channels, data)
File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data
async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
await anext(self.gen)
File "/app/orchestrator/utils/redis.py", line 90, in pipeline
await pipe.execute()
File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute
return await old_execute(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/redis/asyncio/client.py", line 1530, in execute
conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/redis/asyncio/connection.py", line 1058, in get_connection
await self.ensure_connection(connection)
File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection
await connection.connect()
File "/app/redis/asyncio/connection.py", line 285, in connect
raise ConnectionError(exc) from exc
redis.exceptions.ConnectionError: cannot schedule new futures after interpreter shutdown
2025-08-01 13:59:54 [error ] Workflow returned an error. [orchestrator.workflow] class=ConnectionError error='cannot schedule new futures after interpreter shutdown' traceback='ConnectionError: cannot schedule new futures after interpreter shutdown\n File "/app/orchestrator/workflow.py", line 251, in wrapper\n result = step_in_inject_args(state)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/app/orchestrator/utils/state.py", line 315, in wrapper\n new_state = func(*args)\n ^^^^^^^^^^^\n File "/app/orchestrator/workflows/steps.py", line 98, in unsync\n sync_invalidate_subscription_cache(subscription.subscription_id)\n File "/app/orchestrator/websocket/__init__.py", line 98, in sync_invalidate_subscription_cache\n anyio.run(invalidate_subscription_cache, subscription_id, invalidate_all)\n File "/app/anyio/_core/_eventloop.py", line 74, in run\n return async_backend.run(func, args, {}, backend_options)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/app/anyio/_backends/_asyncio.py", line 2310, in run\n return runner.run(wrapper())\n ^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run\n return self._loop.run_until_complete(task)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete\n return future.result()\n ^^^^^^^^^^^^^^^\n File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper\n return await func(*args)\n ^^^^^^^^^^^^^^^^^\n File "/app/orchestrator/websocket/__init__.py", line 103, in invalidate_subscription_cache\n await broadcast_invalidate_cache({"type": "subscriptions"})\n File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache\n await _broadcast_event("invalidateCache", cache_object)\n File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event\n await websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)\n File "/app/orchestrator/websocket/websocket_manager.py", line 70, in 
broadcast_data\n await self._backend.broadcast_data(channels, data)\n File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data\n async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__\n await anext(self.gen)\n File "/app/orchestrator/utils/redis.py", line 90, in pipeline\n await pipe.execute()\n File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute\n return await old_execute(self, *args, **kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/app/redis/asyncio/client.py", line 1530, in execute\n conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/app/redis/asyncio/connection.py", line 1058, in get_connection\n await self.ensure_connection(connection)\n File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection\n await connection.connect()\n File "/app/redis/asyncio/connection.py", line 285, in connect\n raise ConnectionError(exc) from exc\n'
2025-08-01 13:59:55 [warning ] Writing only process state to DB as step couldn't be found [orchestrator.services.processes] process_id=UUID('474c66f9-ac71-4c9a-8d82-d20e254dcd55')
2025-08-01 13:59:55 [error ] Unknown workflow failure [orchestrator.services.processes] process_id=UUID('474c66f9-ac71-4c9a-8d82-d20e254dcd55')
Traceback (most recent call last):
File "/app/redis/asyncio/connection.py", line 275, in connect
await self.retry.call_with_retry(
File "/app/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
^^^^^^^^^^
File "/app/redis/asyncio/connection.py", line 691, in _connect
reader, writer = await asyncio.open_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/streams.py", line 48, in open_connection
transport, _ = await loop.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1079, in create_connection
infos = await self._ensure_resolved(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1455, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 901, in getaddrinfo
return await self.run_in_executor(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 863, in run_in_executor
executor.submit(func, *args), loop=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 172, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/orchestrator/services/processes.py", line 381, in run
result = f()
^^^
File "/app/orchestrator/services/processes.py", line 473, in <lambda>
return _run_process_async(pstat.process_id, lambda: runwf(pstat, _safe_logstep_with_func))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/orchestrator/workflow.py", line 1502, in runwf
invalidate_status_counts()
File "/app/orchestrator/workflow.py", line 1431, in invalidate_status_counts
broadcast_invalidate_status_counts()
File "/app/orchestrator/websocket/__init__.py", line 114, in broadcast_invalidate_status_counts
sync_broadcast_invalidate_cache({"type": "processStatusCounts"})
File "/app/orchestrator/websocket/__init__.py", line 90, in sync_broadcast_invalidate_cache
anyio.run(broadcast_invalidate_cache, cache_object)
File "/app/anyio/_core/_eventloop.py", line 74, in run
return async_backend.run(func, args, {}, backend_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/anyio/_backends/_asyncio.py", line 2310, in run
return runner.run(wrapper())
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/app/anyio/_backends/_asyncio.py", line 2298, in wrapper
return await func(*args)
^^^^^^^^^^^^^^^^^
File "/app/orchestrator/websocket/__init__.py", line 94, in broadcast_invalidate_cache
await _broadcast_event("invalidateCache", cache_object)
File "/app/orchestrator/websocket/__init__.py", line 86, in _broadcast_event
await websocket_manager.broadcast_data([WS_CHANNELS.EVENTS], event)
File "/app/orchestrator/websocket/websocket_manager.py", line 70, in broadcast_data
await self._backend.broadcast_data(channels, data)
File "/app/orchestrator/websocket/managers/broadcast_websocket_manager.py", line 121, in broadcast_data
async with RedisBroadcast(self.broadcast_url).pipeline() as pipe:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/contextlib.py", line 217, in __aexit__
await anext(self.gen)
File "/app/orchestrator/utils/redis.py", line 90, in pipeline
await pipe.execute()
File "/app/sentry_sdk/integrations/redis/_async_common.py", line 53, in _sentry_execute
return await old_execute(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/redis/asyncio/client.py", line 1530, in execute
conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/redis/asyncio/connection.py", line 1058, in get_connection
await self.ensure_connection(connection)
File "/app/redis/asyncio/connection.py", line 1091, in ensure_connection
await connection.connect()
File "/app/redis/asyncio/connection.py", line 285, in connect
raise ConnectionError(exc) from exc
redis.exceptions.ConnectionError: cannot schedule new futures after interpreter shutdown
Thanks for the detailed bug report!
It indeed looks like the problem is that the CLI does not terminate "gracefully".
Preventing this error may be challenging. While it is problematic that the exit status is not 0, there isn't any significant impact - apart from missing a few cache invalidation messages in the frontend - so we'll assign this a low prio for now. But it will get picked up eventually.
Thanks, @Mark90, for looking into this. However, I don't think this is a low-priority bug: it prevents us from importing our existing data into our core database when using the CLI to run workflows, and we are now effectively blocked on this.
I had to do this in our code: 💃🏻

```python
# Workaround: replace the cache invalidation broadcast with a no-op so the
# CLI never touches async Redis at all.
import orchestrator.websocket

orchestrator.websocket.sync_invalidate_subscription_cache = lambda *args, **kwargs: None
```
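For anyone else hitting this, a slightly safer variant is to scope the patch to the import run only, so the real function is restored afterwards. A sketch (the `suppress_cache_invalidation` helper is ours, not part of orchestrator-core; the demo uses a stand-in object, but in real use you would pass `orchestrator.websocket`):

```python
import contextlib
from types import SimpleNamespace
from unittest import mock


@contextlib.contextmanager
def suppress_cache_invalidation(ws_module):
    """Temporarily replace sync_invalidate_subscription_cache with a no-op."""
    with mock.patch.object(
        ws_module, "sync_invalidate_subscription_cache", lambda *args, **kwargs: None
    ):
        yield


# Demo with a stand-in module object:
fake_ws = SimpleNamespace(
    sync_invalidate_subscription_cache=lambda sid: f"broadcast {sid}"
)
with suppress_cache_invalidation(fake_ws):
    # Inside the context the call is a no-op.
    assert fake_ws.sync_invalidate_subscription_cache("abc") is None
# Outside, the original function is back.
assert fake_ws.sync_invalidate_subscription_cache("abc") == "broadcast abc"
```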
> After the workflow finishes, the CLI process ends and Python starts shutting down the interpreter.

I gathered from this that the process ended, and that you're merely getting a `RuntimeError` on script exit?
Either way, it is not recommended to run workflows directly in a one-off CLI script. The correct way is to start them through the API so that they can be picked up by a dedicated threadpool/Celery executor.
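Going through the API could look roughly like the sketch below. The endpoint path (`POST /api/processes/{workflow_key}`) and the payload shape (a JSON list of input-form dicts) are assumptions based on orchestrator-core's process API; verify both against your deployment before using this.

```python
import json
import urllib.request


def build_start_process_request(
    base_url: str, workflow_key: str, input_state: dict
) -> urllib.request.Request:
    """Build a POST asking the orchestrator server to run `workflow_key`.

    Endpoint and body format are assumptions; check your orchestrator version.
    """
    url = f"{base_url}/api/processes/{workflow_key}"
    body = json.dumps([input_state]).encode()  # assumed: a list of form inputs
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_start_process_request(
    "http://localhost:8080", "create_imported_edge_port", {"name": "ep1"}
)
# urllib.request.urlopen(req) would submit it; the workflow then runs inside
# the server's long-lived executor, so interpreter shutdown never interferes.
```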