How can I run scheduled tasks using FastAPI?
Description
How can I [...]?
Is it possible to [...]?
It's not strictly speaking a FastAPI question, but here are some options; find what works best for you.
The best, without any doubt :grin: https://linux.die.net/man/5/crontab
Some Python tools:
- https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
- https://docs.python.org/3/library/sched.html
- https://github.com/dbader/schedule
- https://github.com/Bogdanp/dramatiq/tree/master/examples/scheduling
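Of the options above, the stdlib sched module is the quickest to try since it needs no install; a minimal sketch (the job function, names, and delays are made up for illustration):

```python
import sched
import time

results = []

def job(name):
    # placeholder task; in practice this could shell out or hit a database
    results.append(name)

scheduler = sched.scheduler(time.monotonic, time.sleep)
scheduler.enter(0.01, 1, job, argument=("first",))   # run after 10 ms
scheduler.enter(0.02, 1, job, argument=("second",))  # run after 20 ms
scheduler.run()  # blocks the current thread until all events have fired

print(results)  # ['first', 'second']
```

Note that scheduler.run() blocks, so next to a web server it would need its own thread.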
Yes, I know those methods, but they don't work with an async function.
https://github.com/aio-libs/aiojobs maybe ? never tried it
APScheduler has an asyncio implementation https://apscheduler.readthedocs.io/en/latest/
Example in their git https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/asyncio_.py
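If all you need is an in-process periodic coroutine and you want zero dependencies, a plain asyncio loop works too; a minimal sketch (the helper name, interval, and repeat count are arbitrary):

```python
import asyncio

async def every(interval: float, repeats: int, func, *args):
    # generic "run func every `interval` seconds" helper;
    # a real version would loop forever instead of `repeats` times
    for _ in range(repeats):
        await asyncio.sleep(interval)
        func(*args)

ticks = []
asyncio.run(every(0.01, 3, ticks.append, "tick"))
print(ticks)  # ['tick', 'tick', 'tick']
```

In a FastAPI app you would start such a loop with asyncio.create_task from a startup event, so it runs alongside request handling instead of blocking it.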
Hey, I am able to run the background task, but my uvicorn server has stopped. How can I run both simultaneously?
Thus far, the best task queue to use w/ FastAPI has been @samuelcolvin's Arq. It's really good. You should use it. https://github.com/samuelcolvin/arq
He's also the guy behind Pydantic.
Thanks for all the help @euri10, @steinitzu, @leosussan!
@nishtha03 you can try the project generation templates, those include Celery and how to set everything up with Docker.
But if you can implement it yourself, I would definitely try Leo's suggestion and go with Arq. I just haven't been able to try it myself, but it seems great. And Arq's architecture is very well thought out; it allows you to distribute tasks and workers without requiring you to install all the dependencies everywhere.
@tiangolo: Celery looks nice, though I notice that it does not support Windows (which wouldn't normally bother me, but you can't always decide what your application is going to be deployed on). Having something that can integrate with the web server to some degree (unlike cronjobs, systemd timers and Windows Scheduled Tasks) has some degree of convenience if, say, some database is meant to be populated daily by a scheduled task and the server needs to be able to tell whether the task has already been run or not.
ARQ and APScheduler both look promising, though (thanks everyone for the recommendations); it might be worth giving them a mention in the docs. I feel like saying "Just use Celery if the BackgroundTasks don't cut it for you" is a pretty tall step to climb and misses a lot of the intermediate solutions mentioned here.
Good point about Celery for Windows @sm-Fifteen .
Well, in fact, I have been wanting to try ARQ, and replace Celery in the project generators with it. And then suggest ARQ everywhere, in the end, it uses Pydantic, so it would probably be the best match for FastAPI too... :tada:
I first have to try it out, and I wanted to suggest (in a PR) to handle normal def functions in a threadpool automatically (as is done in FastAPI); that could help make it easier to adopt (and similar).
As ARQ is based on names (strings) instead of the function objects, it's the perfect match for modern distributed systems (e.g. Docker containers) that don't need to have everything installed in all the (possibly distributed) components. That's a huge advantage, for example when compared to the default model in Celery, RQ, Dramatiq, Huey. :sparkles:
The other "important" feature from other tools is an admin/monitor UI, like Celery's Flower or RQ's rq-dashboard. But maybe we can build a FastAPI app with a frontend for that :smile: :man_shrugging:
Hopefully, I'll be able to try all that soon :crossed_fingers:
@nishtha03 did you solve your problem?
To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn
https://github.com/samuelcolvin/arq/issues/120
Very happy to consider a monitor, but personally I'd recommend this takes the form of an API that existing monitoring tools can use; perhaps with a simple dashboard.
I found a package, fastapi-utils, that supports repeated tasks.
Thanks for the discussion here everyone!
Does that solve your question @nishtha03 ? May we close this issue?
@tiangolo I'm giving it a try.
> To add to this - I added Arq to my FastAPI project template, in case y'all want an example to look at: https://github.com/leosussan/fastapi-gino-arq-uvicorn

I'm giving it a try now... Thanks for the contribution.
This discussion is damn important. Working on the solution given by @leosussan. I'll probably bump here again in case of any issue :)
I realize I can run an async function at startup with

```python
app = FastAPI(title='TestAPI', on_startup=[async_func])
```

How can I add a non-async blocking function to the worker from there? I just want to run a couple of batch files every night and process the output.

```python
subprocess.run('my_file.exe')
```

will block for some time, so I can't call it directly.
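One way to keep the event loop responsive while a blocking subprocess.run call does its work is to push it onto a worker thread with asyncio.to_thread (Python 3.9+); a sketch, using a stand-in command instead of the my_file.exe from the question:

```python
import asyncio
import subprocess
import sys

async def run_batch() -> str:
    # subprocess.run blocks, so run it in a worker thread;
    # the `python -c` command below is a stand-in for the real batch file
    proc = await asyncio.to_thread(
        subprocess.run,
        [sys.executable, "-c", "print('batch done')"],
        capture_output=True,
        text=True,
    )
    return proc.stdout.strip()

print(asyncio.run(run_batch()))  # batch done
```

Inside a running FastAPI app you would simply `await run_batch()` from an async function instead of using asyncio.run.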
See subprocess.Popen instead of subprocess.run.
If I want to process the result it will block, too. I was looking for a way to run a function in the thread pool, or however it is implemented in FastAPI.
@spacemanspiff2007 You can run a function in the same thread pool used by Starlette/FastAPI using fastapi_utils.tasks.repeat_every by @dmontagu.
With this example.py:

```python
import logging

from fastapi import FastAPI
from fastapi_utils.tasks import repeat_every

logger = logging.getLogger(__name__)

app = FastAPI()
counter = 0

@app.get('/')
def hello():
    return 'Hello'

@app.on_event("startup")
@repeat_every(seconds=1, logger=logger, wait_first=True)
def periodic():
    global counter
    print('counter is', counter)
    counter += 1
```
you get

```
$ uvicorn example:app
INFO:     Started server process [85016]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
counter is 0
counter is 1
counter is 2
counter is 3
```
The implementation is quite simple (source here): it runs the given function periodically, using Starlette's starlette.concurrency.run_in_threadpool. This is precisely how Starlette and FastAPI run background tasks (source here). But, since this is the same thread pool used to serve requests (for endpoints defined with def), if your task is heavy you may want to run it in a separate process. For example:
- You could start a separate process with subprocess.Popen and periodically check its status from FastAPI's thread pool using repeat_every (this could become messy when you have many tasks to check upon);
- You could use a task queue like Celery or Arq, which runs as a separate process (or many processes if you use multiple workers). Celery has periodic tasks, but honestly, if all you want is to run a batch script, crond should do.
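The first option can be sketched like this: Popen returns immediately, and poll() reports None until the child exits (the command below is a stand-in for a real batch job):

```python
import subprocess
import sys

# Popen returns immediately instead of blocking like subprocess.run
proc = subprocess.Popen(
    [sys.executable, "-c", "print('working')"],  # stand-in for a batch file
    stdout=subprocess.PIPE,
    text=True,
)

# a repeat_every task would call proc.poll() once per tick:
# it returns None while the child is still running, the exit code afterwards
status = proc.poll()

out, _ = proc.communicate()  # collect output once the child is done
print(out.strip(), proc.returncode)
```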
Thank you very much for this detailed answer! starlette.concurrency.run_in_threadpool will definitely do the job!
@tiangolo I'd recommend considering an integration with the Advanced Python Scheduler. In my experience it offers a lot of features and is very stable/works very well. It offers a lot of schedulers, job stores, executors and triggers, has support for event listeners and is extensible. Consequently, APScheduler is the go-to integration for Flask (flask-apscheduler) and Django (django-apscheduler) as well. The test coverage of 94% is very good, too.
I agree with @fkromer
I have been using APScheduler for scheduled tasks.
There are really good features, such as being able to see when the next schedule runs and cancelling schedules without rebooting the FastAPI process.
I have been pinning certain endpoints to the schedule and fixing the IDs so a job doesn't get scheduled twice.
This can be updated to be a query param or route.
Also, it would be worthwhile persisting the schedules in some way, as they are lost every time the FastAPI service restarts.
Here is some basic bootstrap code.

Create Schedule Object:

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

Schedule = AsyncIOScheduler()
Schedule.start()
```

View All Scheduled Jobs:

```python
@app.get("/schedule/show_schedules/", tags=["schedule"])
async def get_scheduled_syncs():
    schedules = []
    for job in Schedule.get_jobs():
        schedules.append({
            "Name": str(job.id),
            "Run Frequency": str(job.trigger),
            "Next Run": str(job.next_run_time),
        })
    return schedules
```

Add A Scheduled Job:

```python
@app.post("/schedule/schedule1/", tags=["schedule"])
async def add_chat_job_to_scheduler(time_in_seconds: int = 300):
    schedule1_job = Schedule.add_job(
        schedule1_function, 'interval', seconds=time_in_seconds, id="schedule1"
    )
    return {"Scheduled": True, "JobID": schedule1_job.id}

@app.post("/schedule/schedule2/", tags=["schedule"])
async def add_schedule2(time_in_hours: int = 24):
    schedule2_job = Schedule.add_job(
        schedule2_function, 'interval', hours=time_in_hours, id="schedule2"
    )
    return {"Scheduled": True, "JobID": schedule2_job.id}
```

Delete A Scheduled Job:

```python
@app.delete("/schedule/schedule1/", tags=["schedule"])
async def remove_schedule1():
    Schedule.remove_job("schedule1")
    return {"Scheduled": False, "JobID": "schedule1"}

@app.delete("/schedule/schedule2/", tags=["schedule"])
async def remove_schedule2():
    Schedule.remove_job("schedule2")
    return {"Scheduled": False, "JobID": "schedule2"}
```
I was using the schedule functionality to do SSL checks. The article and code can be found here: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e. Hopefully it helps others with similar problems.
@fkromer @cryptoroo I tried AsyncIOScheduler, but if you work with Gunicorn there is a problem: Gunicorn runs X processes with FastAPI, so you have X schedulers instead of one.
I'm currently using Arq workers in my FastAPI app for scheduled jobs, based on https://github.com/samuelcolvin/arq/issues/182
Example code:
example_main.py:

```python
import logging

from fastapi import FastAPI

from example_scheduler import arq_worker

logger = logging.getLogger(__name__)

app = FastAPI()

@app.get("/")
async def root():
    await arq_worker.worker.pool.enqueue_job("hello_world")
    return "Welcome to root"

@app.on_event("startup")
async def startup_event():
    await arq_worker.start(handle_signals=False)

@app.on_event("shutdown")
async def shutdown_event():
    await arq_worker.close()

def run_dev_server():
    logger.info("Running Uvicorn without Gunicorn and with reloading")
    import uvicorn

    app_module = "example_main:app"
    uvicorn.run(
        app_module,
        reload=True,
    )

if __name__ == "__main__":
    run_dev_server()
```
example_scheduler.py:

```python
import asyncio
import logging

from arq import cron
from arq.connections import RedisSettings
from arq.worker import create_worker

logger = logging.getLogger(__name__)

async def hello_world(ctx):
    job_id = ctx["job_id"]
    print(f"hello world ({job_id})")

class ArqWorkerSettings:
    functions = [hello_world]
    # TODO: Issue with unique cron jobs and multiple arq workers: https://github.com/samuelcolvin/arq/issues/196
    cron_jobs = [
        cron(hello_world, second=1, unique=True)
    ]
    redis_settings = RedisSettings()

class ArqWorker:
    def __init__(self):
        self.worker = None
        self.task = None

    async def start(self, **kwargs):
        self.worker = create_worker(ArqWorkerSettings, **kwargs)
        self.task = asyncio.create_task(self.worker.async_run())

    async def close(self):
        await self.worker.close()

arq_worker = ArqWorker()
```
@kob22 -- have a look at fixes here: https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
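One of the fixes discussed there boils down to taking an exclusive file lock at startup, so only the first worker that grabs it starts the scheduler; a POSIX-only sketch (the lock file name and helper are made up for illustration):

```python
import fcntl
import os
import tempfile

# keep the fd referenced for the lifetime of the process,
# otherwise the lock is released when the fd is closed
_lock_fd = None

def try_become_scheduler(name: str = "fastapi_scheduler.lock") -> bool:
    """Return True for exactly one holder of the lock; everyone else gets False."""
    global _lock_fd
    fd = os.open(os.path.join(tempfile.gettempdir(), name), os.O_RDWR | os.O_CREAT)
    try:
        # flock locks are tied to the open file description, so a second
        # open()+flock attempt fails while the first one is held
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        os.close(fd)
        return False
    _lock_fd = fd
    return True

first = try_become_scheduler()
second = try_become_scheduler()
print(first, second)  # True False
```

In a Gunicorn setup each worker would call this once at startup, and only the one that got True would call Schedule.start().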
@tiangolo: suggestion to close this issue (or move it to a discussion). This is probably more of a discussion on how to approach task scheduling (Celery, Arq, RQ, etc. all do the job); I imagine it won't be part of the core code base for now.
If you want a lightweight asyncio wrapper around the very nice croniter library, I'll drop my lib here: acron.
So I just found out there is a decorator, and it works even with async functions.

```
$ python -m pip install fastapi_utils
```

```python
from fastapi_utils.tasks import repeat_every

...

@app.get("/videos")
async def get_videos(name: str = "") -> dict[str, int | Videos]:
    """Get videos from fixed URL endpoint."""
    ...
    return {"count": count, "videos": videos}

@app.on_event("startup")  # repeat_every has to be triggered at startup to begin looping
@repeat_every(seconds=60 * 60)  # 1 hour
async def save_videos_periodically():
    """Save videos (JSON response from remote API) to database periodically."""
    data = await get_videos()
    videos = data["videos"]
    save_videos(videos)
```
The reason periodic tasks are not supported by most web frameworks is that server processes are spawned in a "stateless" way: if one of your processes spawns a scheduled task, how do you guarantee that the other worker processes didn't also start the same task? To de-duplicate and make sure tasks are fired only once, the cluster must be made aware of every worker's state. This is far beyond the scope of a conventional web framework. Thus I suggest we close this issue.