
How can I run scheduled tasks using FastAPI?

Open nishtha03 opened this issue 4 years ago • 29 comments

Description

How can I [...]?

Is it possible to [...]?


nishtha03 avatar Sep 11 '19 06:09 nishtha03

It's not strictly speaking a FastAPI question, but here are some options; find what works best for you.

the best without any doubt :grin: https://linux.die.net/man/5/crontab

Some Python tools:

  • https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
  • https://docs.python.org/3/library/sched.html
  • https://github.com/dbader/schedule
  • https://github.com/Bogdanp/dramatiq/tree/master/examples/scheduling
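For illustration, a minimal sketch using the schedule library linked above; note that the run_pending() loop is blocking, so it would have to live in its own thread or process next to the web server:

import time
import schedule

def job():
    print("running periodic job")

# register the job, then poll for due jobs in a loop
schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)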

euri10 avatar Sep 11 '19 07:09 euri10

Yes, I know those methods, but they are not working with an async function.

nishtha03 avatar Sep 11 '19 07:09 nishtha03

https://github.com/aio-libs/aiojobs maybe ? never tried it

euri10 avatar Sep 11 '19 07:09 euri10

APScheduler has an asyncio implementation: https://apscheduler.readthedocs.io/en/latest/

Example in their repo: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/asyncio_.py
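For reference, a minimal sketch along the lines of that example (assuming APScheduler 3.x):

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def tick():
    print("tick")

scheduler = AsyncIOScheduler()
# run the coroutine on the asyncio event loop every 5 seconds
scheduler.add_job(tick, "interval", seconds=5)
scheduler.start()

# keep the loop alive when running this standalone; inside FastAPI, Uvicorn's loop is already running
asyncio.get_event_loop().run_forever()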

steinitzu avatar Sep 14 '19 19:09 steinitzu

Hey, I am able to run the background task, but my Uvicorn server has stopped. How can I run both simultaneously?

nishtha03 avatar Sep 17 '19 09:09 nishtha03

Thus far, the best task queue to use w/ FastAPI has been @samuelcolvin's Arq. It's really good. You should use it. https://github.com/samuelcolvin/arq

He's also the guy behind Pydantic.

leosussan avatar Oct 23 '19 19:10 leosussan

Thanks for all the help @euri10, @steinitzu, @leosussan!

@nishtha03 you can try the project generation templates, those include Celery and how to set everything up with Docker.

But if you can implement it yourself, I would definitely follow Leo's suggestion and try Arq. I just haven't been able to try it myself, but it seems great. And Arq's architecture is very well thought out: it allows you to distribute tasks and workers without requiring you to install all the dependencies everywhere.

tiangolo avatar Oct 26 '19 19:10 tiangolo

@tiangolo: Celery looks nice, though I notice that it does not support Windows (which wouldn't normally bother me, but you can't always decide what your application is going to be deployed on). Having something that can integrate with the web server to some degree (unlike cronjobs, systemd timers and Windows Scheduled Tasks) has some degree of convenience if, say, some database is meant to be populated daily by a scheduled task and the server needs to be able to tell whether the task has already been run or not.

ARQ and APScheduler both look promising, though (thanks everyone for the recommendations); it might be worth giving them a mention in the docs. I feel like saying "Just use Celery if the BackgroundTasks don't cut it for you" is a pretty tall step to climb and misses a lot of the intermediate solutions mentioned here.

sm-Fifteen avatar Nov 22 '19 19:11 sm-Fifteen

Good point about Celery for Windows @sm-Fifteen .

Well, in fact, I have been wanting to try ARQ, replace Celery in the project generators with it, and then suggest ARQ everywhere. In the end, it uses Pydantic, so it would probably be the best match for FastAPI too... :tada:

I first have to try it out, and I wanted to suggest (in a PR) handling normal def functions in a threadpool automatically (as it's done in FastAPI); that could help make it easier to adopt (and similar).

As ARQ is based on names (strings) instead of the function objects, it's the perfect match for modern distributed systems (e.g. Docker containers) that don't need to have everything installed in all the (possibly distributed) components. That's a huge advantage, for example when compared to the default model in Celery, RQ, Dramatiq, Huey. :sparkles:

The other "important" feature from other tools is an admin/monitor UI. As Celery's Flower or RQ's rq-dashboard. But maybe we can build a FastAPI with a frontend for that :smile: :man_shrugging:

Hopefully, I'll be able to try all that soon :crossed_fingers:

@nishtha03 did you solve your problem?

tiangolo avatar Feb 10 '20 18:02 tiangolo

To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn

leosussan avatar Feb 10 '20 18:02 leosussan

https://github.com/samuelcolvin/arq/issues/120

Very happy to consider a monitor, but personally I'd recommend this takes the form of an API that existing monitoring tools can use; perhaps with a simple dashboard.

samuelcolvin avatar Feb 10 '20 19:02 samuelcolvin

I found a package, fastapi-utils, that supports repeated tasks.

EqualMa avatar Mar 14 '20 15:03 EqualMa

Thanks for the discussion here everyone!

Does that solve your question @nishtha03 ? May we close this issue?

tiangolo avatar Apr 12 '20 15:04 tiangolo

@tiangolo I'm giving it a try.

> To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn

I'm giving it a try now... Thanks for the contribution.

codesutras avatar Apr 20 '20 19:04 codesutras

> Thanks for the discussion here everyone!
>
> Does that solve your question @nishtha03? May we close this issue?

This topic is damn important. I'm working on the solution given by @leosussan; I'll probably bump here again if I run into any issues :)

codesutras avatar Apr 20 '20 19:04 codesutras

I realize I can run an async future with

app = FastAPI(title='TestAPI', on_startup=async_func())

How can I add a non-async blocking function to the worker from there? I just want to run a couple of batch files every night and process the output.

subprocess.run('my_file.exe')

will block for some time so I can't call it directly.

spacemanspiff2007 avatar Apr 23 '20 08:04 spacemanspiff2007

> I realize I can run an async future with
>
> app = FastAPI(title='TestAPI', on_startup=async_func())
>
> How can I add a non-async blocking function to the worker from there? I just want to run a couple of batch files every night and process the output.
>
> subprocess.run('my_file.exe')
>
> will block for some time so I can't call it directly.

See subprocess.Popen instead of subprocess.run.
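As a rough, hedged sketch of that suggestion (my_file.exe is the hypothetical batch file from the question above):

import subprocess

# Popen returns immediately, so the server is not blocked while the process runs
proc = subprocess.Popen(["my_file.exe"])

# later, check whether it has finished without waiting for it
if proc.poll() is not None:
    print("finished with return code", proc.returncode)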

hantusk avatar May 05 '20 11:05 hantusk

If I want to process the result it will block, too. I was looking for a way to run a function in the thread pool, or however it is implemented in FastAPI.

spacemanspiff2007 avatar May 05 '20 11:05 spacemanspiff2007

@spacemanspiff2007 You can run a function in the same thread pool used by Starlette/FastAPI using fastapi_utils.tasks.repeat_every by @dmontagu.

With this example.py

import logging
import time
from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every

logger = logging.getLogger(__name__)
app = FastAPI()
counter = 0

@app.get('/')
def hello():
    return 'Hello'

@app.on_event("startup")
@repeat_every(seconds=1, logger=logger, wait_first=True)
def periodic():
    global counter
    print('counter is', counter)
    counter += 1

you get

$ uvicorn example:app
INFO:     Started server process [85016]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
counter is 0
counter is 1
counter is 2
counter is 3

The implementation is quite simple (source here): it runs the given function periodically, using Starlette's starlette.concurrency.run_in_threadpool. This is precisely how Starlette and FastAPI run background tasks (source here).
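As a rough illustration of that mechanism (a hedged sketch, not the fastapi_utils source; my_file.exe is the hypothetical batch file from the earlier question):

import subprocess
from starlette.concurrency import run_in_threadpool

def blocking_batch_job():
    # a blocking call, e.g. running the batch file and capturing its output
    return subprocess.run(["my_file.exe"], capture_output=True)

async def periodic():
    # offload the blocking function to the shared thread pool so the event loop keeps serving requests
    result = await run_in_threadpool(blocking_batch_job)
    print("return code:", result.returncode)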

But, since this is the same thread pool used to serve requests (of endpoints defined with def), if your task is heavy you may want to run it in a separate process. For example,

  • You could start a separate process with subprocess.Popen and periodically check its status from FastAPI's thread pool using repeat_every (this could become messy when you have many tasks to check upon);
  • You could use a task queue like Celery or Arq, which run as a separate process (or many processes if you use multiple workers). Celery has periodic tasks but honestly, if all you want is to run a batch script, crond should do.

paolieri avatar Jul 31 '20 23:07 paolieri

Thank you very much for this detailed answer! starlette.concurrency.run_in_threadpool will definitely do the job!

spacemanspiff2007 avatar Aug 04 '20 12:08 spacemanspiff2007

@tiangolo I'd recommend considering an integration with the Advanced Python Scheduler (APScheduler). In my experience it offers a lot of features and is very stable/works very well. It offers a lot of schedulers, job stores, executors and triggers, has support for event listeners, and is extensible. Consequently, APScheduler is also the go-to integration for Flask (flask-apscheduler) and Django (django-apscheduler). Its test coverage of 94% is very good as well.

fkromer avatar Oct 26 '20 16:10 fkromer

I agree with @fkromer.

I have been using APScheduler for scheduled tasks.

It has really good features, such as being able to see when the next run is scheduled and cancelling schedules without restarting the FastAPI process.

I have been pinning certain endpoints to the schedule and fixing the job IDs so nothing gets scheduled twice.

This can be updated to be a query param or route.

It would also be worthwhile persisting the schedules in some way, as the FastAPI service doesn't automatically pick them up again after a restart (a sketch using a persistent job store follows after the bootstrap code below).

Here is some basic bootstrap code.

Create Schedule Object:

from apscheduler.schedulers.asyncio import AsyncIOScheduler

Schedule = AsyncIOScheduler()
Schedule.start()

View All Scheduled Jobs:

@app.get("/schedule/show_schedules/",tags=["schedule"])
async def get_scheduled_syncs():
    schedules = []
    for job in Schedule.get_jobs():
        schedules.append({"Name": str(job.id), "Run Frequency": str(job.trigger), "Next Run": str(job.next_run_time)})
    return schedules

Add A Scheduled Job

@app.post("/schedule/schedule1/",tags=["schedule"])
async def add_chat_job_to_scheduler(time_in_seconds:int=300):
    schedule1_job = Schedule.add_job(schedule1_function, 'interval', seconds=time_in_seconds,id="schedule1")
    return {"Scheduled":True,"JobID":schedule1_job.id}

@app.post("/schedule/schedule2/",tags=["schedule"])
async def add_schedule2(time_in_hours:int=24):
    schedule2_job = Schedule.add_job(schedule2_function, 'interval', hours=time_in_hours,id="schedule2")
    return {"Scheduled":True,"JobID":schedule2_job.id}

Delete a scheduled Job

@app.delete("/schedule/schedule1/",tags=["schedule"])
async def remove_schedule1():
    Schedule.remove_job("schedule1")
    return {"Scheduled":False,"JobID":"schedule1"}

@app.delete("/schedule/schedule2/",tags=["schedule"])
async def remove_schedule2():
    Schedule.remove_job("schedule2")
    return {"Scheduled":False,"JobID":"schedule2"}

cryptoroo avatar Oct 27 '20 04:10 cryptoroo

I was using the schedule functionality to do SSL checks. The article and code can be found here: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e. Hopefully it helps others with similar problems.

cryptoroo avatar Nov 10 '20 14:11 cryptoroo

@fkromer @cryptoroo I tried AsyncIOScheduler, but if you work with Gunicorn there is a problem: Gunicorn runs X processes with FastAPI, so you get X schedulers instead of 1.

kob22 avatar Feb 05 '21 11:02 kob22

I'm currently using Arq workers in my FastAPI app for scheduled jobs, based on https://github.com/samuelcolvin/arq/issues/182

Example code:

example_main.py

import logging
from fastapi import FastAPI

from example_scheduler import arq_worker

logger = logging.getLogger(__name__)
app = FastAPI()

@app.get("/")
async def root():
    await arq_worker.worker.pool.enqueue_job("hello_world")
    return "Welcome to root"

@app.on_event("startup")
async def startup_event():
    await arq_worker.start(handle_signals=False)

@app.on_event("shutdown")
async def shutdown_event():
    await arq_worker.close()

def run_dev_server():
    logger.info("Running Uvicorn without Gunicorn and with reloading")
    import uvicorn

    app_module = "example_main:app"
    uvicorn.run(
        app_module,
        reload=True,
    )

if __name__ == "__main__":
    run_dev_server()

example_scheduler.py

import logging
import asyncio
from arq.worker import create_worker
from arq.connections import RedisSettings
from arq import cron

logger = logging.getLogger(__name__)

async def hello_world(ctx):
    job_id = ctx["job_id"]
    print(f"hello world ({job_id})")

class ArqWorkerSettings:
    functions = [hello_world]
    # TODO: Issue with unique cron jobs and multiple arq workers: https://github.com/samuelcolvin/arq/issues/196
    cron_jobs = [
        cron(hello_world, second=1, unique=True)
    ]
    redis_settings = RedisSettings()

class ArqWorker:
    def __init__(self):
        self.worker = None
        self.task = None

    async def start(self, **kwargs):
        self.worker = create_worker(ArqWorkerSettings, **kwargs)
        self.task = asyncio.create_task(self.worker.async_run())

    async def close(self):
        await self.worker.close()

arq_worker = ArqWorker()

havardthom avatar Feb 15 '21 13:02 havardthom

@kob22 -- have a look at fixes here: https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
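One of the workarounds discussed there, roughly sketched (Unix-only; the lock file path and helper name are assumptions):

import fcntl
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# keep the file object alive for the lifetime of the process, otherwise the lock is released
_lock_file = open("/tmp/scheduler.lock", "wb")

def start_scheduler_once():
    # the first worker to grab the exclusive, non-blocking lock starts the scheduler;
    # the other workers hit BlockingIOError and skip it
    try:
        fcntl.flock(_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return None
    scheduler = AsyncIOScheduler()
    scheduler.start()
    return scheduler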

cryptoroo avatar Feb 16 '21 00:02 cryptoroo

@tiangolo suggestion to close this issue (or move it to a discussion). This is probably more of a discussion on how to approach task scheduling (Celery, Arq, RQ, etc. all do the job); I imagine it won't be part of the core code base for now.

ccrvlh avatar Dec 08 '21 22:12 ccrvlh

If you want a lightweight asyncio wrapper around the very nice croniter library, I'll drop my lib here: acron.

thomascellerier avatar Aug 19 '22 13:08 thomascellerier

So I just found out there is a decorator and it works even with async functions.

$ python -m pip install fastapi_utils

from fastapi_utils.tasks import repeat_every
...

@app.get("/videos")
async def get_videos(name: str = "") -> dict[str, int | Videos]:
    """Get videos from fixed URL endpoint."""
    ...
    return {"count": count, "videos": videos}


@app.on_event("startup")
@repeat_every(seconds=60 * 60)  # 1 hour
async def save_videos_periodically():
    """Save videos (JSON response from remote API) to database periodically."""
    data = await get_videos()
    videos = data["videos"]
    save_videos(videos)

SonGokussj4 avatar Sep 15 '22 16:09 SonGokussj4

The reason periodic tasks are not supported by most web frameworks is that server processes are spawned in a "stateless" way, so if one of your processes spawns a scheduled task, how do you guarantee that the other worker processes didn't also start the same task? To de-duplicate and make sure tasks are fired only once, the cluster must be made aware of all workers' state. This is far beyond the scope of a conventional web framework. Thus I suggest we close this issue.

lambdaq avatar Oct 19 '22 09:10 lambdaq