fastapi-mysql-generator
fastapi-mysql-generator copied to clipboard
项目中以接口的形式添加任务没有问题,但如果单独写一个文件测试添加任务,任务添加成功了,可是没有执行是什么原因
项目中以接口的形式添加任务没有问题,但如果单独写一个文件测试添加任务,任务添加成功了,可是没有执行是什么原因
有具体的代码示例吗?
# write_task
def write_task(task_name):
"""测试任务,写文件"""
try:
with open(os.path.join(static_path, f'{task_name}.txt'), mode='a') as f:
f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
f.write('\n')
except Exception as e:
print(e)
# schedule拷贝您的代码
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.core.config import settings
class ScheduleCli(object):
def __init__(self):
# 对象 在 @app.on_event("startup") 中初始化
self._schedule = None
def init_scheduler(self) -> None:
"""
初始化 apscheduler
:return:
"""
job_stores = {
"mongo": MongoDBJobStore(database=settings.MONGODB_DATABASE, collection='jobs', host=settings.MONGODB_URI)
}
self._schedule = AsyncIOScheduler(jobstores=job_stores)
self._schedule.start()
# 使实例化后的对象 赋予apscheduler对象的方法和属性
def __getattr__(self, name):
return getattr(self._schedule, name)
def __getitem__(self, name):
return self._schedule[name]
def __setitem__(self, name, value):
self._schedule[name] = value
def __delitem__(self, name):
del self._schedule[name]
# 创建schedule对象
schedule: AsyncIOScheduler = ScheduleCli()
# 只允许导出 schedule_client 实例化对象
__all__ = ["schedule"]
# 测试文件
from datetime import datetime
from app.utils.cron_statistic import schedule
from app.utils.cron_task import write_task
schedule.init_scheduler()
task_name = 'test-cron'
res = schedule.add_job(write_task, 'interval', args=(task_name,), id=task_name, name=task_name, seconds=10,
next_run_time=datetime.now(), jobstore='mongo')
print(res)
执行结果:
test-cron (trigger: interval[0:00:10], next run at: 2021-06-25 09:55:46 CST)
- 可是发现并没有生成文件,觉得任务没有执行
- 如何知道某个任务的执行状态,譬如想知道这个任务是否执行了
- 我的项目初始化的时候,已经初始化scheduler
@app.on_event("startup")
async def startup_db_client():
# 初始化 apscheduler
schedule.init_scheduler()
你应该是单独 python your_test.py
这样启动测试文件的吧? 这样是不行的。(原因可能是导入的app里面的文件属于整个项目的,如果要使用,必须得从入口文件启动才行,这个是我猜测的,不一定准确)
你如果想要默认启动这个定时任务,可以把添加定时任务的操作,放到项目启动执行的操作中。
假如我想单独测试apscheduer,跟在不在fastapi项目没有关系吧。因为我这个测试文件,我也引入scheduler执行init_scheduler了
还有一个问题就是能知道某个任务的执行状态吗
单独测试 可以参考一下 stackoverflow上的回答。
还有一个问题就是能知道某个任务的执行状态吗
可以根据任务ID去查看某个任务执行状态,可以参考我这个示例 ,具体更多的状态信息可以参考 apscheduler官方文档。