
subprocpool: convert to async


The subprocpool allows us to run subprocesses asynchronously in a limited pool to avoid overwhelming the scheduler host.

Proposed Change

Re-write the subprocpool as async code so we can await subprocess calls from the code which issued them.

If required, it would be possible to support both a call/callback and an async/await interface to the new pool.
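For example, a call/callback interface could be layered on top of the async one with a thin shim along these lines (just a sketch, assuming a pool with an async run method like the ProcPool outlined further down; not a committed design):

import asyncio


def call(pool, args, callback, **kwargs):
    """Hypothetical call/callback shim over an async pool.

    Must be called from within the running event loop: it schedules the
    subprocess via the pool's async ``run`` method and fires
    ``callback(proc)`` when the process completes, so existing
    callback-style code could keep working unchanged.
    """
    async def _run():
        proc = await pool.run(args, **kwargs)
        callback(proc)

    return asyncio.create_task(_run())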

I considered doing this during Cylc 8 development; however, at the time I figured it wasn't necessary and would bloat the project. In retrospect it would probably have saved time...

Why

The subprocpool was implemented in Cylc 7 using a call/callback pattern. Code that works with the subprocpool must issue commands, then, in a future main loop iteration, check whether the callback has fired. This is a slightly icky pattern, but it worked ok up until we needed to add remote-init / remote-fileinstall / platform-selection / host-selection into the job submission pipeline.

Conceptually this is simple; here's an outline of how the job-submission / intelligent host-selection works:

async def submit(itask, bad_hosts):
    platform_string = itask.taskdef.rtconfig['platform']
    for platform in select_platform(platform_string):
        for host in select_host(platform, bad_hosts):
            ret = await remote_init(platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
            ret = await remote_fileinstall(platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
            await check_syntax(itask, platform, host)
            ret = await submit_job(itask, platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
            return  # submitted successfully
    else:
        raise PlatformError(f'no hosts available for {platform}')

But when you try to write this using a call/callback pattern it gets messy. The actual code is much, much longer and spread between multiple functions (which issue the calls) and callbacks (which update the shared state).

  • We have to push tasks through the submission pipeline multiple times.
  • We can't maintain state in local variables so we have to store state elsewhere and have the callbacks edit that state when things happen.
  • This isn't just inefficient; it convolutes the problem, turning something simple into something complicated.

(Minor side note: it also separates you from the subprocess.Popen object, which would be really handy to have.)

How

The async/await interface absorbs the call/callback and state management aspects by abstracting them away.

This leaves us to write just the business logic, which is fairly straightforward; here's an example implementation:

import asyncio                                                                       
from subprocess import Popen


class ProcItem:
    """A queued subprocess call and the future used to await its completion."""

    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs
        self.future = asyncio.Future()
        self.proc = None


class ProcPool:
 
    def __init__(self, max_size=5):
        self._queued = asyncio.Queue()
        self._running = set()
        self.max_size = max_size
        self._slots = asyncio.Queue()
 
    async def run(self, *args, **kwargs):
        """Run a new process."""
        item = ProcItem(args, kwargs)
        await self._queued.put(item)
        await item.future
        return item.proc

    async def runner(self):
        """Event driven process submitter."""
        for _ in range(self.max_size):
            await self._slots.put(True)
        while True:
            # claim a slot in the pool
            await self._slots.get()
            # wait for an item to be queued
            item = await self._queued.get()
            print(f'$ {item.args}')
            item.proc = Popen(*item.args, **item.kwargs)
            self._running.add(item)
 
    async def poller(self):
        """Process poller."""
        while True:
            await asyncio.sleep(1)
            for item in set(self._running):
                if item.proc.poll() is not None:
                    item.future.set_result(item.proc.returncode)
                    self._running.remove(item)
                    await self._slots.put(True)


async def test():
    from subprocess import PIPE

    async def _run(pool, world):
        proc = await pool.run(['echo', 'hello', world], stdout=PIPE)
        print(proc.communicate())

    pool = ProcPool(2)
    # run the pool machinery in the background
    background = asyncio.gather(pool.poller(), pool.runner())
    await asyncio.gather(
        _run(pool, 'earth'),
        _run(pool, 'mars'),
        _run(pool, 'jupiter'),
        _run(pool, 'saturn'),
        _run(pool, 'pluto'),
    )
    # the runner/poller loop forever, stop them once the demo jobs are done
    background.cancel()


asyncio.run(test())

Long Term Context

This would help us to break the main loop lockstep: https://github.com/cylc/cylc-flow/issues/3498

Submissions would go through faster and the load/complexity of the main loop would be reduced.

Decoupling job submission from the main loop would simplify the logic, but it would also give us a lot more flexibility over how we run job submission. E.g. we could push it out into another process or (using zmq queues) even onto another host (lightweight submission on another host controlled by the remote scheduler, i.e. very interesting cloud possibilities).

Pull requests welcome!

oliver-sanders · Jul 26 '22

:tada: yes this would be a great improvement!

(We did not have the option to use asyncio when the current system was devised, of course).

hjoliver · Jul 26 '22

I had a look at this today as a training exercise; conclusions:

  • The async approach allows for much nicer separation of concerns:
    • The submission pipeline can be defined in one place.
    • The batching logic in another.
    • The state change logic in another.
    • The stages of the pipeline (remote-init, remote-fileinstall, submission, etc.) can all be written as standalone functions.
    • The TaskRemoteMgr can be completely replaced.
    • Logic for handling things like submission-failure would be done in one place, rather than implemented in many different callbacks with potential for code to get out of sync or create strange state interactions.
  • But it's a substantial refactor and the logic won't end up 100% the same.
    • So there's an element of risk; we would need to dedicate a little time to this one.
    • We can prepare for this change by making bits of the code more functional, e.g. the bits which actually do the work could, to some extent, be chipped off TaskRemoteMgr into standalone functions.
  • The job submission would become an async loop which would run alongside the main loop.
    • The main loop would queue jobs for submission, and the submission loop would pick them up and submit them (see the sketch after this list).
    • But these two loops would be decoupled.
    • We could put the submission loop into its own process for complete decoupling.
    • If we used zmq queues and communicated updates from the submission loop back using task messages (i.e. send the task message preparing rather than calling itask.state_reset(TASK_STATUS_PREPARING)), then we could actually run the submission loop on another host if we wanted to, which is an interesting possibility.
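To make the queueing idea concrete, here's a toy, self-contained sketch of the main loop handing tasks to a decoupled submission loop via an asyncio.Queue (all names here are hypothetical stand-ins, not the real scheduler API):

import asyncio


async def main_loop(submission_queue):
    """Hypothetical main loop: queue tasks for submission and carry on."""
    for itask in ('foo.1', 'bar.1'):  # stand-ins for TaskProxy objects
        await submission_queue.put(itask)
    await asyncio.sleep(1)  # the main loop gets on with other work


async def submission_loop(submission_queue):
    """Hypothetical decoupled submission loop."""
    while True:
        itask = await submission_queue.get()
        print(f'submitting {itask}')  # the real thing would run the pipeline


async def main():
    queue = asyncio.Queue()
    background = asyncio.create_task(submission_loop(queue))
    await main_loop(queue)
    background.cancel()


asyncio.run(main())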

Here's how the top-level code would look after this change:

async def submitter(bad_hosts, submission_queue, broadcast_mgr, subprocpool):
    """Job submitter thingy.

    When you want jobs to be submitted, push the corresponding tasks
    into the submission_queue and lean back; the submitter does the work
    for you.
    """
    cache = SelectCache()
    while True:
        for itask in submission_queue.get():
            try:
                await _submit(cache, itask, bad_hosts, broadcast_mgr, subprocpool)
            except (JobSyntaxError, PlatformError, SubmissionError):
                pass  # TODO: => submit-fail
            else:
                pass  # TODO: => submitted


async def _submit(cache, itask, bad_hosts, broadcast_mgr, subprocpool):
    """The job submission pipeline for a single task."""
    rtconfig = _get_rtconfig(itask, broadcast_mgr)
    select_host = await _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool)
    for platform, host in select_host:
        try:
            await check_syntax(itask, platform, host)
            await remote_init(cache, platform, host)
            await remote_file_install(cache, platform, host)
            await submit_job(cache, itask, platform, host)
            break
        except SSHError:
            # LOG.warning()
            bad_hosts.add(host)
            continue
    else:
        raise PlatformError(f'no hosts available for {platform}')

We can keep batching behaviour by maintaining mappings as we currently do. E.g. the remote-init map logic can be handled like this (note that the caching part of the logic has been removed from the actual implementation of remote-init itself):

async def remote_init(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_init_cache[platform['install target']]
    # cache a Task (not a bare coroutine) so multiple callers can await it
    task = asyncio.ensure_future(_remote_init(platform, host))
    cache.remote_init_cache[platform['install target']] = task
    return await task

Batching the job-submit commands is a little funkier, but I think we might be able to do something like this:

async def submit_job(cache, itask, platform, host):
    """Run the job submission command (batched per platform/host)."""
    key = (platform['name'], host)
    with suppress(KeyError):
        # a batch already exists for this platform/host, join it and wait
        itasks, future = cache.job_submission_queue[key]
        itasks.append(itask)
        return await future
    # we are the first task for this platform/host, start a new batch
    future = asyncio.Future()
    cache.job_submission_queue[key] = ([itask], future)

    # wait for other submissions to queue up behind us
    await asyncio.sleep(0)

    # submit the batch
    itasks, _ = cache.job_submission_queue.pop(key)
    future.set_result(_submit_jobs(itasks, platform, host))
    return future.result()

This way we can continue to write the code from the perspective of a single job submission, rather than having to define this batching in the top-level code.
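Here's a toy, self-contained demo of that sleep(0) trick, independent of the Cylc code (all names hypothetical): the first caller for a given key creates the batch, yields once so concurrent callers can join it, then submits the whole batch while the other callers simply await its future:

import asyncio
from contextlib import suppress

batches = {}


async def submit_one(key, item):
    with suppress(KeyError):
        # a batch already exists for this key, join it and wait
        itasks, future = batches[key]
        itasks.append(item)
        return await future
    # first caller for this key: start a batch
    future = asyncio.Future()
    batches[key] = ([item], future)
    await asyncio.sleep(0)  # let other submissions queue up behind us
    itasks, _ = batches.pop(key)
    print(f'submitting batch {itasks} to {key}')
    future.set_result(None)


async def main():
    await asyncio.gather(
        submit_one('platform-a', 'task1'),
        submit_one('platform-a', 'task2'),
        submit_one('platform-b', 'task3'),
    )


asyncio.run(main())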

Code
import asyncio
from contextlib import suppress
import os
import os.path
import re
import typing as ty
from weakref import WeakValueDictionary

from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.parsec.util import (
    pdeepcopy,
    poverride,
)
from cylc.flow.platforms import (
    HOST_REC_COMMAND,
    PLATFORM_REC_COMMAND,
    platform_name_from_job_info2,
)
from cylc.flow.remote import (
    is_remote_host,
)
from cylc.flow.submit.remote import (
    remote_init as _remote_init,
    remote_file_install as _remote_file_install,
)


class SelectCache:

    def __init__(self):
        self.remote_init_cache = {}
        self.remote_file_install_cache = {}
        self.host_selection_batches = WeakValueDictionary()
        self.job_submission_queue = {}
        self.platforms = glbl_cfg().get(['platforms'])


async def remote_init(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_init_cache[platform['install target']]
    # cache a Task (not a bare coroutine) so multiple callers can await it
    task = asyncio.ensure_future(_remote_init(platform, host))
    cache.remote_init_cache[platform['install target']] = task
    return await task


async def remote_file_install(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_file_install_cache[
            platform['install target']
        ]
    # cache a Task (not a bare coroutine) so multiple callers can await it
    task = asyncio.ensure_future(_remote_file_install(platform, host))
    cache.remote_file_install_cache[platform['install target']] = task
    return await task


async def submit_job(cache, itask, platform, host):
    """Run the job submission command (batched per platform/host)."""
    key = (platform['name'], host)
    with suppress(KeyError):
        # a batch already exists for this platform/host, join it and wait
        itasks, future = cache.job_submission_queue[key]
        itasks.append(itask)
        return await future
    # we are the first task for this platform/host, start a new batch
    future = asyncio.Future()
    cache.job_submission_queue[key] = ([itask], future)

    # wait for other submissions to queue up behind us
    await asyncio.sleep(0)

    # submit the batch
    itasks, _ = cache.job_submission_queue.pop(key)
    future.set_result(_submit_jobs(itasks, platform, host))
    return future.result()


def _get_rtconfig(itask, broadcast_mgr):
    """Return the runtime config for a task."""
    # Handle broadcasts
    overrides = broadcast_mgr.get_broadcast(
        itask.tokens
    )
    if overrides:
        rtconfig = pdeepcopy(itask.tdef.rtconfig)
        poverride(rtconfig, overrides, prepend=True)
    else:
        rtconfig = itask.tdef.rtconfig
    return rtconfig


def _host_selector(platform_string, bad_hosts):
    """Yields hosts for a platform string.

    Note that platform_string could be a platform_group.

    """
    for platform in get_platform(platform_string):
        for host in get_host(platform, bad_hosts):
            yield (platform, host)


async def eval_host(host_str: str, subprocpool) -> ty.Optional[str]:
    """Evaluate a host from a possible subshell string.

    Args:
        host_str: An explicit host name, a command in back-tick or
            $(command) format, or an environment variable holding
            a hostname.

    Returns 'localhost' if evaluated name is equivalent
    (e.g. localhost4.localdomain4).
    """
    host = await subshell_eval(host_str, HOST_REC_COMMAND, subprocpool)
    if host is not None and not is_remote_host(host):
        return 'localhost'
    return host


async def eval_platform(platform_str: str, subprocpool) -> ty.Optional[str]:
    """Evaluate a platform from a possible subshell string.

    Args:
        platform_str: An explicit platform name, a command in $(command)
            format, or an environment variable holding a platform name.
    """
    return await subshell_eval(platform_str, PLATFORM_REC_COMMAND, subprocpool)


async def subshell_eval(
    eval_str: str,
    command_pattern: re.Pattern,
    subprocpool,
) -> ty.Optional[str]:
    """Evaluate a platform or host from a possible subshell string.

    Arguments:
        eval_str:
            An explicit host/platform name, a command, or an environment
            variable holding a host/platform name.
        command_pattern:
            A compiled regex pattern designed to match subshell strings.

    Return:
        - 'localhost' if string is empty/not defined.
        - Otherwise, return the evaluated host/platform name on success.

    Raise PlatformError on error.

    """
    if not eval_str:
        return 'localhost'

    # Host selection command: $(command) or `command`
    match_ = command_pattern.match(eval_str)
    if match_:
        eval_str = await subprocpool.run(
            ['bash', '-c', match_.groups()[1]],
            env=dict(os.environ)
        )

    # Environment variable substitution
    return os.path.expandvars(eval_str)


async def _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool):
    """"Return a generator which picks hosts a task could submit to."""
    # get the platform expression
    platform_expr = rtconfig['platform'] 
    host_expr = rtconfig['remote']['host']
    if platform_expr and host_expr:
        raise WorkflowConfigError(
            "A mixture of Cylc 7 (host) and Cylc 8 (platform)"
            " logic should not be used. In this case for the task "
            f"\"{itask.identity}\" the following are not compatible:\n"
        )

    # check whether we are currently submitting other jobs for this expression
    key = (platform_expr, host_expr)
    with suppress(KeyError):
        return cache.host_selection_batches[key]

    # evaluate the platform/host expression
    if host_expr:
        host_string = await eval_host(host_expr, subprocpool)
        platform_string = platform_name_from_job_info2(
            cache.platforms,
            host_string,
            rtconfig,
        )
    else:
        platform_string = await eval_platform(platform_expr, subprocpool)

    # return the host selector
    sel = _host_selector(platform_string, bad_hosts)
    # TODO: cache the PlatformError
    cache.host_selection_batches[key] = sel
    return sel


async def submitter(bad_hosts, submission_queue, broadcast_mgr, subprocpool):
    """Job submitter thingy.

    When you want jobs to be submitted, push the corresponding tasks
    into the submission_queue and lean back; the submitter does the work
    for you.
    """
    cache = SelectCache()
    while True:
        for itask in submission_queue.get():
            try:
                await _submit(cache, itask, bad_hosts, broadcast_mgr, subprocpool)
            except JobSyntaxError:
                pass  # submit-fail
            except PlatformError:
                pass  # submit-fail
            except SubmissionError:
                pass  # submit-fail
            else:
                pass  # submitted


async def _submit(cache, itask, bad_hosts, broadcast_mgr, subprocpool):
    """The job submission pipeline for a single task."""
    rtconfig = _get_rtconfig(itask, broadcast_mgr)
    select_host = await _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool)
    for platform, host in select_host:
        try:
            await check_syntax(itask, platform, host)
            await remote_init(cache, platform, host)
            await remote_file_install(cache, platform, host)
            await submit_job(cache, itask, platform, host)
            break
        except SSHError:
            # LOG.warning()
            bad_hosts.add(host)
            continue
    else:
        raise PlatformError(f'no hosts available for {platform}')

https://github.com/oliver-sanders/cylc-flow/pull/new/async-subproc-pool-early-experiment

oliver-sanders · Jul 07 '23

Very nice!

hjoliver · Jul 10 '23