asyncpg

NOTIFY/LISTEN and connection loss

Open sergeyspatar opened this issue 5 years ago • 7 comments

I need to watch data changes in multiple tables. One option is to add triggers and log changes to another table, which the app can poll every few seconds. A better option is to send PG notifications and set up a handler using conn.add_listener(). Do I need a dedicated connection for this approach, as opposed to a connection pool? How should I handle connection loss? If I kill a PG server process, asyncpg won't notice it until either the TCP timeout expires (which can take very long) or I try to execute another SQL command using the same connection object. Can you please provide a code snippet or a hint on how to work with PG notifications more reliably?

sergeyspatar, Jan 06 '20 17:01

The only option that comes to mind is implementing options similar to libpq's keepalives* settings and utilizing OS controls for the connection liveness check.

elprans, Jan 07 '20 22:01
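
To illustrate what "OS controls" could mean here, below is a minimal sketch of turning on TCP keepalive probes for a socket from Python. The helper name `enable_tcp_keepalive` and its default values are hypothetical and chosen only for illustration. The three tuning options roughly correspond to libpq's keepalives_idle, keepalives_interval and keepalives_count, and they are platform-specific, so the sketch sets only those the `socket` module exposes. How to reach the socket that asyncpg uses internally is not covered in this thread and is not shown.

```python
import socket


def enable_tcp_keepalive(sock, idle=30, interval=10, count=3):
    """Enable TCP keepalive probes on an existing socket (hypothetical helper)."""
    # Ask the OS to send keepalive probes on this connection.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # Fine-grained knobs differ between platforms; apply only those available.
    tuning = (
        ("TCP_KEEPIDLE", idle),       # seconds of idleness before the first probe
        ("TCP_KEEPINTVL", interval),  # seconds between probes
        ("TCP_KEEPCNT", count),       # failed probes before the connection is dropped
    )
    for name, value in tuning:
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)


if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        enable_tcp_keepalive(s)
        print(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
```

With settings like these, a dead peer should be detected by the OS after roughly idle + interval * count seconds instead of the default TCP timeout, assuming the platform honours all three options.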

As a quick solution, I came up with a coroutine that periodically checks whether the connection is alive by executing a simple query, and that reconnects and sets up the notification listener again when needed:

import asyncio
import asyncpg


async def asyncpg_listen(
    create_conn, channel, notification_handler, reconnect_handler=None,
    *, conn_check_interval=60, conn_check_timeout=5, reconnect_delay=0):

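    while True:
        conn = None
        try:
            conn = await create_conn()
            await conn.add_listener(channel, notification_handler)

            if reconnect_handler is not None:
                reconnect_handler(conn)

            # Periodically probe the connection; a failure or timeout
            # here means it is probably dead.
            while True:
                await asyncio.sleep(conn_check_interval)
                await conn.execute('select 1', timeout=conn_check_timeout)

        except asyncio.CancelledError:
            raise

        except Exception:
            # Probably lost connection; fall through and reconnect
            pass

        finally:
            # Drop the old connection so it is not leaked on reconnect or cancel
            if conn is not None:
                conn.terminate()

        if reconnect_delay > 0:
            await asyncio.sleep(reconnect_delay)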


async def create_conn():
    return await asyncpg.connect(...)


def notification_handler(conn, pid, channel, payload):
    rowset = await conn.fetch('...')


def reconnect_handler(conn):
    rowset = await conn.fetch('...')


task = asyncio.create_task(
    asyncpg_listen(create_conn, 'channel', notification_handler, reconnect_handler)
)

...

task.cancel()

sergeyspatar, Jan 07 '20 23:01

Question about your notification_handler.

def notification_handler(conn, pid, channel, payload):
    rowset = await conn.fetch('...')

It seems that conn.add_listener does not work with asynchronous functions. So your notification_handler is synchronous, but you use await inside it. That is a syntax error since you cannot use await inside a non-async function.

So what's the solution? Acquire the event loop inside the notification_handler?

CMCDragonkai, Apr 27 '21 08:04

I'd hope in the future that asyncpg can support asynchronous listener callbacks.

CMCDragonkai, Apr 27 '21 08:04

> So what's the solution?

Use `asyncio.get_event_loop().create_task(async_handler(*args))`.

elprans, Apr 27 '21 15:04

Take a look at https://github.com/anna-money/asyncpg-listen

andrew222651, May 02 '22 20:05