
Listener callback that accesses database does not complete fetch

Open theo-brown opened this issue 4 years ago • 3 comments

  • asyncpg version: 0.24.0
  • PostgreSQL version: 12.8
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Local PostgreSQL install
  • Python version: 3.9.1
  • Platform: Linux Mint
  • Do you use pgbouncer?: No
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: N/A
  • Can the issue be reproduced under both asyncio and uvloop?: Have not tested with uvloop yet

When a row in my table is inserted or updated, I need a listener to pull data from the table and perform operations with it. The callback is successfully triggered on updates and inserts, but within the callback the database cannot be accessed. A new connection is successfully acquired from the pool (line [1]) but the fetch operation does not complete (line [2]).

Is this an issue with my code, or is it a limitation of asyncpg?

Sorry for the enormous code example - it's from a microservice application with a number of Python services that all access the database, so this was about as minimal as I could make it while still vaguely resembling the existing program structure.

import asyncio
import asyncpg
from os import getenv
from dotenv import load_dotenv
import json


load_dotenv("../../.env")


class DatabaseInterface:
    """
    Multiple different applications need to access the database, and will be performing similar operations.
    Methods and properties to do with interacting with the database are all put into the DatabaseInterface class,
    with each application creating a new instance of the class.
    """
    def __init__(self):
        self.pool = None
        self.listeners = []

    async def connect(self):
        self.pool = await asyncpg.create_pool(port=int(getenv("POSTGRES_PORT")),
                                              user=getenv("POSTGRES_USER"),
                                              password=getenv("POSTGRES_PASSWORD"),
                                              database=getenv("POSTGRES_DB"))

    async def create_schema(self):
        async with self.pool.acquire() as connection:
            await connection.execute("""
                                     DROP TABLE IF EXISTS table1;
                                     
                                     CREATE TABLE table1 (key INTEGER,
                                                          value INTEGER,
                                                          PRIMARY KEY (key));
                                                                        
                                     CREATE OR REPLACE FUNCTION notify_mychannel()
                                     RETURNS TRIGGER AS
                                     $$
                                     DECLARE
                                         payload TEXT;
                                     BEGIN
                                         -- This is a simplified payload. In the actual program, 
                                         -- the payload contains a key and an event label
                                         payload := json_build_object('key', NEW.key);
                                         PERFORM pg_notify('mychannel', payload);
                                         RETURN NULL;
                                     END
                                     $$
                                     LANGUAGE plpgsql;
                                  
                                     CREATE TRIGGER update_value
                                         AFTER UPDATE OF value ON table1
                                         FOR EACH ROW
                                         EXECUTE PROCEDURE notify_mychannel();

                                     CREATE TRIGGER new_value
                                         AFTER INSERT ON table1
                                         FOR EACH ROW
                                         EXECUTE PROCEDURE notify_mychannel();
                                     """)

    async def add_listener(self, channel, callback):
        """
        This wrapper function saves the connection in self.listeners, so that it is not garbage-collected.
        """
        connection = await self.pool.acquire()
        await connection.add_listener(channel, callback)
        self.listeners.append({'connection': connection,
                               'channel': channel,
                               'callback': callback})

    async def upsert(self, key: int, value: int):
        async with self.pool.acquire() as connection:
            await connection.execute("INSERT INTO table1(key, value)"
                                     " VALUES ($1, $2)"
                                     " ON CONFLICT (key)"
                                     " DO UPDATE SET value = $2;",
                                     key, value)

    async def fetch(self, key: int):
        async with self.pool.acquire() as connection:  # [1]
            record = await connection.fetchrow("SELECT key, value FROM table1"
                                               " WHERE key = $1;", key)  # [2]
        return record['value']


class ExampleApplication:
    def __init__(self):
        self.database_interface = DatabaseInterface()

    async def start(self):
        await self.database_interface.connect()
        await self.database_interface.create_schema()
        await self.database_interface.add_listener('mychannel', self.callback)

    async def callback(self, connection: asyncpg.Connection,
                       pid: int, channel: str, payload: str):
        """
        This is an example callback function. In the actual app, the payload contains a key and event value,
        which, depending on the event, should trigger fetches from multiple different tables in the database,
        return data which will be used in other async operations.
        """
        payload_json = json.loads(payload)
        value = await self.database_interface.fetch(payload_json['key']) # This is the line that does not complete
        print(f"Value {value}") # This line is never run


async def main():
    app = ExampleApplication()
    await app.start()
    await app.database_interface.upsert(1, 10)


if __name__ == "__main__":
    asyncio.run(main())

theo-brown avatar Aug 22 '21 13:08 theo-brown

What I think is happening is that the connection at [1] is released back to the pool before [2] runs.
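For reference, `async with pool.acquire() as connection:` only releases the connection when the block exits, so the fetch at [2] should always run while the connection from [1] is still held. A minimal sketch with a fake pool (the class names here are illustrative, not asyncpg's internals) showing the ordering guarantee of an async context manager:

```python
import asyncio

events = []

class FakePool:
    """Stand-in for asyncpg's Pool; acquire() yields an async context manager."""
    def acquire(self):
        return _PoolAcquireContext()

class _PoolAcquireContext:
    async def __aenter__(self):
        events.append("acquired")   # corresponds to [1]
        return object()             # the "connection"

    async def __aexit__(self, *exc):
        events.append("released")   # release happens only on block exit

async def fetch():
    async with FakePool().acquire():
        events.append("fetched")    # corresponds to [2]: runs before release

asyncio.run(fetch())
# events == ["acquired", "fetched", "released"]
```

The release is driven by `__aexit__`, so it cannot interleave before the body of the `async with` block completes.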

theo-brown avatar Aug 26 '21 01:08 theo-brown

Could the issue have to do with the fact that the pool is running in the main event loop, rather than in a sub-loop of the listener?

theo-brown avatar Aug 26 '21 01:08 theo-brown
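One thing worth ruling out, separately from any event-loop nesting: `main()` returns immediately after `upsert`, so `asyncio.run()` may tear the loop down while the notification callback's task is still mid-await, which would look exactly like a fetch that never completes. A pure-asyncio sketch of that failure mode (no database involved; it assumes asyncpg schedules coroutine listener callbacks as tasks on the running loop, which 0.24's coroutine-callback support suggests):

```python
import asyncio

log = []

async def callback():
    # Stands in for the listener callback: step [1] succeeds...
    log.append("acquired")
    # ...but the await at [2] is still pending when the loop shuts down.
    await asyncio.sleep(0.5)
    log.append("fetched")  # never reached

async def main():
    # Analogous to asyncpg dispatching a notification callback as a task.
    asyncio.get_running_loop().create_task(callback())
    await asyncio.sleep(0.1)  # main() returns while callback() is mid-await

asyncio.run(main())
# asyncio.run() cancels still-pending tasks on shutdown, so log == ["acquired"]
```

If this is what's happening, keeping the program alive after the upsert (e.g. awaiting an `asyncio.Event` or a sleep at the end of `main()`) should let the callback's fetch complete.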