asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress when executing gino processes in parallel

Open · shsimeonova opened this issue 4 years ago · 8 comments

  • asyncpg version: 0.20.1
  • PostgreSQL version: 11
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: no
  • Python version: 3.7
  • Platform: Win
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: yes
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and uvloop?: don't know, haven't used uvloop

Hello! Thank you in advance for your attention.

I'm trying to run parallel tasks in batches with asyncio, gino and asyncpg.

The magic starts from this entry point:

while batch:
    tasks = {
        asyncio.create_task(
            JobService.run_job(
                job,
                connection_per_job=True,
                endpoint_connection=endpoint_request.db_connection,
            )
        ): job
        for job in batch
    }
    result = await asyncio.gather(*tasks)
    # executed_jobs.append({'name': job_instance.name, 'uuid': job_instance.uuid, 'result': job_execution.response})

    start += batch_jobs_count
    batch = jobs[start:start + batch_jobs_count]

endpoint_connection=endpoint_request.db_connection is the connection opened at the point of receiving the request in my application (I'm using an internal utility library on top of asyncpg, quart and gino).

Below in JobService.run_job I have:

@classmethod
async def run_job(cls, job: Job, connection_per_job=False, endpoint_connection=None) -> (Job, JobExecution):
    print(f'received {job.name}, time: {datetime.utcnow()}')
    print(f'job: {job.name}, connection: {endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired}')

    if connection_per_job and bool(endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired):
        async with db_adapter.get_db().acquire() as conn:
            print('in context')
            job_instance, job_execution = await cls._run_job_internal(job, conn=conn)
    else:
        job_instance, job_execution = await cls._run_job_internal(job)

    return job_instance, job_execution

I'm trying to optimize the use of db connections: I first check whether the general endpoint connection is free, use it if so, and otherwise acquire a new one in a context manager. To be sure which connection is used, I pass a bind (connection) parameter to all of my methods related to db operations (gino usage), and they seem to fail on the second connection, the one created in the context. I also don't really understand how a connection created explicitly can already be in use by another operation (which is what I understand the error to mean).

shsimeonova avatar May 18 '20 17:05 shsimeonova

Can you show the full traceback of the exception, please?

elprans avatar May 18 '20 18:05 elprans

Yes, sorry :)

Traceback (most recent call last):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\default_handler.py", line 54, in handle
    endpoint_response = await func(endpoint_request, endpoint_response)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\controllers\model_controllers\job\__init__.py", line 39, in run
    result = await asyncio.gather(*tasks)
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 292, in __wakeup
    future.result()
  File "C:\Users\SimonaSimeonova\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 223, in __step
    result = coro.send(None)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 64, in run_job
    job_instance, job_execution = await cls._run_job_internal(job)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job.py", line 78, in _run_job_internal
    job_execution = await JobExecutionService.create(job, bind=conn)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\v1\common\services\job_execution.py", line 18, in create
    result_instance = await JobExecution.create(**data, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 53, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 57, in _create
    uuid = await self.__create_pre_operations(request_dict, validate=validate, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\base_model.py", line 73, in __create_pre_operations
    revision = await cls.revision_cls.create_revision(cls, values, operation_type, bind=bind)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\python_core\libraries\database\models\entity_revision.py", line 70, in create_revision
    await cls.create(bind=bind, **revision_data)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 452, in _create_without_instance
    return await cls(**values)._create(bind=bind, timeout=timeout)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\crud.py", line 487, in _create
    row = await bind.first(q)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 745, in first
    return await conn.first(clause, *multiparams, **params)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\engine.py", line 325, in first
    return await result.execute(one=True)
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\base.py", line 211, in execute
    context.statement, context.timeout, args, 1 if one else 0
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\gino\dialects\asyncpg.py", line 178, in async_execute
    with getattr(conn, "_stmt_exclusive_section"):
  File "C:\Users\SimonaSimeonova\Documents\projects\be__cronjob\venv\lib\site-packages\asyncpg\connection.py", line 1841, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

shsimeonova avatar May 19 '20 08:05 shsimeonova

Well, the general rule is that you are not supposed to share a connection between tasks, because PostgreSQL does not support interleaving queries on a single connection. You are trying to get around the critical-section check, but that would only work if all code below the check were synchronous, i.e. if you never returned control to the event loop. That's not true in your case: between your check and the actual database operation there is plenty of opportunity for some other task to enter the critical section.
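
For illustration, here is a minimal sketch of the per-task pattern, where each task acquires its own connection from an asyncpg pool (the DSN and the run_one_job name are placeholders, not from this issue):

import asyncio
import asyncpg

async def run_one_job(pool, job_id):
    # Each task checks out its own connection for the duration of its
    # work, so queries from different tasks never interleave on a
    # single connection.
    async with pool.acquire() as conn:
        return await conn.fetchval('SELECT $1::int', job_id)

async def main():
    pool = await asyncpg.create_pool(dsn='postgresql://localhost/test')
    try:
        results = await asyncio.gather(
            *(run_one_job(pool, i) for i in range(10))
        )
        print(results)
    finally:
        await pool.close()

asyncio.run(main())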

Is there a particular reason why a pool does not work for you?

elprans avatar May 19 '20 23:05 elprans

I'm actually using a pool:

async with db_adapter.get_db().acquire() as conn:
    print('in context')
    job_instance, job_execution = await cls._run_job_internal(job, conn=conn)

db_adapter.get_db() is a wrapper around the gino engine, and using acquire means the connection is always taken from the pool: https://github.com/python-gino/gino/blob/77e19f58e4295729937a46996c0630020b29b9a3/gino/engine.py#L619

shsimeonova avatar May 20 '20 08:05 shsimeonova

Okay, so I started a separate engine specifically for this part of the application, and the error seems to go away only when there are records in my table. It seems that in an empty table the first row gets locked until the table is populated, while the parallel tasks try to access it too. So I tried checking whether there are records in the table and sleeping for a second, but the error occurs again.

shsimeonova avatar May 20 '20 14:05 shsimeonova

@shsimeonova do you mind sharing a few more details on how you did it? I'm facing the very same issue with two SELECT queries.

More specifically, I'm using the following code, which raises the exception you have shown:

import asyncio

async with db_pool.acquire() as conn:
    async with conn.transaction():
        # fetchval() is not awaited here, so each call just creates a
        # coroutine; gather() then runs both concurrently on the same
        # connection, which triggers the InterfaceError.
        res1 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        res2 = conn.fetchval(
            """SELECT * FROM ...... ;"""
        )
        await asyncio.gather(res1, res2)

Though, I guess the problem is the fact that I'm sharing the acquired connection between the two gathered queries...
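
A minimal sketch of two ways around this (the DSN and the queries are placeholders): either await the queries sequentially on the one shared connection, or give each concurrent query its own pooled connection.

import asyncio
import asyncpg

async def fetch_one(pool, query):
    # Each concurrent query checks out its own connection.
    async with pool.acquire() as conn:
        return await conn.fetchval(query)

async def main():
    pool = await asyncpg.create_pool(dsn='postgresql://localhost/test')
    try:
        # Sequential: both queries share one connection but never overlap.
        async with pool.acquire() as conn:
            async with conn.transaction():
                res1 = await conn.fetchval('SELECT 1;')
                res2 = await conn.fetchval('SELECT 2;')

        # Concurrent: each query gets its own pooled connection.
        res3, res4 = await asyncio.gather(
            fetch_one(pool, 'SELECT 3;'),
            fetch_one(pool, 'SELECT 4;'),
        )
        print(res1, res2, res3, res4)
    finally:
        await pool.close()

asyncio.run(main())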

lsabi avatar May 28 '20 21:05 lsabi

@shsimeonova I also faced the same issue using gino (with the asyncpg driver). The issue for me was acquiring the db connection with reuse=True inside tasks.

To elaborate, the tasks I ran with asyncio.gather contained async with pool.acquire(reuse=True) as conn, which caused a connection to be shared between tasks and hence the error.

Acquiring connections with reuse=False fixed it for me.

Also, the CRUD methods provided by gino use GinoEngine, which under the hood always acquires connections with reuse=True. So code like User.create(**kwargs) will also fail, because internally it acquires a reusable connection.

For reference https://github.com/python-gino/gino/issues/313
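
A minimal sketch of the reuse=False pattern described above (the DSN and do_work are placeholders, not from the issue):

import asyncio
from gino import Gino

db = Gino()

async def do_work():
    # reuse=False forces a fresh connection from the underlying pool,
    # so concurrent tasks never share one asyncpg connection.
    async with db.acquire(reuse=False) as conn:
        return await conn.scalar('SELECT 1')

async def main():
    await db.set_bind('postgresql://localhost/test')
    try:
        results = await asyncio.gather(*(do_work() for _ in range(5)))
        print(results)
    finally:
        await db.pop_bind().close()

asyncio.run(main())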

dony585 avatar Mar 24 '21 09:03 dony585

@dony585 Which version of asyncpg are you using? In the latest version (0.23.0), pool.acquire() takes no reuse argument.

waydegg avatar Jul 23 '21 02:07 waydegg