Should aiopg work correctly with connection_factory parameter?
Hi!
I want to use logical replication that's why I want to use connection_factory=LogicalReplicationConnection (from psycopg2.extras import LogicalReplicationConnection). I tried to debug app and I saw that psycopg2.connect initialized fine but code from method Connection._ready raised exception.
Without this parameter app works fine. Is it a bug?
Python 3.5.2 PostgreSQL (9.5.4) Fedora release 24 (Twenty Four)
async def go():
async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
async for row in cur:
print(row)
Traceback (most recent call last):
File "1.py", line 27, in <module>
loop.run_until_complete(go())
File "/usr/lib64/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib64/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib64/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "1.py", line 16, in go
async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/utils.py", line 77, in __aenter__
self._obj = yield from self._coro
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/pool.py", line 46, in _create_pool
yield from pool._fill_free_pool(False)
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/pool.py", line 203, in _fill_free_pool
**self._conn_kwargs)
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/utils.py", line 67, in __iter__
resp = yield from self._coro
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 83, in _connect
oids = yield from _enable_hstore(conn)
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 39, in _enable_hstore
""")
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/cursor.py", line 113, in execute
yield from self._conn._poll(waiter, timeout)
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 237, in _poll
yield from asyncio.wait_for(self._waiter, timeout, loop=self._loop)
File "/usr/lib64/python3.5/asyncio/tasks.py", line 392, in wait_for
return fut.result()
File "/usr/lib64/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 134, in _ready
state = self._conn.poll()
psycopg2.ProgrammingError: syntax error
Unclosed connection
connection: <aiopg.connection.Connection object at 0x7f9ef7966f98>
Hi
We never tested such setup. What about pure psycopg2? is it working? As far as I can see from docs, PG server should be configured appropriately too.
What about pure psycopg2? is it working?
Yea, it works fine.
As far as I can see from docs, PG server should be configured appropriately too.
Yea, I know. I did all things right. psycopg2 works but aiopg don't.
In [1]: from psycopg2.extras import LogicalReplicationConnection
In [2]: import psycopg2
In [3]: conn = psycopg2.connect('dbname=test_replic user=budulianin password=budulianin', connection_factory=LogicalReplicationConnection)
...:
In [4]: cur = conn.cursor()
In [5]: cur.start_replication(slot_name ='regression_slot', decode=True)
In [7]: def consume(msg):
...: print(msg.payload, msg.data_start)
...:
In [8]: cur.consume_stream(consume)
{"type":"transaction.begin","xid":"55509","committed":"2017-03-20 16:04:04.256532+03"} 4250379032
{"type":"table","schema":"public","name":"data","change":"INSERT","data":{"text":"lololo1"}} 4250379032
{"type":"transaction.commit","xid":"55509","committed":"2017-03-20 16:04:04.256532+03"} 4250379568
Oh, it seems that I can't execute simple queries with this factory The same error
In [1]: from psycopg2.extras import LogicalReplicationConnection
In [2]: import psycopg2
In [3]: conn = psycopg2.connect('dbname=test_replic user=budulianin password=budulianin', connection_factory=LogicalReplicationConnection)
...:
In [4]: cur = conn.cursor()
In [5]: cur.execute("SELECT 1")
---------------------------------------------------------------------------
ProgrammingError Traceback (most recent call last)
<ipython-input-5-d8dde3a84962> in <module>()
----> 1 cur.execute("SELECT 1")
ProgrammingError: syntax error
But if I try to execute this simple code I'll catch the same exception Without connection_factory parameter this code works fine. I think there is a bug in aiopg. Am I right?
I just execute connection.
async def go():
async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
print('1')
aiopg only adds async=True flag, to the connect psycopg2.connect and polls fd, so error should be the same.
Actually, exception happened when invoked _enable_hstore from aiopg/connection.py.
This function requests info about hstore, but for logical replication it's forbidden request.
Anyway, psycopg2 don't allow use consume_stream method in async mode.
When you try to invoke this method, raise exception psycopg2.ProgrammingError: consume_stream cannot be used in asynchronous mode
Unfortunately, the bottom line is you can't use logical decoding via psycopg2/aiopg in async mode.
Thanks for update! We need to document this explicitly in aiopg docs.
On Tue, Mar 21, 2017, 14:13 Grigorii Eremeev [email protected] wrote:
Actually, exception happened when invoked _enable_hstore from aiopg/connection.py. This function requests info about hstore, but for logical replication it's forbidden request.
Anyway, psycopg2 don't allow use consume_stream method in async mode. When you try to invoke this method, raise exception psycopg2.ProgrammingError: consume_stream cannot be used in asynchronous mode
Unfortunately, the bottom line is you can't use logical decoding via psycopg2/aiopg in async mode.
— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/aio-libs/aiopg/issues/287#issuecomment-288060332, or mute the thread https://github.com/notifications/unsubscribe-auth/AANoZ_QMwc_gttvkE_8TcjrT62C5VZ8yks5rn77-gaJpZM4MikAe .
@jettify can I help with documentation? Where should this info be in documentation?
Sure! Docs contribution very appreciated! Lets add section about into core.rst just after Server-side notifications, separate section about logical replication, that it is supported by psycopg2 but not in aiopg.
@Budulianin we are trying to be contributor friendly, feel free to contribute to any aio-libs project
I read psycopg2 docs more detail and found that for asynchronous code need to use read_message method instead of consume_stream. It means, that most likely will need to fix code in aiopg to avoid invocation _enable_hstore when was passed connection_factory=LogicalReplicationConnection parameter. I'll try code with read_message and write results later.
For _enable_hstore this is an optional parameter for connection http://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect.
Reading the code, unfortunately I do not think aiopg supports ReplicationCursor. I tested and will not fail when using connection_factory=LogicalReplicationConnection, but kwarg is lost and does not do anything (the aiopg Cursor is returned, which does not have ReplicationCursor properties).
I tested and will not fail when using connection_factory=LogicalReplicationConnection, but kwarg is lost and does not do anything (the aiopg Cursor is returned, which does not have ReplicationCursor properties).
This is wrong. The cursor returned is of LogicalReplicationCursor type. However, this is encapsulated within aiopg's Cursor class and cannot be accessed using the async APIs. However, it still can be accessed by cur.raw
Reading the code, unfortunately I do not think aiopg supports ReplicationCursor.
This is correct, because if I am to access the raw cursor without the async APIs, it definitely defeats the purpose of using aiopg.