
Scale-out AsyncIOScheduler/AsyncIOExecutor to more than 1 CPU

Open b4stien opened this issue 3 years ago • 8 comments

Is your feature request related to a problem? Please describe.

We've been proud users of apscheduler for around four years now, and as my company is growing, so is our usage of apscheduler. Most of our jobs (95%) are IO-bound, and as our codebase is heavily asyncio-oriented we've been successfully using AsyncIOScheduler/AsyncIOExecutor.

Four years later, the 5% of jobs that still need CPU (mostly the overhead around data manipulation) are overflowing a single CPU core (of a Z1D AWS instance) and we need to make adjustments. Hence the question: is there a built-in or easy way with apscheduler to share the load among multiple cores (without hitting the GIL, obviously)?

Describe the solution you'd like

Ideally we'd love to use a mix between AsyncIOExecutor and ProcessPoolExecutor, each job being dispatched to a process running its own event loop and reporting to the main process (holding the scheduler) once it's done.
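A minimal sketch of the idea above, using only the standard library rather than anything built into apscheduler: the main process hands a coroutine function to a process pool, the worker process runs it on its own event loop, and the result comes back to the main loop. The names `sample_job`, `run_in_own_loop` and `dispatch` are hypothetical and exist only for illustration.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


async def sample_job(n: int) -> int:
    """Hypothetical job: a little awaiting plus some CPU-bound work."""
    await asyncio.sleep(0)
    return sum(i * i for i in range(n))


def run_in_own_loop(coro_fn, *args):
    """Runs in the worker process: execute the coroutine on a fresh event loop."""
    return asyncio.run(coro_fn(*args))


async def dispatch(pool, coro_fn, *args):
    """Runs in the main process: submit the job to the pool and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, run_in_own_loop, coro_fn, *args)


async def main() -> None:
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            dispatch(pool, sample_job, 10),
            dispatch(pool, sample_job, 100),
        )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

The coroutine function and its arguments have to be picklable (module-level functions, plain data), since they cross a process boundary. The main event loop stays free for IO-bound jobs while the pool absorbs the CPU-bound ones.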

Describe alternatives you've considered

As a quick fix for our problem, we will probably take the list of jobs we want to run, have a main process split it into N parts, and give 1/N-th of the jobs to each forked process, each with its own instance of apscheduler.
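The split itself can be sketched as follows, assuming the jobs live in a plain list. `split_jobs` is a hypothetical helper written with the standard library only; it is not part of apscheduler.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_jobs(jobs: Sequence[T], n: int) -> List[List[T]]:
    """Split `jobs` into `n` contiguous batches whose sizes differ by at most one."""
    if n < 1:
        raise ValueError("n must be at least 1")
    base, extra = divmod(len(jobs), n)
    batches: List[List[T]] = []
    start = 0
    for i in range(n):
        # The first `extra` batches take one additional job each.
        size = base + (1 if i < extra else 0)
        batches.append(list(jobs[start:start + size]))
        start += size
    return batches


if __name__ == "__main__":
    print(split_jobs(["a", "b", "c", "d", "e"], 3))
    # prints [['a', 'b'], ['c', 'd'], ['e']]
```

When there are fewer jobs than processes, some batches come back empty, so a caller may want to skip starting a process for those.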

Feel free to redirect me to somewhere else, a previous issue, a doc, a discussion...

b4stien avatar May 24 '21 16:05 b4stien

Hey, did you implement the distributed scheduler architecture you mentioned above? Were there any pitfalls or unexpected bugs you encountered if you did?

RohinBhargava avatar Aug 04 '21 03:08 RohinBhargava

One possibility would be implementing such an executor using AnyIO and its to_process.run_sync() function.

agronholm avatar Aug 04 '21 07:08 agronholm

Hey, did you implement the distributed scheduler architecture you mentioned above? Were there any pitfalls or unexpected bugs you encountered if you did?

We did, with no particular roadblock to mention (it's been running in production for the last two months). The main things we identified and focused on were:

  • Tear-down ceremony so the forked processes are stopped gracefully when you stop the app
  • Start-up ceremony, which has to happen in the main process as well as in each forked process, especially for:
    • Error monitoring (we use Sentry, it has to be set up in each process)
    • Global state in general
  • Logging
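As an illustration of the tear-down point, here is a minimal sketch of a sub-process that stops its event loop cleanly when the parent terminates it. It assumes a Unix platform, where `Process.terminate()` sends SIGTERM and `loop.add_signal_handler()` is available. `run_until_terminated` and its `shutdown` callback are hypothetical names; in a real worker the callback would be whatever stops the scheduler and flushes error reports.

```python
import asyncio
import os
import signal
from typing import Callable


def run_until_terminated(
    loop: asyncio.AbstractEventLoop, shutdown: Callable[[], None]
) -> None:
    """Run `loop` until SIGTERM or SIGINT arrives, then clean up and return."""

    def _on_signal() -> None:
        shutdown()   # placeholder for application-specific clean-up
        loop.stop()  # makes run_forever() return

    signals = (signal.SIGTERM, signal.SIGINT)
    for sig in signals:
        # Unix-only API; must be called from the main thread.
        loop.add_signal_handler(sig, _on_signal)
    try:
        loop.run_forever()
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    # Simulate the parent calling Process.terminate() shortly after start-up.
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)
    run_until_terminated(loop, lambda: print("cleaning up"))
    loop.close()
```

Handling the signal inside the loop means clean-up runs in the same thread as the jobs, which avoids racing with a job that is mid-execution.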

b4stien avatar Aug 04 '21 09:08 b4stien

Great! Thank you very much for the information

RohinBhargava avatar Aug 04 '21 17:08 RohinBhargava

Great! Thank you very much for the information

You're welcome, apscheduler is a great, rock-solid project; we're happy if we can contribute something back to the community.

If needed, this is a stripped-down version of the code we use:

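import asyncio
import logging
import multiprocessing as mp
import os
import sys
import time
import traceback
import typing

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from attr import attrs
from more_itertools import divide

# NOTE: `logger`, `get_logger` and `add_logging_context` are used below but are
# not defined in this stripped-down version; they are assumed to come from the
# application's own logging helpers.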


def mandatory_setup():
    """Init global state, called pre and post fork."""
    pass


@attrs(frozen=True, kw_only=True, auto_attribs=True)
class Job:
    coroutine: typing.Callable[..., typing.Coroutine]
    trigger: typing.Union[IntervalTrigger, CronTrigger, None]


# This is where you list the jobs to split
APSCHEDULER_JOBS: typing.List[Job] = []


class PreforkedApschedulerRunner(object):
    def __init__(self) -> None:
        mp.set_start_method("spawn")
        self.processes: typing.List[mp.Process] = []

    def analyze_processes(self):
        ok_processes = []
        dead_processes = []
        for p in self.processes:
            if p.is_alive():
                ok_processes.append(p)
            else:
                dead_processes.append(p)
        return ok_processes, dead_processes

    def start(self):
        subprocess_count = max(1, mp.cpu_count() - 1)
        logger.info(f"We'll have {subprocess_count} subprocesses")
        divided_jobs = list(divide(subprocess_count, APSCHEDULER_JOBS))

        for process_index, job_batch in enumerate(divided_jobs):
            p = mp.Process(
                target=run_apscheduler_post_fork,
                kwargs={"process_index": str(process_index), "jobs": list(job_batch)},
            )
            p.start()
            self.processes.append(p)

        while True:
            ok_processes, dead_processes = self.analyze_processes()

            if dead_processes:
                break

            time.sleep(0.5)

        logger.info("Got dead processes, we'll stop")

        self.stop()

        logger.info("Sleeping 10s to give other processes time to stop")

        time.sleep(10)

        ok_processes, dead_processes = self.analyze_processes()

        if ok_processes:
            logger.info("There are still ok processes, time to kill")
            for p in ok_processes:
                p.kill()
        else:
            logger.info("Ok we're good")

        logger.info("Going away")
        sys.exit(1)

    def stop(self):
        ok_processes, _ = self.analyze_processes()
        for p in ok_processes:
            p.terminate()


async def subworker_pre_run():
    """Some more global state with the event loop."""
    pass


def run_apscheduler_post_fork(*, process_index, jobs: typing.List[Job]):
    mandatory_setup()

    add_logging_context(process_index=process_index)
    logger.info(f"Booting-up sub-worker pid #{os.getpid()}")

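    # Create the loop explicitly: relying on asyncio.get_event_loop() to create
    # one implicitly is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)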
    loop.run_until_complete(subworker_pre_run())
    scheduler = AsyncIOScheduler(
        logger=get_logger("apscheduler", level=logging.WARNING),
        job_defaults={"misfire_grace_time": 5},
    )

    def exception_listener(event):
        exc_tuple = (
            event.exception.__class__,
            event.exception,
            event.exception.__traceback__,
        )
        # Enable Sentry here
        # capture_exception(exc_tuple)
        traceback.print_exception(*exc_tuple)

    scheduler.add_listener(exception_listener, EVENT_JOB_ERROR)
    scheduler.start()

    for job in jobs:
        scheduler.add_job(job.coroutine, trigger=job.trigger)

    loop.run_forever()


def main():
    mandatory_setup()

    add_logging_context(process_index="main")

    preforked_apscheduler_runner = PreforkedApschedulerRunner()
    try:
        preforked_apscheduler_runner.start()
    except KeyboardInterrupt:
        print("Shutdown requested...exiting")
        preforked_apscheduler_runner.stop()
        sys.exit(0)
    except Exception:
        traceback.print_exc(file=sys.stdout)
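        sys.exit(1)


# The guard is required with the "spawn" start method, because each child
# process re-imports this module.
if __name__ == "__main__":
    main()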

b4stien avatar Aug 05 '21 08:08 b4stien

You're welcome, apscheduler is a great, rock-solid project

Haha, I wish I could believe that myself :smirk: Hopefully 4.0 will turn out to be more like that, but progress has been much slower than I anticipated.

agronholm avatar Aug 05 '21 09:08 agronholm

@agronholm @b4stien A bit late to the party, but have you taken a look at aiomultiprocess? Perhaps that could be a feasible way to scale out AsyncIOScheduler.

NotoriousRebel avatar Oct 24 '21 01:10 NotoriousRebel

Somebody could try and make an APScheduler executor (worker in 4.0 lingo) using that.

agronholm avatar Oct 24 '21 09:10 agronholm

On 4.0, you can now run multiple schedulers against the same data store and achieve scalability that way.

agronholm avatar Oct 01 '23 21:10 agronholm