databases
databases copied to clipboard
MySQL - asyncio.Lock get error
when use databases and aiomysql I get Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop
detail:
Traceback (most recent call last):
File "/app/services/currency.py", line 120, in update_currency_ticker_by_id
return await database.execute(q)
File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 122, in execute
return await connection.execute(query, values)
File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 209, in execute
async with self._query_lock:
File "/usr/local/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
await self.acquire()
File "/usr/local/lib/python3.7/asyncio/locks.py", line 192, in acquire
await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop
packages:
aiomysql==0.0.20
databases==0.2.3
alembic==1.0.11
fastapi==0.31.0
db.py
from databases import Database
database = Database(DB_URL, min_size=5, max_size=20)
execute code snippet
async def update_currency_ticker_by_id(currency_id: int, ticker: CurrencyTickerUpdateRequest):
tbl_c = currency
values = {k: v for k, v in ticker.dict().items() if v is not None}
if values:
q = tbl_c.update().where(tbl_c.c.id == currency_id).values(values)
try:
return await database.execute(q)
except Exception as e:
print('update error', e, "detail\n", traceback.format_exc())
run in docker with base image: python 3.7.3-alpine.
why did this happend? and how to fix it ... it's urgent.
got this error 2. is there any update here?
Can either of you reduce this to a really simple, reproducible example case? @lexee Are you also seeing this using FastAPI same as @watsy0007, or not?
from fastapi import FastAPI
from databases import Database
from os import environ as env
database = Database(env.get('DB_URL', 'mysql://root:@127.0.0.1:3306/db'), min_size=5, max_size=10)
app = FastAPI()
@database.transaction()
async def blocked():
return {}
@app.get('/')
async def index():
return await blocked()
@app.on_event('startup')
async def setup_db():
await database.connect()
test.py
import threading
import requests
[threading.Thread(target=requests.get, args=["http://127.0.0.1:8000"]).start() for _ in range(5)]
when I comment @database.transaction()
. it's works.
@lexee @tomchristie
Any reason the issue got closed?
i have the same problem, do we have any solution?
@NingziSlay Can you confirm if this is specific to MySQL?
@NingziSlay Can you confirm if this is specific to MySQL?
sorry about poor English, hope you could understand.. yes, i only use mysql to my project.
i use fastapi too, if request come one by one, it works fine. but it cannot work concurrently
here is main.py
from fastapi import FastAPI
from apps import apis
from libs.db_tools import session
app = FastAPI()
app.include_router(apis, prefix="/api")
@app.on_event("startup")
async def startup():
await session.connect()
@app.on_event("shutdown")
async def shutdown():
await session.disconnect()
if __name__ == '__main__':
import uvicorn
uvicorn.run(
app,
host='0.0.0.0',
port=8080,
)
libs.db_tools.py
session = databases.Database("mysql+pymysql://root:[email protected]/beat_dev?charset=utf8&use_unicode=1")
i write this
from libs.db_tools import session
...
@router.get("/test")
async def test():
query = Track.select()
return await session.fetch_all(query)
and use ab
ab -n 4 -c 2 http://127.0.0.1:8000/test
will raise exception
...
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 178, in __call__
raise exc from None
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 156, in __call__
await self.app(scope, receive, _send)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 73, in __call__
raise exc from None
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 62, in __call__
await self.app(scope, receive, sender)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 590, in __call__
await route(scope, receive, send)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 208, in __call__
await self.app(scope, receive, send)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/fastapi/routing.py", line 133, in app
raw_response = await dependant.call(**values)
File "/Users/ningzi/workspace/beats/apps/access.py", line 68, in test
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 131, in fetch_all
return await connection.fetch_all(query, values)
File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 217, in fetch_all
async with self._query_lock:
File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
await self.acquire()
File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 192, in acquire
await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:385> cb=[set.discard()]> got Future <Future pending> attached to a different loop
Is there any progress on this issue?
I change my code like this, and it's work.
class SessionMaker(object):
_session = None
@classmethod
def get_session(cls):
if not cls._session:
cls._session = databases.Database(settings.DB_URL)
return cls._session
on main.py
@app.on_event("startup")
async def startup():
session = SessionMaker.get_session()
await session.connect()
@app.on_event("shutdown")
async def shutdown():
session = SessionMaker.get_session()
await session.disconnect()