asyncpg
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress when executing gino processes in parallel
- asyncpg version: 0.20.1
- PostgreSQL version: 11
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?:no
- Python version: 3.7
- Platform: Win
- Do you use pgbouncer?: no
- Did you install asyncpg with pip?: yes
- If you built asyncpg locally, which version of Cython did you use?:
- Can the issue be reproduced under both asyncio and uvloop?: don't know, I haven't used uvloop
Hello! Thank you for your attention in advance.
I'm trying to run parallel tasks in batches with asyncio, gino and asyncpg.
The magic starts from this entry point:
```python
while batch:
    tasks = {
        asyncio.create_task(
            JobService.run_job(
                job,
                connection_per_job=True,
                endpoint_connection=endpoint_request.db_connection,
            )
        ): job
        for job in batch
    }
    result = await asyncio.gather(*tasks)
    # executed_jobs.append({'name': job_instance.name, 'uuid': job_instance.uuid, 'result': job_execution.response})
    start += batch_jobs_count
    batch = jobs[start:start + batch_jobs_count]
```
`endpoint_connection=endpoint_request.db_connection` is the endpoint connection opened at the point of receiving the request in my application (I'm using an internal util library over asyncpg, quart and gino).
Below in JobService.run_job I have:
```python
@classmethod
async def run_job(cls, job: Job, connection_per_job=False, endpoint_connection=None) -> (Job, JobExecution):
    print(f'received {job.name}, time: {datetime.utcnow()}')
    print(f'job: {job.name}, connection: {endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired}')
    if connection_per_job and bool(endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired):
        async with db_adapter.get_db().acquire() as conn:
            print('in context')
            job_instance, job_execution = await cls._run_job_internal(job, conn=conn)
    else:
        job_instance, job_execution = await cls._run_job_internal(job)
    return job_instance, job_execution
```
I'm trying to optimize the use of db connections: I first check whether the general endpoint connection is free and use it if so; otherwise I acquire a new one in a context manager. To be sure which connection is used, I pass a bind (connection) parameter to all of my db-related methods (gino usage), and they seem to fail on the second connection created in the context. I also don't really understand how a connection created explicitly can already be in use by another operation (which is what I understand from the error).
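As an aside (an illustration, not from this thread): checking `_acquired` before using the connection is a time-of-check/time-of-use race. Any `await` between the check and the query yields control to the event loop, so two tasks can both observe the connection as "free" and then both use it. A minimal self-contained sketch of that race, using a plain flag as a stand-in for asyncpg's exclusive-section state:

```python
import asyncio

busy = False  # stand-in for asyncpg's _stmt_exclusive_section._acquired flag

async def use_if_free(name, log):
    global busy
    if not busy:                 # check: the "connection" looks free
        await asyncio.sleep(0)   # any await yields; another task can pass the same check
        busy = True              # use: by now both tasks may be in here
        log.append(name)
        await asyncio.sleep(0)
        busy = False

async def main():
    log = []
    await asyncio.gather(use_if_free('t1', log), use_if_free('t2', log))
    return log

log = asyncio.run(main())
print(log)  # both tasks got past the "free" check: ['t1', 't2']
```

Because both tasks pass the check before either marks the flag, the guard offers no protection once real I/O awaits are involved.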
Can you show the full traceback of the exception, please?
Yes, sorry :)
```
Traceback (most recent call last):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\default_handler.py", line 54, in handle
    endpoint_response = await func(endpoint_request, endpoint_response)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\controllers\model_controllers\job\__init__.py", line 39, in run
    result = await asyncio.gather(*tasks)
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 292, in __wakeup
    future.result()
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 223, in __step
    result = coro.send(None)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 64, in run_job
    job_instance, job_execution = await cls._run_job_internal(job)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 78, in _run_job_internal
    job_execution = await JobExecutionService.create(job, bind=conn)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job_execution.py", line 18, in create
    result_instance = await JobExecution.create(**data, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 53, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 57, in _create
    uuid = await self.__create_pre_operations(request_dict, validate=validate, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 73, in __create_pre_operations
    revision = await cls.revision_cls.create_revision(cls, values, operation_type, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\entity_revision.py", line 70, in create_revision
    await cls.create(bind=bind, **revision_data)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 452, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 487, in _create
    row = await bind.first(q)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 745, in first
    return await conn.first(clause, *multiparams, **params)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 325, in first
    return await result.execute(one=True)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\base.py", line 211, in execute
    context.statement, context.timeout, args, 1 if one else 0
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\asyncpg.py", line 178, in async_execute
    with getattr(conn, "_stmt_exclusive_section"):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\asyncpg\connection.py", line 1841, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
```
Well, the general rule is that you are not supposed to share a connection between tasks, because PostgreSQL does not support interleaving queries on a single connection. You are trying to get around the critical section check, but that would only work if all code below the check is synchronous, i.e. you are not returning control to the event loop. That's not true in your case, there's plenty of opportunity between your check and the actual database operation for some other task to enter the critical section.
Is there a particular reason why a pool does not work for you?
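The failure mode described above can be reproduced without a database. Below is a toy connection (an illustration, not asyncpg's actual implementation) that rejects overlapping operations, much like asyncpg's exclusive section does: sharing one instance across `asyncio.gather` tasks fails, while giving each task its own instance works.

```python
import asyncio

class ToyConnection:
    """Stand-in for an asyncpg connection: allows one operation at a time."""

    def __init__(self):
        self._busy = False

    async def fetch(self, query):
        if self._busy:
            raise RuntimeError('another operation is in progress')
        self._busy = True
        await asyncio.sleep(0)  # yield to the event loop, like real network I/O
        self._busy = False
        return query

async def main():
    # One connection shared between two concurrent tasks: fails.
    shared = ToyConnection()
    try:
        await asyncio.gather(shared.fetch('q1'), shared.fetch('q2'))
        shared_ok = True
    except RuntimeError:
        shared_ok = False

    # One connection per task (what a pool gives you): succeeds.
    results = await asyncio.gather(
        ToyConnection().fetch('q1'), ToyConnection().fetch('q2'))
    return shared_ok, results

shared_ok, results = asyncio.run(main())
print(shared_ok, results)  # False ['q1', 'q2']
```

This is why the standard advice is to acquire a fresh connection from the pool inside each task rather than passing one connection into many tasks.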
I'm actually using a pool:
```python
async with db_adapter.get_db().acquire() as conn:
    print('in context')
    job_instance, job_execution = await cls._run_job_internal(job, conn=conn)
```
`db_adapter.get_db()` is a wrapper around the gino engine, and using `acquire` means the connection is always taken from the pool:
https://github.com/python-gino/gino/blob/77e19f58e4295729937a46996c0630020b29b9a3/gino/engine.py#L619
Okay, so I started a separate engine specifically for this part of the application, and the error seems to go away only when there are records in my table. It seems that in an empty table the first row gets locked until it is populated, but the parallel tasks try to access it too. So I tried checking whether there are records in the table and sleeping for a second, but the error occurs again.
@shsimeonova do you mind sharing a few more details on how you did it? I'm facing the very same issue with two SELECT queries.
More specifically, I'm using the following code, which raises the exception you have shown:
```python
import asyncio

async with db_pool.acquire() as conn:
    async with conn.transaction():
        # fetchval() is not awaited here, so res1/res2 are coroutines
        res1 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        res2 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        # both coroutines then run concurrently on the same connection
        await asyncio.gather(res1, res2)
```
Though I guess the problem is that I'm sharing the acquired connection....
@shsimeonova I also faced the same issue using gino (with the asyncpg driver). For me the cause was acquiring db connections with `reuse=True` inside tasks.

To elaborate: the tasks I ran with `asyncio.gather` contained `async with pool.acquire(reuse=True) as conn`, which caused a connection to be shared between tasks, hence the error. Acquiring connections with `reuse=False` fixed it for me.

Also, the crud methods provided by gino use GinoEngine, which under the hood always acquires connections with `reuse=True`. So code like `User.create(**kwargs)` will also fail, because internally it acquires a reusable connection.

For reference: https://github.com/python-gino/gino/issues/313
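Roughly speaking, `reuse=True` means "if the current context already holds a connection, hand back that same one instead of pulling a fresh one from the pool", which is exactly what makes concurrent tasks collide on one connection. A minimal sketch of that semantic using `contextvars` (illustrative only, not gino's actual implementation):

```python
import contextvars

# Context-local "currently acquired connection", as a pool wrapper might track it.
_current_conn = contextvars.ContextVar('current_conn', default=None)

class Conn:
    """Trivial stand-in for a pooled database connection."""
    _counter = 0

    def __init__(self):
        Conn._counter += 1
        self.id = Conn._counter

def acquire(reuse=True):
    # reuse=True: return the connection already bound to this context, if any.
    if reuse and _current_conn.get() is not None:
        return _current_conn.get()
    conn = Conn()
    _current_conn.set(conn)
    return conn

a = acquire(reuse=True)
b = acquire(reuse=True)   # same object: the source of the shared-connection error
c = acquire(reuse=False)  # a genuinely new connection
print(a is b, a is c)     # True False
```

Under this model, two pieces of code in the same context that both say `reuse=True` end up interleaving queries on one connection, while `reuse=False` isolates them.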
@dony585 Which version of asyncpg are you using? In the latest version (0.23.0), `pool.acquire()` takes no arguments.