rocketry
Scheduler doesn't run after_success(func) after some time.
When I start the scheduler, it runs stably for some time, but then, without any message, the after_success(func) tasks stop running. Only the periodic timer functions are executed.
Screenshots from the DB log
The Scheduler implementation:
from redbird.repos import SQLRepo
from rocketry import Rocketry
from rocketry.log import MinimalRunRecord
from sqlalchemy import create_engine

sec_db_engine = create_engine(db_settings.DB_URL, pool_pre_ping=True)
app_scheduler = Rocketry(
    execution="async",
    # logger_repo=MemoryRepo(),
    logger_repo=SQLRepo(engine=sec_db_engine, table="scheduler_log", model=MinimalRunRecord, id_field="id"),
    config={
        "task_execution": "async",
        "silence_task_prerun": False,
        "silence_task_logging": False,
        "silence_cond_check": False,
        "force_status_from_logs": True,
    },
)
from .tasks import scheduler
app_scheduler.include_grouper(scheduler)
The task implementation:
import logging

from rocketry import Grouper
from rocketry.conds import after_fail, after_success, minutely

scheduler = Grouper()

# Runs every minute; the two tasks below depend on its outcome.
@scheduler.task(minutely, execution="main")
def check_starting_condition():
    if check_first() and check_second():
        logging.info("Check starting condition passed!")
        return "OK"
    else:
        raise Exception("I'm not allowed to run the task")

@scheduler.task(after_fail(check_starting_condition), execution="main")
def failed_check():
    logging.info("The report_check failed!")

@scheduler.task(after_success(check_starting_condition), execution="async")
async def main_exc_function():
    # an async function call with aiohttp
    return some_stuff
I integrated it with FastAPI:
import asyncio

import uvicorn

class Server(uvicorn.Server):
    """Customized uvicorn.Server

    Uvicorn server overrides signals and we need to include
    Rocketry to the signals."""

    def handle_exit(self, sig: int, frame) -> None:
        app_scheduler.session.shut_down()
        return super().handle_exit(sig, frame)

async def run():
    server = Server(config=uvicorn.Config(
        app,
        loop="asyncio",
        host=server_settings.SERVER_HOST,
        port=server_settings.SERVER_PORT,
        reload=server_settings.SERVER_RELOAD,
        workers=server_settings.SERVER_WORKER,
        timeout_keep_alive=server_settings.SERVER_TIME_OUT,
        access_log=False))
    # Run the API server and the scheduler concurrently in the same event loop.
    api = asyncio.create_task(server.serve())
    sched = asyncio.create_task(app_scheduler.serve())
    await asyncio.wait([sched, api])

if __name__ == "__main__":
    asyncio.run(run())
Now it gets even stranger. The tasks started running again after 22 missed runs.
I have now tested the scheduler for several days. The problem only occurs when I use the SQL repo.
I tried to make the SQLAlchemy engine log its queries via echo=True. Unfortunately, I get the error message "AttributeError: 'scheduler_log' object has no attribute '_sa_instance_state'" (see logs.txt).
After looking at the query logs from the DB, I noticed that despite setting id_field in the config:
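An alternative way to capture the SQL statements, without passing echo=True to create_engine, is SQLAlchemy's standard logging integration through the "sqlalchemy.engine" logger. This is only a sketch of that configuration; whether it avoids the AttributeError above is untested, and the log format shown is an arbitrary choice.

```python
import logging

# Send log records to stderr with timestamps so that SQL statements can be
# lined up against the scheduler's missed runs.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# "sqlalchemy.engine" is the logger SQLAlchemy uses for emitted SQL;
# INFO logs the statements, DEBUG additionally logs result rows.
sql_logger = logging.getLogger("sqlalchemy.engine")
sql_logger.setLevel(logging.INFO)
```

This has to run before the engine executes its first statement, for example at the top of the module that calls create_engine.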
repo=SQLRepo(engine=sec_db_engine, table="scheduler_log", model=MinimalRunRecord, id_field="id")
the logs are queried from the DB using the "created" column.
Did I miss something in the config?
The documentation mentions that it is better to use an ID column, so this may be the cause of the missed runs with after_success(func).
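To illustrate why the ordering column could matter, here is a minimal sketch using an in-memory SQLite table. It is not Rocketry's or redbird's actual query logic. The table layout is a simplified, hypothetical stand-in for scheduler_log, and it assumes the "created" timestamps are stored with whole-second resolution, which may not match the real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduler_log ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " created INTEGER,"  # assumption: timestamp truncated to whole seconds
    " task_name TEXT,"
    " action TEXT)"
)

# A "run" record and its "success" record written within the same second.
rows = [
    (1700000000, "check_starting_condition", "run"),
    (1700000000, "check_starting_condition", "success"),
]
conn.executemany(
    "INSERT INTO scheduler_log (created, task_name, action) VALUES (?, ?, ?)",
    rows,
)

# Ordering by id is unambiguous: the record inserted last comes first.
latest_by_id = conn.execute(
    "SELECT action FROM scheduler_log ORDER BY id DESC LIMIT 1"
).fetchone()[0]

# Ordering by created alone cannot separate the two records,
# because both share the newest timestamp.
tied = conn.execute(
    "SELECT COUNT(*) FROM scheduler_log"
    " WHERE created = (SELECT MAX(created) FROM scheduler_log)"
).fetchone()[0]

print(latest_by_id)  # success
print(tied)          # 2
```

If the status lookup resolves such a tie in favour of the "run" record, a condition like after_success would not see the success. Whether that is what happens here would need to be confirmed against the actual queries.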