tortoise-orm
tortoise-orm copied to clipboard
tortoise中使用celery
tortoise支持celery吗?
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True
celery_app = Celery("celery_worker")
celery_app.config_from_object('backend.celery.celery_config', namespace='celery')
async def register_mysql_fastapi(app: FastAPI):
# 注册数据库
register_tortoise(
app,
config=DB_ORM_CONFIG,
generate_schemas=os.getenv('INIT_DB', False),
add_exception_handlers=False,
)
async def register_mysql_to_script():
# 注册数据库供脚本调用
await Tortoise.init(config=DB_ORM_CONFIG)
@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task(uuid):
async def run_task():
await register_mysql_to_script()
await scan_code(uuid)
loop = asyncio.new_event_loop() # 创建新的事件循环
asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环
loop.run_until_complete(run_task())
希望这能帮到你
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True celery_app = Celery("celery_worker") celery_app.config_from_object('backend.celery.celery_config', namespace='celery') async def register_mysql_fastapi(app: FastAPI): # 注册数据库 register_tortoise( app, config=DB_ORM_CONFIG, generate_schemas=os.getenv('INIT_DB', False), add_exception_handlers=False, ) async def register_mysql_to_script(): # 注册数据库供脚本调用 await Tortoise.init(config=DB_ORM_CONFIG) @celery_app.task(name='scan_code_task', serializer='json') def scan_code_task(uuid): async def run_task(): await register_mysql_to_script() await scan_code(uuid) loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环 loop.run_until_complete(run_task()) 希望这能帮到你
app: FastAPI这个是怎么传进去的?
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True celery_app = Celery("celery_worker") celery_app.config_from_object('backend.celery.celery_config', namespace='celery') async def register_mysql_fastapi(app: FastAPI): # 注册数据库 register_tortoise( app, config=DB_ORM_CONFIG, generate_schemas=os.getenv('INIT_DB', False), add_exception_handlers=False, ) async def register_mysql_to_script(): # 注册数据库供脚本调用 await Tortoise.init(config=DB_ORM_CONFIG) @celery_app.task(name='scan_code_task', serializer='json') def scan_code_task(uuid): async def run_task(): await register_mysql_to_script() await scan_code(uuid) loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环 loop.run_until_complete(run_task()) 希望这能帮到你
app: FastAPI这个是怎么传进去的?
你说的传进去指的是使用orm嘛?
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True celery_app = Celery("celery_worker") celery_app.config_from_object('backend.celery.celery_config', namespace='celery') async def register_mysql_fastapi(app: FastAPI): # 注册数据库 register_tortoise( app, config=DB_ORM_CONFIG, generate_schemas=os.getenv('INIT_DB', False), add_exception_handlers=False, ) async def register_mysql_to_script(): # 注册数据库供脚本调用 await Tortoise.init(config=DB_ORM_CONFIG) @celery_app.task(name='scan_code_task', serializer='json') def scan_code_task(uuid): async def run_task(): await register_mysql_to_script() await scan_code(uuid) loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环 loop.run_until_complete(run_task()) 希望这能帮到你
app: FastAPI这个是怎么传进去的?
你说的传进去指的是使用orm嘛?
async def register_mysql_to_script(): ###注册数据库供脚本调用 await Tortoise.init(config=database_config)
@celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try: ###初始化数据库连接 await register_mysql_to_script() ###在这里数据库插入操作 await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e)
你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True celery_app = Celery("celery_worker") celery_app.config_from_object('backend.celery.celery_config', namespace='celery') async def register_mysql_fastapi(app: FastAPI): # 注册数据库 register_tortoise( app, config=DB_ORM_CONFIG, generate_schemas=os.getenv('INIT_DB', False), add_exception_handlers=False, ) async def register_mysql_to_script(): # 注册数据库供脚本调用 await Tortoise.init(config=DB_ORM_CONFIG) @celery_app.task(name='scan_code_task', serializer='json') def scan_code_task(uuid): async def run_task(): await register_mysql_to_script() await scan_code(uuid) loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环 loop.run_until_complete(run_task()) 希望这能帮到你
app: FastAPI这个是怎么传进去的?
你说的传进去指的是使用orm嘛?
async def register_mysql_to_script(): ###注册数据库供脚本调用 await Tortoise.init(config=database_config)
@celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try: ###初始化数据库连接 await register_mysql_to_script() ###在这里数据库插入操作 await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e)
你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm
我用了芹菜,本身是不支持,我通过修改代码,使其支持,例如:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True celery_app = Celery("celery_worker") celery_app.config_from_object('backend.celery.celery_config', namespace='celery') async def register_mysql_fastapi(app: FastAPI): # 注册数据库 register_tortoise( app, config=DB_ORM_CONFIG, generate_schemas=os.getenv('INIT_DB', False), add_exception_handlers=False, ) async def register_mysql_to_script(): # 注册数据库供脚本调用 await Tortoise.init(config=DB_ORM_CONFIG) @celery_app.task(name='scan_code_task', serializer='json') def scan_code_task(uuid): async def run_task(): await register_mysql_to_script() await scan_code(uuid) loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(loop) # 设置事件循环为当前线程的事件循环 loop.run_until_complete(run_task()) 希望这能帮到你
app: FastAPI这个是怎么传进去的?
你说的传进去指的是使用orm嘛?
async def register_mysql_to_script(): ###注册数据库供脚本调用 await Tortoise.init(config=database_config) @celery_app.task(name='task', serializer='json') def process_messaging_queue(result_mysql): try: async def run_async_code(): try: ###初始化数据库连接 await register_mysql_to_script() ###在这里数据库插入操作 await mysql_create(result_mysql) except Exception as e: logger.error(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(run_async_code()) except Exception as e: logger.error(e) 你好,我按照上述方法修改,提示我数据库连接失败,我确定我的数据库配置文件没问题,就很奇怪。 Can't connect to MySQL server: {'host': 'localhost', 'port': 3306...} celery的任务函数中,可以这样初始化数据库的连接吗,我用的是tortoise-orm
![]()
![]()
已解决,谢谢。服务请求频繁的时候,是否需要把数据库断开写入celery的任务函数
写了个简单的Demo,有需要的可以参考一下:
#!/usr/bin/env python
import shlex
import subprocess
from pathlib import Path
import anyio
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from tortoise import Tortoise
from config import DB_CONFIG
from models import Users
REDIS_URL = "redis://localhost:6379"
app = Celery(__name__, broker=REDIS_URL, backend=REDIS_URL)
async def init_db() -> None:
"""初始化数据库连接"""
await Tortoise.init(db_url=DB_CONFIG["db_url"], modules=DB_CONFIG["modules"])
@worker_process_init.connect
def init_worker(**kwargs):
anyio.run(init_db)
@worker_process_shutdown.connect
def close_worker(**kwargs):
anyio.run(Tortoise.close_connections)
async def _create_user(data) -> int:
user_obj = await Users.create(**data)
return user_obj.id
@app.task
def save_user_to_db(data) -> int:
return anyio.run(_create_user, data)
def main():
subprocess.run(shlex.split(f"celery -A {Path(__file__).stem} worker"))
if __name__ == "__main__":
main()
这样应该可以,不过用这个orm最好别用celery,我们公司现在用的是darq,跑了两年还算稳定。
async def do_something():
await Tortoise.init(config=DB_ORM_CONFIG)
print(await MyModel.all())
@celery_app.task(name='scan_code_task', serializer='json')
def scan_code_task():
run_async(do_something())