NOTIFY/LISTEN and connection loss
I need to watch data changes in multiple tables. One option is to add triggers and log changes to another table, which the app can poll every few seconds. A better option is to send PG notifications and set up a handler using conn.add_listener(). Does this approach need a dedicated connection rather than one taken from a connection pool? How should I handle connection loss? If I kill a PG server process, asyncpg won't notice until either the TCP timeout expires (which can take very long) or I try to execute another SQL command on the same connection object. Could you please provide a code snippet or a hint on how to work with PG notifications more reliably?
The only option that comes to mind is to implement options similar to libpq's keepalives* and use OS-level controls for the connection liveness check.
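
To make the keepalive idea concrete, here is a minimal sketch of the OS-level knobs that libpq's keepalives_idle, keepalives_interval and keepalives_count correspond to. The helper name `enable_tcp_keepalive` and its default values are hypothetical, and how to reach the socket underlying an asyncpg connection is not covered here; the function only assumes it is handed a plain TCP socket.

```python
import socket


def enable_tcp_keepalive(sock, idle=30, interval=10, count=3):
    """Turn on TCP keepalive probes for an already-created TCP socket."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The fine-grained options are platform-specific, so set only those
    # that this platform's socket module actually defines.
    for name, value in (("TCP_KEEPIDLE", idle),        # seconds before first probe
                        ("TCP_KEEPINTVL", interval),   # seconds between probes
                        ("TCP_KEEPCNT", count)):       # failed probes before reset
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
```

With settings like these the kernel, rather than the application, would report a dead peer after roughly idle + interval * count seconds, which is the behaviour the libpq options provide.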
As a quick solution, I came up with a coroutine that periodically checks whether the connection is alive by executing a simple query, and that reconnects and sets up the notification listener again when needed:

    import asyncio
    import asyncpg

    async def asyncpg_listen(
            create_conn, channel, notification_handler, reconnect_handler=None,
            *, conn_check_interval=60, conn_check_timeout=5, reconnect_delay=0):
        while True:
            try:
                conn = await create_conn()
                await conn.add_listener(channel, notification_handler)
                if reconnect_handler is not None:
                    reconnect_handler(conn)
                while True:
                    await asyncio.sleep(conn_check_interval)
                    await conn.execute('select 1', timeout=conn_check_timeout)
            except asyncio.CancelledError:
                raise
            except:
                # Probably lost connection
                pass
            if reconnect_delay > 0:
                await asyncio.sleep(reconnect_delay)

    async def create_conn():
        return await asyncpg.connect(...)

    def notification_handler(conn, pid, channel, payload):
        rowset = await conn.fetch('...')

    def reconnect_handler(conn):
        rowset = await conn.fetch('...')

    task = asyncio.create_task(
        asyncpg_listen(create_conn, 'channel', notification_handler, reconnect_handler)
    )
    ...
    task.cancel()

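
One thing the loop above leaves open is what happens to the old connection object after a failed check. The following is a hedged variant, not the poster's code: the name `listen_forever` and its defaults are made up for illustration, and it assumes the connection exposes `terminate()` (asyncpg's `Connection` does) so a stale socket is dropped before reconnecting.

```python
import asyncio


async def listen_forever(create_conn, channel, callback, *,
                         check_interval=60, check_timeout=5, reconnect_delay=1):
    """Keep a LISTEN connection alive; reconnect when the health check fails."""
    while True:
        conn = None
        try:
            conn = await create_conn()
            await conn.add_listener(channel, callback)
            while True:
                await asyncio.sleep(check_interval)
                # Cheap round trip; raises on timeout or a broken connection.
                await conn.execute('select 1', timeout=check_timeout)
        except asyncio.CancelledError:
            raise
        except Exception:
            # Probably lost the connection; fall through and reconnect.
            pass
        finally:
            if conn is not None:
                # Drop the socket without waiting on a possibly dead server.
                conn.terminate()
        await asyncio.sleep(reconnect_delay)
```

It would be started and cancelled the same way as `asyncpg_listen`, via `asyncio.create_task(...)` and `task.cancel()`; on cancellation the `finally` clause still terminates the current connection.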
Question about your notification_handler.

    def notification_handler(conn, pid, channel, payload):
        rowset = await conn.fetch('...')

It seems that conn.add_listener does not work with asynchronous functions. So your notification_handler is synchronous, but you use await inside it. That is a syntax error since you cannot use await inside a non-async function.
So what's the solution? Acquire the event loop inside the notification_handler?
I'd hope in the future that asyncpg can support asynchronous listener callbacks.
> So what's the solution?
Use `asyncio.get_event_loop().create_task(async_handler(*args))` from inside the synchronous callback.
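
A minimal sketch of that suggestion: wrap the coroutine in a plain function that only schedules it. The helper name `make_listener` is hypothetical, and it uses `asyncio.get_running_loop()` in place of `get_event_loop()`, which should be equivalent here because the callback is invoked from inside the running loop. More recent asyncpg releases may accept coroutine functions directly, so check the documentation for your version.

```python
import asyncio


def make_listener(async_handler):
    """Wrap a coroutine function in a plain callback for conn.add_listener()."""
    pending = set()  # strong references so tasks are not garbage-collected early

    def listener(conn, pid, channel, payload):
        # Called synchronously from the event loop; only schedule the real work.
        task = asyncio.get_running_loop().create_task(
            async_handler(conn, pid, channel, payload))
        pending.add(task)
        task.add_done_callback(pending.discard)

    return listener
```

Usage would look like `await conn.add_listener('channel', make_listener(async_handler))`. Note that asyncpg does not allow two operations on one connection at the same time, so a handler that runs queries is probably safer using a separate connection or a pool than the listening connection itself.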
Take a look at https://github.com/anna-money/asyncpg-listen