Asyncio workers

Open · v1r3n opened this issue 2 months ago

Draft for the AsyncIO workers

  • See examples/asyncio_workers.py for an example (a minimal worker sketch also follows below)
  • Design: WORKER_CONCURRENCY_DESIGN.md
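
A minimal sketch of what an asyncio worker might look like, assuming the @worker_task decorator accepts async def functions as this PR proposes (see examples/asyncio_workers.py in the branch for the authoritative version):

```python
import asyncio

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.worker.worker_task import worker_task


@worker_task(task_definition_name='fetch_user')
async def fetch_user(user_id: str) -> dict:
    # Assumption: async def workers are accepted here once this PR lands.
    await asyncio.sleep(0.1)  # stand-in for non-blocking I/O
    return {'user_id': user_id, 'status': 'active'}


if __name__ == '__main__':
    config = Configuration()  # server URL etc. come from the environment
    task_handler = TaskHandler(configuration=config)
    task_handler.start_processes()
    task_handler.join_processes()
```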

Pending Items

  • Tests
  • Performance tests against a server, running for at least 7 days, to ensure there are no memory leaks or other issues
  • Benchmark against sync workers and publish the results
  • More examples

v1r3n avatar Nov 09 '25 08:11 v1r3n

Codecov Report

:x: Patch coverage is 80.09479% with 294 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../conductor/client/automator/task_runner_asyncio.py | 80.90% | 105 Missing :warning: |
| src/conductor/client/worker/worker_loader.py | 0.00% | 94 Missing :warning: |
| ...conductor/client/automator/task_handler_asyncio.py | 78.45% | 39 Missing :warning: |
| ...rc/conductor/client/telemetry/metrics_collector.py | 82.50% | 21 Missing :warning: |
| src/conductor/client/event/listeners.py | 67.64% | 11 Missing :warning: |
| src/conductor/client/event/listener_register.py | 66.66% | 10 Missing :warning: |
| src/conductor/client/worker/worker_config.py | 89.55% | 7 Missing :warning: |
| src/conductor/client/context/task_context.py | 95.00% | 3 Missing :warning: |
| src/conductor/client/event/event_dispatcher.py | 95.55% | 2 Missing :warning: |
| src/conductor/client/workflow/task/task.py | 0.00% | 2 Missing :warning: |
| Files with missing lines | Coverage Δ |
|---|---|
| src/conductor/client/automator/task_handler.py | 97.56% <100.00%> (+32.22%) :arrow_up: |
| src/conductor/client/automator/task_runner.py | 100.00% <100.00%> (+20.27%) :arrow_up: |
| src/conductor/client/event/conductor_event.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/task_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/task_runner_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/workflow_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/http/api_client.py | 98.97% <100.00%> (+44.17%) :arrow_up: |
| ...rc/conductor/client/http/models/integration_api.py | 97.79% <ø> (-0.14%) :arrow_down: |
| src/conductor/client/http/models/schema_def.py | 91.81% <ø> (-0.15%) :arrow_down: |
| src/conductor/client/http/models/workflow_def.py | 85.19% <ø> (-0.38%) :arrow_down: |

... and 16 more

... and 4 files with indirect coverage changes

codecov[bot] avatar Nov 09 '25 08:11 codecov[bot]

@v1r3n Overall looks good, though could you explain why we keep using threads for async workers instead of a single thread with an event loop handling everything? From what we can see, async workers currently use BackgroundEventLoop (a separate thread) for coroutine execution, while worker threads handle task submission and polling. As a suggestion, pure async workers could use a single-threaded event-loop architecture (one process = one thread + event loop), which should be faster for I/O-bound async workloads by eliminating thread-switching overhead and GIL contention.
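
For context, the handoff pattern being described looks roughly like this; a minimal sketch, not the SDK's actual BackgroundEventLoop implementation:

```python
import asyncio
import threading


class BackgroundLoop:
    """A dedicated thread that runs an event loop forever."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run_coroutine(self, coro):
        # Called from a worker thread; this cross-thread handoff is the
        # context switch the comment refers to.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)


async def do_work(task_id: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for async I/O
    return f"done: {task_id}"


background = BackgroundLoop()
future = background.run_coroutine(do_work("t1"))  # concurrent.futures.Future
print(future.result(timeout=5))
```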

IgorChvyrov-sm avatar Nov 26 '25 15:11 IgorChvyrov-sm

See https://github.com/conductor-oss/python-sdk/blob/async_io_workers/docs/design/WORKER_ARCHITECTURE.md. Each worker runs in its own process and has its own poll/execute/update loop. This avoids GIL contention across threads for multiple workers (at the expense of a separate process per worker). The actual execution of the task can happen in an async loop (if the task is marked async) or in a thread (otherwise). So that is the only difference.

One thing to consider is that marking a task async makes it run in an asyncio pool. Alternatively, we could make this explicit, but I don't really see much benefit: it introduces another flag you have to maintain, and it creates the complication that you then need an all-or-nothing implementation for your workers.
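
As a rough illustration of that auto-detection, a runner can route on asyncio.iscoroutinefunction; execute_task and worker_fn are illustrative names here, not the SDK's actual internals:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=4)


async def execute_task(worker_fn, task_input: dict):
    if asyncio.iscoroutinefunction(worker_fn):
        # Async worker: await directly on the event loop.
        return await worker_fn(**task_input)
    # Sync worker: run in a thread so it cannot block the event loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, lambda: worker_fn(**task_input))


async def async_worker(user_id: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for async I/O
    return f"async: {user_id}"


def sync_worker(user_id: str) -> str:
    return f"sync: {user_id}"  # plain blocking code


print(asyncio.run(execute_task(async_worker, {"user_id": "u1"})))
print(asyncio.run(execute_task(sync_worker, {"user_id": "u2"})))
```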

v1r3n avatar Nov 26 '25 19:11 v1r3n

Totally agree that the current design (one process per worker, threads inside for poll/execute/update) solves the “multi-worker GIL contention” problem well. My question was a bit narrower: within a single worker that is already marked async def, we still keep the per-task threads (ThreadPoolExecutor.submit(...) + BackgroundEventLoop) even though the work after polling is 100% asyncio. That thread handoff is what adds context-switch overhead and keeps the poll loop in a different thread from the async execution. I'm not proposing another global flag that forces all workers into async mode. I'm suggesting an optional AsyncTaskRunner that is picked automatically when the decorated function is async def. It would still be one process per worker (no change there), but the flow would switch from:

  • Main thread: poll → submit to ThreadPoolExecutor
  • ThreadPool thread: start the coroutine → hand off to the BackgroundEventLoop thread
  • BackgroundEventLoop thread: actually run the coroutine

to:

  • Single thread/event loop in that process: async poll() → async execute() → async update()

So the suggestion is: sync workers keep the current code path (polling thread + worker threads), while async workers get “one process = one thread + asyncio loop”, which removes the extra thread switches and the BackgroundEventLoop plumbing while still keeping process isolation. That was the motivation behind the question.
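
Concretely, the proposed runner might look something like this; AsyncTaskRunner and its poll/update methods are hypothetical names drawn from the proposal above, not the SDK's actual internals:

```python
import asyncio


class AsyncTaskRunner:
    """Hypothetical single-thread runner: one process, one event loop."""

    def __init__(self, worker_fn, poll_interval: float = 0.1):
        self.worker_fn = worker_fn          # an async def worker
        self.poll_interval = poll_interval

    async def poll(self):
        ...  # non-blocking HTTP poll against the Conductor server

    async def update(self, result):
        ...  # non-blocking HTTP update back to the server

    async def run(self):
        # poll/execute/update all run on this one loop: no
        # ThreadPoolExecutor, no BackgroundEventLoop handoff.
        while True:
            task = await self.poll()
            if task is None:
                await asyncio.sleep(self.poll_interval)
                continue
            result = await self.worker_fn(task)
            await self.update(result)


# One process = one thread + event loop:
# asyncio.run(AsyncTaskRunner(my_async_worker).run())
```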

IgorChvyrov-sm avatar Nov 27 '25 10:11 IgorChvyrov-sm

These are good points. I updated the PR to take care of this; the new architecture is described at https://github.com/conductor-oss/python-sdk/blob/async_io_workers/docs/design/WORKER_DESIGN.md

v1r3n avatar Nov 30 '25 03:11 v1r3n