APScheduler: A Python Framework for Scheduled Tasks
Environment:
- Python 3.7.3
- APScheduler 3.6.3
- MySQL 5.7.28
References:
- Official documentation: https://apscheduler.readthedocs.io/en/latest/index.html
- API reference: https://apscheduler.readthedocs.io/en/stable/py-modindex.html
- Official examples: https://github.com/agronholm/apscheduler/tree/master/examples
- APScheduler parameters and methods (e.g. search that page for the effects of the misfire_grace_time and replace_existing parameters): https://apscheduler.readthedocs.io/en/stable/modules/schedulers/base.html
1. A Simple Scheduled Task
The naive approach is an infinite while loop:
while True:
    if condition_met():
        do_something()
        break  # run once and stop; otherwise this repeats forever
    else:
        time.sleep(60)  # wait 60 seconds (poll once per minute)
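The pseudocode above can be fleshed out into a small runnable helper. This is a minimal sketch; the `run_once_at` function and the polling interval are illustrative, not from any library:

```python
import time
from datetime import datetime, timedelta

def run_once_at(target, task, poll_interval=60.0):
    """Poll the clock until `target` has passed, then run `task` once."""
    while True:
        if datetime.now() >= target:
            task()
            break  # run once and stop; otherwise the loop repeats forever
        time.sleep(poll_interval)  # wait between polls

# Example: fire a task about one second from now, polling every 0.2 s
fired = []
run_once_at(datetime.now() + timedelta(seconds=1),
            lambda: fired.append(datetime.now()),
            poll_interval=0.2)
```

The trade-off discussed next is the polling interval: a short interval reacts quickly but wakes the CPU more often.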
# Test how the polling interval affects CPU usage
import time
import random

while True:
    random_int = random.randint(0, 1000000)
    if random_int == 1:
        print("stop when random_int is 1")
        break
    else:
        time.sleep(0.002)
2. Choosing a Python Task-Scheduling Framework
- [ ] GitHub stars 14.6k; a distributed task queue suited to medium and large projects (integrates easily with Django, offers visualization, highly extensible): https://github.com/celery/celery
- [ ] GitHub stars 7.4k; the schedule library is simple to use, but cannot add jobs dynamically or persist them: https://github.com/dbader/schedule
- [ ] GitHub stars 4.3k; an official Django project providing WebSocket support and task-queue handling: https://github.com/django/channels
- [x] GitHub stars 2.5k; scheduled tasks for small and medium projects: https://github.com/agronholm/apscheduler
Appendix: scheduling frameworks that pair with Django (not covered in this article):
- [ ] GitHub stars 204; APScheduler for Django, not recommended. It mainly adds a job-execution-progress page to the Django admin, which is of limited use, and it keeps auto-generating job-execution records in the database: https://github.com/jarekwg/django-apscheduler
- [ ] GitHub stars 598: https://github.com/kraiz/django-crontab
Note: GitHub star counts are as of 2020-03-13.
3. Installing APScheduler
This installs the latest version (as of 2020-03-14): apscheduler-3.6.3
Version comparison: https://apscheduler.readthedocs.io/en/stable/migration.html
The 3.0 series is API incompatible with previous releases due to a design overhaul.
$ pip install apscheduler
# or, if you have Miniconda or Anaconda installed:
$ conda install apscheduler
4. APScheduler Basic Concepts
The development of APScheduler was heavily influenced by the Quartz task scheduler written in Java. APScheduler provides most of the major features that Quartz does, but it also provides features not present in Quartz (such as multiple job stores).
Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically.
APScheduler has 4 components:
- Triggers: contain the scheduling logic and describe when a job should fire, in one of three ways: by date, by time interval, or by cron expression.
- Job stores: specify where jobs are kept. By default jobs are held in memory, but they can also be stored in various databases. A job stored in a database is serialized on save and deserialized when reloaded. Besides the two common stores below, MongoDB and Redis stores are also provided.
  - MemoryJobStore (default, in memory)
  - SQLAlchemyJobStore (e.g. SQLite, MySQL)
- Executors: submit the given job (a callable) to a thread or process pool to run; when a job finishes, the executor notifies the scheduler, which fires the corresponding event. The default ThreadPoolExecutor should be good enough for most purposes. If your workload involves CPU-intensive operations, you should consider using ProcessPoolExecutor instead to make use of multiple CPU cores. You could even use both at once, adding the process pool executor as a secondary executor. For background on Python threads/processes, see: Python multithreading and the GIL.
  - ThreadPoolExecutor (default, threads)
  - ProcessPoolExecutor (processes)
- Schedulers: the task scheduler is the controlling component; through it you configure job stores, executors, and triggers, and add, modify, and delete jobs. The scheduler coordinates the triggers, job stores, and executors. Usually only one scheduler runs per application, and developers normally do not work with job stores, executors, or triggers directly: job stores and executors are configured through the scheduler.
Commonly used schedulers:
- BlockingScheduler (use when the scheduler is the only thing running in the process)
- BackgroundScheduler (use when the scheduler runs in the background of an application, e.g. inside Django or Flask)
The scheduler's workflow:
5. Simple APScheduler Examples
Job stores and executors are left unconfigured here, i.e. the defaults are used: MemoryJobStore (memory) and ThreadPoolExecutor (threads).
apscheduler_interval_BlockingScheduler.py
# Official example: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/blocking.py
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print(datetime.utcnow().strftime("%H:%M:%S") + " Hello World")

sched = BlockingScheduler()

# Schedule job_function to be called every two seconds
sched.add_job(job_function, 'interval', seconds=2)

# The scheduled_job() decorator works nicely too:
# The id argument must be given if scheduling a job in a persistent job store
@sched.scheduled_job('interval', id='my_job_id', seconds=2)
def job_function_02():
    print("Goodbye World")

try:
    sched.start()
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()
apscheduler_date_BlockingScheduler.py
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# The job would be executed on November 6th, 2009:
# sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])

# You can specify the exact time when the job should be run:
sched.add_job(my_job, 'date', run_date='2020-03-14 23:17:58', args=['Hello World'])

sched.start()
apscheduler_interval_BackgroundScheduler.py
# Source: https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py
"""
Demonstrates how to use the background scheduler to
schedule a job that executes on 3 second intervals.
"""
from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
6. APScheduler with MySQL Example
References:
- Official SQLAlchemyJobStore example: https://github.com/agronholm/apscheduler/blob/master/examples/jobstores/sqlalchemy_.py
- On time-zone handling, see: Django time zones (which uses pytz)
- The Python driver for MySQL used here is PyMySQL. Configuration reference: https://docs.sqlalchemy.org/en/13/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pymysql
# url connect-string syntax:
#   mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# url connect-string example:
#   url='mysql+pymysql://root:Pwd...123456@localhost:3306/dbname?charset=utf8'
1. Example - configuration:
apscheduler_config.py
# import pymysql  # the Python driver for MySQL
# Django's time-zone support uses pytz, which is installed along with Django.
# Time-zone support is off by default; to enable it, set USE_TZ = True in the settings file.
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    # 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    'default': SQLAlchemyJobStore(
        # Creates a table named apscheduler_jobs in the database,
        # with columns: id, next_run_time, job_state
        url='mysql+pymysql://root:Pwd...123456@localhost:3306/dbname?charset=utf8'
    )
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    # timezone=utc,
    timezone='Asia/Shanghai',
)
Notes on the job_defaults settings:
See (note this link is for APScheduler version 2): https://apscheduler.readthedocs.io/en/v2.1.2/#job-options
See also: https://apscheduler.readthedocs.io/en/stable/modules/schedulers/base.html
- coalesce
  Run once instead of many times if the scheduler determines that the job should be run more than once in succession.
  This covers the case where a job has accumulated several missed runs for some reason (say the system was down for 5 minutes and then recovered; a job that runs every minute was "due" to run 5 times in that window but never actually ran). If coalesce is True, the next time the job is submitted to the executor it runs only once, i.e. the last missed run. If False, it runs 5 times (not necessarily, since other conditions also apply; see the explanation of misfire_grace_time).
- misfire_grace_time
  Time in seconds that the job is allowed to miss the designated run time before being considered to have misfired.
  See: https://apscheduler.readthedocs.io/en/stable/userguide.html#missed-job-executions-and-coalescing
  Suppose misfire_grace_time is set to 30. In a scenario like the coalesce one above, if a run was due at 14:00 but for some reason was not scheduled, and it is now 14:01, then when that 14:00 run instance is submitted, the difference between its intended run time and the current time (1 minute here) is checked; since that exceeds our 30-second limit, the run instance is not executed.
- max_instances
  Maximum number of concurrently running instances allowed for this job.
  Limiting the number of concurrently executing instances of a job: by default, only one instance of each job is allowed to be run at the same time. This means that if the job is about to be run but the previous run hasn't finished yet, then the latest run is considered a misfire. For example, if a job fires every 5 seconds but one run takes 6 seconds, the next run overlaps the previous one and is dropped. It is possible to set the maximum number of instances for a particular job that the scheduler will let run concurrently, by using the max_instances keyword argument when adding the job.
  With the default max_instances of 1, a job scheduled every second whose run occasionally takes longer than 1 second will log an error for the skipped run (the program keeps running). In that case, set max_instances to 2 or more.
2. Example - using the configuration
apscheduler_use_config_with_interval.py
This creates a table named apscheduler_jobs in the database with one row (columns: id, next_run_time, job_state). Each time the script is run, another record is added automatically.
from datetime import datetime
import time
import os

# import the configuration defined above
from apscheduler_config import scheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
apscheduler_use_config_with_date.py
This creates a table named apscheduler_jobs in the database with one row (columns: id, next_run_time (the run_date given in add_job), job_state). Once the job has executed at the specified time, its record is deleted from the table automatically.
from datetime import date
from datetime import datetime
import time
import os

from apscheduler_config import scheduler

# def my_job(text):
#     print(text)
# # scheduler = BlockingScheduler()
# # Without the `while True: time.sleep(2)` loop, apscheduler_config would need
# # BlockingScheduler instead of BackgroundScheduler.
# scheduler.add_job(my_job, 'date', run_date='2020-03-16 01:17:30', args=['Hello World'])
# scheduler.start()

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler.add_job(tick, 'date', run_date='2020-03-16 01:32:10')
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()
7. Appendix: When Deploying a Django Project with uWSGI, APScheduler Requires Threads to Be Enabled in the uWSGI Configuration
References:
- https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads
- Chinese documentation: https://uwsgi-docs-cn.readthedocs.io/zh_CN/latest/WSGIquickstart.html#python
- https://uwsgi-docs.readthedocs.io/en/latest/ThingsToKnow.html
- https://apscheduler.readthedocs.io/en/latest/faq.html#how-can-i-use-apscheduler-with-uwsgi
If a Django project deployed with uWSGI runs APScheduler jobs on threads, for example with a configuration like:
......
executors = {
    'default': ThreadPoolExecutor(20),
}
scheduler = BackgroundScheduler(
    ......
    executors=executors,
    ......
)
then the corresponding uWSGI .ini configuration file must enable (multi)threading:
enable-threads = true
Otherwise, visiting the site raises an error (shown only when DEBUG = True in the Django project's settings.py):
The scheduler seems to be running under uWSGI, but threads have been disabled.
You must run uWSGI with the --enable-threads option for the scheduler to work.
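For context, a minimal uWSGI .ini sketch with that line in place (the module path and ports are hypothetical placeholders for your own project):

```ini
[uwsgi]
module = myproject.wsgi:application  ; hypothetical Django WSGI module
http = :8000
processes = 2
; required so APScheduler's BackgroundScheduler threads can run
enable-threads = true
```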