apscheduler
apscheduler copied to clipboard
After exception AsyncScheduler crashes
Things to check first
-
[X] I have checked that my issue does not already have a solution in the FAQ
-
[X] I have searched the existing issues and didn't find my bug already reported there
-
[X] I have checked that my bug is still present in the latest release
Version
4.0.0a4
What happened?
After a job raises an exception, the AsyncScheduler crashes. I tried to reproduce this by raising a basic exception, but it only happens with an SQLAlchemy exception. I'm also using the SQLAlchemy data store. After the exception, APScheduler logs this:
2023-12-31 09:43:20,954 | ERROR | apscheduler._schedulers.async_:931 - Job ade58903-05d3-45a1-b8b3-107531e0d1ae raised an exception
Traceback (most recent call last):
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 557, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg\protocol\protocol.pyx", line 207, in bind_execute
asyncpg.exceptions.UniqueViolationError: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL: Ключ (code)=(1a1a1a) вже існує.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
cursor.execute(statement, parameters)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
self._adapt_connection.await_(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
self._handle_exception(error)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
self._adapt_connection._handle_exception(error)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL: Ключ (code)=(1a1a1a) вже існує.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 915, in _run_job
retval = await job_executor.run_job(func, job)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
retval = await retval
File "D:\PyProg\PizzaBotAPI\apscheduler_test.py", line 67, in job_2
await db.commit()
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\session.py", line 1011, in commit
await greenlet_spawn(self.sync_session.commit)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 192, in greenlet_spawn
result = context.switch(value)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
trans.commit(_to_root=True)
File "<string>", line 2, in commit
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
self._prepare_impl()
File "<string>", line 2, in _prepare_impl
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
self.session.flush()
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
self._flush(objects)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
with util.safe_reraise():
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
raise exc_value.with_traceback(exc_tb)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
flush_context.execute()
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
rec.execute(self)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
util.preloaded.orm_persistence.save_obj(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
_emit_insert_statements(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1226, in _emit_insert_statements
result = connection.execute(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
return meth(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
return connection._execute_clauseelement(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
ret = self._execute_context(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
return self._exec_single_context(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
self._handle_dbapi_exception(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2343, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
cursor.execute(statement, parameters)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
self._adapt_connection.await_(
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
self._handle_exception(error)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
self._adapt_connection._handle_exception(error)
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
raise translated_error from error
sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
DETAIL: Ключ (code)=(1a1a1a) вже існує.
[SQL: INSERT INTO items (name, code) VALUES ($1::VARCHAR, $2::VARCHAR) RETURNING items.id]
[parameters: ('Item', '1a1a1a')]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
2023-12-31 09:43:21,233 | INFO | apscheduler._schedulers.async_:917 - Job 9350e21c-059b-429f-be53-6af64f6696df was cancelled
2023-12-31 09:43:21,234 | INFO | apscheduler._schedulers.async_:917 - Job c36d50fc-1146-46c3-81ca-d3f0b66982fb was cancelled
2023-12-31 09:43:21,240 | ERROR | apscheduler._schedulers.async_:693 - Scheduler crashed
+ Exception Group Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 667, in run_until_stopped
| async with create_task_group() as task_group:
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 889, in _process_jobs
| await wakeup_event.wait()
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1621, in wait
| await self._event.wait()
| File "C:\Python310\lib\asyncio\locks.py", line 214, in wait
| await fut
| asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93cd0b50
|
| During handling of the above exception, another exception occurred:
|
| Exception Group Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 863, in _process_jobs
| async with AsyncExitStack() as exit_stack:
| File "C:\Python310\lib\contextlib.py", line 714, in __aexit__
| raise exc_details[1]
| File "C:\Python310\lib\contextlib.py", line 697, in __aexit__
| cb_suppress = await cb(*exc_details)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\anyio\_backends\_asyncio.py", line 664, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 557, in _prepare_and_execute
| self._rows = await prepared_stmt.fetch(*parameters)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 176, in fetch
| data = await self.__bind_execute(args, 0, timeout)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 241, in __bind_execute
| data, status, _ = await self.__do_execute(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\asyncpg\prepared_stmt.py", line 230, in __do_execute
| return await executor(protocol)
| File "asyncpg\protocol\protocol.pyx", line 207, in bind_execute
| asyncpg.exceptions.UniqueViolationError: повторювані значення ключа порушують обмеження унікальності "items_code_key"
| DETAIL: Ключ (code)=(1a1a1a) вже існує.
|
| The above exception was the direct cause of the following exception:
|
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
| self.dialect.do_execute(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
| cursor.execute(statement, parameters)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
| self._adapt_connection.await_(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
| return current.driver.switch(awaitable) # type: ignore[no-any-return]
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
| value = await result
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
| self._handle_exception(error)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
| self._adapt_connection._handle_exception(error)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
| raise translated_error from error
| sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.IntegrityError: <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
| DETAIL: Ключ (code)=(1a1a1a) вже існує.
|
| The above exception was the direct cause of the following exception:
|
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 915, in _run_job
| retval = await job_executor.run_job(func, job)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\executors\async_.py", line 22, in run_job
| retval = await retval
| File "D:\PyProg\PizzaBotAPI\apscheduler_test.py", line 67, in job_2
| await db.commit()
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\ext\asyncio\session.py", line 1011, in commit
| await greenlet_spawn(self.sync_session.commit)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 192, in greenlet_spawn
| result = context.switch(value)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1969, in commit
| trans.commit(_to_root=True)
| File "<string>", line 2, in commit
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
| ret_value = fn(self, *arg, **kw)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1256, in commit
| self._prepare_impl()
| File "<string>", line 2, in _prepare_impl
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\state_changes.py", line 139, in _go
| ret_value = fn(self, *arg, **kw)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1231, in _prepare_impl
| self.session.flush()
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4312, in flush
| self._flush(objects)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4447, in _flush
| with util.safe_reraise():
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\langhelpers.py", line 146, in __exit__
| raise exc_value.with_traceback(exc_tb)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\session.py", line 4408, in _flush
| flush_context.execute()
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 466, in execute
| rec.execute(self)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\unitofwork.py", line 642, in execute
| util.preloaded.orm_persistence.save_obj(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 93, in save_obj
| _emit_insert_statements(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\orm\persistence.py", line 1226, in _emit_insert_statements
| result = connection.execute(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1416, in execute
| return meth(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
| return connection._execute_clauseelement(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1639, in _execute_clauseelement
| ret = self._execute_context(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1848, in _execute_context
| return self._exec_single_context(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1988, in _exec_single_context
| self._handle_dbapi_exception(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2343, in _handle_dbapi_exception
| raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1969, in _exec_single_context
| self.dialect.do_execute(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\engine\default.py", line 922, in do_execute
| cursor.execute(statement, parameters)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 591, in execute
| self._adapt_connection.await_(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 125, in await_only
| return current.driver.switch(awaitable) # type: ignore[no-any-return]
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 185, in greenlet_spawn
| value = await result
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 569, in _prepare_and_execute
| self._handle_exception(error)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 520, in _handle_exception
| self._adapt_connection._handle_exception(error)
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 808, in _handle_exception
| raise translated_error from error
| sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>: повторювані значення ключа порушують обмеження унікальності "items_code_key"
| DETAIL: Ключ (code)=(1a1a1a) вже існує.
| [SQL: INSERT INTO items (name, code) VALUES ($1::VARCHAR, $2::VARCHAR) RETURNING items.id]
| [parameters: ('Item', '1a1a1a')]
| (Background on this error at: https://sqlalche.me/e/20/gkpj)
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 947, in _run_job
| await self.event_broker.publish(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 176, in publish
| raise SerializationError(
| apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size
+------------------------------------
2023-12-31 09:43:21,305 | ERROR | uvicorn.error:134 - Traceback (most recent call last):
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\starlette\routing.py", line 714, in lifespan
await receive()
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\uvicorn\lifespan\on.py", line 137, in receive
return await self.receive_queue.get()
File "C:\Python310\lib\asyncio\queues.py", line 159, in get
await getter
asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93bea0e0
After this, all schedules stop executing.
How can we reproduce the bug?
Here is a minimal code example:
apscheduler_test.py
import asyncio
import datetime
import json
import logging
import typing
from contextlib import asynccontextmanager
import uvicorn
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI, Response
from fastapi.middleware import Middleware
from pydantic import BaseModel
from sqlalchemy import Column, BigInteger, String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base
from starlette.types import ASGIApp, Scope, Receive, Send
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CustomSerializer(json.JSONEncoder):
    """JSON encoder that additionally handles dates and pydantic models."""

    def default(self, obj: typing.Any) -> typing.Any:
        """Return a JSON-serializable stand-in for ``obj``.

        Args:
            obj: A value the standard encoder could not serialize.

        Returns:
            The ISO 8601 string for ``date``/``datetime`` values, or the
            result of ``obj.dict()`` for ``BaseModel`` instances.

        Raises:
            TypeError: If ``obj`` is of any other type (raised by the base
                class implementation).
        """
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        if isinstance(obj, BaseModel):
            return obj.dict()
        # Defer to the base class, which raises TypeError. Falling off the
        # end would return None and silently encode unsupported objects as
        # JSON null.
        return super().default(obj)
def database_json_serializer(v):
    """Serialize ``v`` to indented, non-ASCII-escaped JSON via CustomSerializer."""
    dump_options = {
        "indent": 2,
        "ensure_ascii": False,
        "cls": CustomSerializer,
    }
    return json.dumps(v, **dump_options)
# Async engine for PostgreSQL through asyncpg; the URL placeholders are left
# unfilled in this example.
engine = create_async_engine(
    "postgresql+asyncpg://postgres:{passwd}@{host}:{port}/{database}",
    json_serializer=database_json_serializer,
    pool_size=20,
    max_overflow=60,
)

# Session factory bound to the engine above.
Session = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
)

# Declarative base class for the ORM models below.
Base = declarative_base()
class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"

    # Auto-incrementing surrogate primary key.
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False)
    # Unique, so inserting the same code twice violates the constraint.
    code = Column(String, nullable=False, unique=True)
async def job_1():
    """Log a start marker, sleep for five seconds, then log an end marker."""
    delay_seconds = 5
    logger.info("JOB_1 EXECUTE START")
    await asyncio.sleep(delay_seconds)
    logger.info("JOB_1 EXECUTE END")
async def job_2():
    """Insert an ``Item`` with a fixed code, then sleep and log completion.

    The code value is always ``"1a1a1a"`` and the column is unique, so the
    commit fails once a row with that code already exists.
    """
    logger.info("JOB_2 EXECUTE START")
    async with Session() as session:
        item = Item(name="Item", code="1a1a1a")
        session.add(item)
        await session.commit()
        await session.refresh(item)
    # NOTE(review): the paste lost its indentation; the sleep is assumed to
    # run after the session block has closed - confirm.
    await asyncio.sleep(5)
    logger.info("JOB_2 EXECUTE END")
async def job_3():
    """Log a start marker, sleep for ten seconds, then log an end marker."""
    delay_seconds = 10
    logger.info("JOB_3 EXECUTE START")
    await asyncio.sleep(delay_seconds)
    logger.info("JOB_3 EXECUTE END")
class SchedulerMiddleware:
    """ASGI middleware that runs the scheduler for the lifespan scope."""

    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        """Store the wrapped ASGI app and the scheduler to run."""
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the call; on the lifespan scope, run the scheduler around it."""
        # Every non-lifespan scope goes straight to the wrapped app.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        scheduled_jobs = ((job_1, "job_1"), (job_2, "job_2"), (job_3, "job_3"))
        # NOTE(review): the paste lost its indentation; the app call is assumed
        # to run inside the scheduler context - confirm.
        async with self.scheduler:
            for func, schedule_id in scheduled_jobs:
                await self.scheduler.add_schedule(
                    func,
                    IntervalTrigger(minutes=1),
                    id=schedule_id,
                )
            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Log the lifespan event and create the tables in ``Base.metadata``."""
    logger.info("LIFESPAN EVENT")
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
    # NOTE(review): the paste lost its indentation; the yield is assumed to
    # sit outside the ``engine.begin()`` block - confirm.
    yield
# The data store and the event broker share the same SQLAlchemy engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)

# The middleware starts the scheduler when the lifespan scope is entered.
app = FastAPI(
    middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)],
    lifespan=lifespan,
)
@app.get("/")
async def route_main():
    """Answer the root path with an empty 202 response."""
    accepted = Response(status_code=202)
    return accepted
# Run the app with uvicorn only when this file is executed as a script.
if __name__ == "__main__":
    uvicorn.run(
        "apscheduler_test:app",
        host="localhost",
        port=5000,
        log_config="logging.json",
        proxy_headers=False,
        reload=False,
    )
logging.json
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"default": {
"format": "%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)s - %(message)s"
},
"basic": {
"()": "uvicorn.logging.DefaultFormatter",
"format": "%(asctime)s | %(levelname)-8s | %(name)s - %(message)s"
}
},
"handlers": {
"console_handler": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "default",
"stream": "ext://sys.stdout"
}
},
"loggers": {
"root": {
"level": "INFO",
"handlers": [
"console_handler"
]
}
}
}
I need you to minimize this. That is, to remove everything not needed to reproduce the issue.
# Async engine for PostgreSQL through asyncpg (URL elided in this example).
engine = create_async_engine("postgresql+asyncpg://...")

# Session factory bound to the engine above.
Session = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autoflush=False,
    autocommit=False,
)

# Declarative base class for the ORM model below.
Base = declarative_base()
class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"

    # Auto-incrementing surrogate primary key.
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    # Unique, so inserting the same code twice violates the constraint.
    code = Column(String, nullable=False, unique=True)
async def job_1():
    """Print a start marker, sleep for five seconds, then print an end marker."""
    delay_seconds = 5
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(delay_seconds)
    print("JOB_1 EXECUTE END")
async def job_2():
    """Insert an ``Item`` with a fixed code and commit it.

    The code value is always ``"1a1a1a"`` and the column is unique, so the
    commit fails once a row with that code already exists.
    """
    print("JOB_2 EXECUTE START")
    async with Session() as session:
        item = Item(code="1a1a1a")
        session.add(item)
        await session.commit()
    # NOTE(review): the paste lost its indentation; the final print is assumed
    # to run after the session block has closed - confirm.
    print("JOB_2 EXECUTE END")
class SchedulerMiddleware:
    """ASGI middleware that runs the scheduler for the lifespan scope."""

    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        """Store the wrapped ASGI app and the scheduler to run."""
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the call; on the lifespan scope, run the scheduler around it."""
        # Every non-lifespan scope goes straight to the wrapped app.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        scheduled_jobs = ((job_1, "job_1"), (job_2, "job_2"))
        # NOTE(review): the paste lost its indentation; the app call is assumed
        # to run inside the scheduler context - confirm.
        async with self.scheduler:
            for func, schedule_id in scheduled_jobs:
                await self.scheduler.add_schedule(func, IntervalTrigger(minutes=1), id=schedule_id)
            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Print the lifespan marker and create the tables in ``Base.metadata``."""
    print("LIFESPAN EVENT")
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
    # NOTE(review): the paste lost its indentation; the yield is assumed to
    # sit outside the ``engine.begin()`` block - confirm.
    yield
# The data store and the event broker share the same SQLAlchemy engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)

# The middleware starts the scheduler when the lifespan scope is entered.
app = FastAPI(
    middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)],
    lifespan=lifespan,
)
In this code we're simulating an SQLAlchemy error (e.g. sqlalchemy.exc.IntegrityError: (sqlalchemy.dialects.postgresql.asyncpg.IntegrityError) <class 'asyncpg.exceptions.UniqueViolationError'>). After this error is raised, the scheduler stops working.
Will it not trigger the error if job 2 doesn't use the database?
If, for example, job 2 raises a ValueError, the scheduler will continue to work.
But if job 2 raises an error like raise ValueError("aaasfsdafsghw45hj6tr5wj6rthewh"*10000),
the same error occurs.
So, in other words, the database stuff isn't required, just raising a ValueError
will trigger the problem?
I think it's because the thrown error text is too long
apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\_schedulers\async_.py", line 947, in _run_job
| await self.event_broker.publish(
| File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\apscheduler\eventbrokers\asyncpg.py", line 176, in publish
| raise SerializationError(
| apscheduler.SerializationError: Serialized event object exceeds 7999 bytes in size
+------------------------------------
2023-12-31 09:43:21,305 | ERROR | uvicorn.error:134 - Traceback (most recent call last):
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\starlette\routing.py", line 714, in lifespan
await receive()
File "D:\PyProg\PizzaBotAPI\venv\lib\site-packages\uvicorn\lifespan\on.py", line 137, in receive
return await self.receive_queue.get()
File "C:\Python310\lib\asyncio\queues.py", line 159, in get
await getter
asyncio.exceptions.CancelledError: Cancelled by cancel scope 1dd93bea0e0
So if we can reproduce it without the database, we can minimize the code to this:
async def job_1():
    """Print a start marker, sleep for five seconds, then print an end marker."""
    delay_seconds = 5
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(delay_seconds)
    print("JOB_1 EXECUTE END")
async def job_2():
    """Print a start marker, then raise a ValueError with a very long message.

    Raises:
        ValueError: Always; the message is a 30-character string repeated
            10000 times.
    """
    print("JOB_2 EXECUTE START")
    oversized_message = "aaasfsdafsghw45hj6tr5wj6rthewh" * 10000
    raise ValueError(oversized_message)
class SchedulerMiddleware:
    """ASGI middleware that runs the scheduler for the lifespan scope."""

    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        """Store the wrapped ASGI app and the scheduler to run."""
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the call; on the lifespan scope, run the scheduler around it."""
        # Every non-lifespan scope goes straight to the wrapped app.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        scheduled_jobs = ((job_1, "job_1"), (job_2, "job_2"))
        # NOTE(review): the paste lost its indentation; the app call is assumed
        # to run inside the scheduler context - confirm.
        async with self.scheduler:
            for func, schedule_id in scheduled_jobs:
                await self.scheduler.add_schedule(func, IntervalTrigger(minutes=1), id=schedule_id)
            await self.scheduler.start_in_background()
            await self.app(scope, receive, send)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Print the lifespan marker, then hand control to the application."""
    marker = "LIFESPAN EVENT"
    print(marker)
    yield
# Async engine for PostgreSQL through asyncpg (URL elided in this example).
engine = create_async_engine("postgresql+asyncpg://...")

# The data store and the event broker share the same SQLAlchemy engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)

# The middleware starts the scheduler when the lifespan scope is entered.
app = FastAPI(
    middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)],
    lifespan=lifespan,
)
Right – so in fact, the web framework shouldn't be required to reproduce this.
Yes, but if it's an error such as an SQLAlchemy error, it stops the entire scheduler.
I think I need to remove exception tracebacks and messages from the event structure.
Ellipsizing them might be a less intrusive change. Could you test with the limit-jobreleased-size branch to see if the problem is gone? I'm done for today.
The limit-jobreleased-size branch works correctly with this type of error. Thank you!