Should aiopg work correctly with connection_factory parameter?

Open GregEremeev opened this issue 8 years ago • 13 comments

Hi!

I want to use logical replication, so I pass connection_factory=LogicalReplicationConnection (from psycopg2.extras import LogicalReplicationConnection). While debugging the app, I saw that psycopg2.connect initialized fine, but code in the Connection._ready method raised an exception.

Without this parameter the app works fine. Is it a bug?

Python 3.5.2, PostgreSQL 9.5.4, Fedora release 24 (Twenty Four)

import aiopg
from psycopg2.extras import LogicalReplicationConnection

# dsn is defined elsewhere in the script
async def go():
    async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                async for row in cur:
                    print(row)

Traceback (most recent call last):
  File "1.py", line 27, in <module>
    loop.run_until_complete(go())
  File "/usr/lib64/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "1.py", line 16, in go
    async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/utils.py", line 77, in __aenter__
    self._obj = yield from self._coro
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/pool.py", line 46, in _create_pool
    yield from pool._fill_free_pool(False)
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/pool.py", line 203, in _fill_free_pool
    **self._conn_kwargs)
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/utils.py", line 67, in __iter__
    resp = yield from self._coro
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 83, in _connect
    oids = yield from _enable_hstore(conn)
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 39, in _enable_hstore
    """)
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/cursor.py", line 113, in execute
    yield from self._conn._poll(waiter, timeout)
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 237, in _poll
    yield from asyncio.wait_for(self._waiter, timeout, loop=self._loop)
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 392, in wait_for
    return fut.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/home/budulianin/Documents/projects/pypgbus/pypgbus_venv/lib64/python3.5/site-packages/aiopg/connection.py", line 134, in _ready
    state = self._conn.poll()
psycopg2.ProgrammingError: syntax error

Unclosed connection
connection: <aiopg.connection.Connection object at 0x7f9ef7966f98>

GregEremeev avatar Mar 20 '17 15:03 GregEremeev

Hi! We never tested such a setup. What about pure psycopg2? Is it working? As far as I can see from the docs, the PG server should be configured appropriately too.

jettify avatar Mar 20 '17 16:03 jettify

What about pure psycopg2? Is it working?

Yes, it works fine.

As far as I can see from the docs, the PG server should be configured appropriately too.

Yes, I know. I configured everything correctly. psycopg2 works but aiopg doesn't.

In [1]: from psycopg2.extras import LogicalReplicationConnection
In [2]: import psycopg2
In [3]: conn = psycopg2.connect('dbname=test_replic user=budulianin password=budulianin', connection_factory=LogicalReplicationConnection)
   ...: 
In [4]: cur = conn.cursor()
In [5]: cur.start_replication(slot_name ='regression_slot', decode=True)
In [7]: def consume(msg):
   ...:     print(msg.payload, msg.data_start)
   ...:     
In [8]: cur.consume_stream(consume)
{"type":"transaction.begin","xid":"55509","committed":"2017-03-20 16:04:04.256532+03"} 4250379032
{"type":"table","schema":"public","name":"data","change":"INSERT","data":{"text":"lololo1"}} 4250379032
{"type":"transaction.commit","xid":"55509","committed":"2017-03-20 16:04:04.256532+03"} 4250379568

GregEremeev avatar Mar 20 '17 16:03 GregEremeev

Oh, it seems that I can't execute simple queries with this factory in pure psycopg2 either. The same error:

In [1]: from psycopg2.extras import LogicalReplicationConnection
In [2]: import psycopg2
In [3]: conn = psycopg2.connect('dbname=test_replic user=budulianin password=budulianin', connection_factory=LogicalReplicationConnection)
   ...: 
In [4]: cur = conn.cursor()
In [5]: cur.execute("SELECT 1")
---------------------------------------------------------------------------
ProgrammingError                          Traceback (most recent call last)
<ipython-input-5-d8dde3a84962> in <module>()
----> 1 cur.execute("SELECT 1")

ProgrammingError: syntax error

GregEremeev avatar Mar 20 '17 16:03 GregEremeev

But if I run this simple code, I get the same exception. Without the connection_factory parameter this code works fine. I think there is a bug in aiopg. Am I right?

I only open a connection, without executing any query:

async def go():
    async with aiopg.create_pool(dsn, connection_factory=LogicalReplicationConnection) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                print('1')

GregEremeev avatar Mar 20 '17 17:03 GregEremeev

aiopg only adds the async=True flag to the psycopg2.connect call and polls the fd, so the error should be the same.

jettify avatar Mar 20 '17 17:03 jettify

Actually, the exception was raised when _enable_hstore from aiopg/connection.py was invoked. This function queries information about hstore, but such a query is not allowed on a logical replication connection.

Anyway, psycopg2 doesn't allow using the consume_stream method in async mode. Invoking it raises psycopg2.ProgrammingError: consume_stream cannot be used in asynchronous mode

Unfortunately, the bottom line is that you can't use logical decoding via psycopg2/aiopg in async mode.

GregEremeev avatar Mar 21 '17 12:03 GregEremeev

Thanks for the update! We need to document this explicitly in the aiopg docs.

jettify avatar Mar 21 '17 12:03 jettify

@jettify can I help with the documentation? Where in the documentation should this info go?

GregEremeev avatar Mar 21 '17 13:03 GregEremeev

Sure! Docs contributions are very much appreciated! Let's add a separate section about logical replication to core.rst, just after Server-side notifications, saying that it is supported by psycopg2 but not by aiopg.

jettify avatar Mar 21 '17 21:03 jettify

@Budulianin we are trying to be contributor friendly, feel free to contribute to any aio-libs project

jettify avatar Mar 21 '17 21:03 jettify

I read the psycopg2 docs in more detail and found that asynchronous code needs to use the read_message method instead of consume_stream. This means that aiopg will most likely need a fix to avoid invoking _enable_hstore when connection_factory=LogicalReplicationConnection is passed. I'll try code with read_message and write up the results later.
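
As an illustration of the read_message approach (not tested in this thread), here is a rough asyncio sketch of a polling loop. It assumes cur is a psycopg2 replication cursor on a plain psycopg2 asynchronous connection on which replication has already been started; it is not an aiopg cursor, since aiopg registers its own reader on the connection's file descriptor. The helper names drain_available, wait_readable and stream are hypothetical, and the loop needs Python 3.7+ for asyncio.get_running_loop.

```python
import asyncio


def drain_available(cur, handle):
    """Pass every message that is already available to handle(); return how many."""
    count = 0
    while True:
        msg = cur.read_message()  # non-blocking; returns None when nothing is ready
        if msg is None:
            return count
        handle(msg)
        count += 1


async def wait_readable(fd, timeout):
    """Return True once fd is readable, or False if timeout seconds pass first."""
    loop = asyncio.get_running_loop()  # Python 3.7+
    ready = loop.create_future()

    def on_readable():
        if not ready.done():
            ready.set_result(True)

    loop.add_reader(fd, on_readable)
    try:
        return await asyncio.wait_for(ready, timeout)
    except asyncio.TimeoutError:
        return False
    finally:
        loop.remove_reader(fd)


async def stream(cur, handle, keepalive_interval=10.0):
    """Consume replication messages forever, sending feedback when idle."""
    while True:
        drain_available(cur, handle)
        if not await wait_readable(cur.fileno(), keepalive_interval):
            cur.send_feedback()  # keepalive so the server does not drop us
```

The structure mirrors the select()-based loop from the psycopg2 replication docs, with the blocking select() replaced by an event-loop reader so other coroutines can run while the stream is idle.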

GregEremeev avatar Mar 23 '17 21:03 GregEremeev

The _enable_hstore call is controlled by enable_hstore, an optional connection parameter: http://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect.

Reading the code, unfortunately I do not think aiopg supports ReplicationCursor. I tested, and it does not fail when using connection_factory=LogicalReplicationConnection, but the kwarg is lost and does not do anything (the aiopg Cursor is returned, which does not have ReplicationCursor properties).
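
For reference, a minimal sketch of switching the hstore lookup off when connecting. It assumes that enable_hstore=False is enough to skip the setup query that failed in the original traceback; the helper names replication_kwargs and open_replication_connection are hypothetical, and the dsn is supplied by the caller. It only gets past connection setup and does not make the replication methods available through aiopg's async API.

```python
def replication_kwargs(connection_factory):
    """Keyword arguments to pass to aiopg.connect or aiopg.create_pool."""
    return {
        "connection_factory": connection_factory,
        # Skip the hstore OID lookup, which is an ordinary SQL query and is
        # rejected on a replication connection (assumption based on the traceback).
        "enable_hstore": False,
    }


async def open_replication_connection(dsn):
    # Imported lazily so the helper above can be used without aiopg installed.
    import aiopg
    from psycopg2.extras import LogicalReplicationConnection

    return await aiopg.connect(dsn, **replication_kwargs(LogicalReplicationConnection))
```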

zvxr avatar Jul 26 '18 22:07 zvxr

I tested, and it does not fail when using connection_factory=LogicalReplicationConnection, but the kwarg is lost and does not do anything (the aiopg Cursor is returned, which does not have ReplicationCursor properties).

This is wrong. The cursor returned is of the LogicalReplicationCursor type. However, it is encapsulated within aiopg's Cursor class and cannot be accessed through the async APIs. It can still be accessed via cur.raw.

Reading the code, unfortunately I do not think aiopg supports ReplicationCursor.

This is correct: accessing the raw cursor without the async APIs definitely defeats the purpose of using aiopg.
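
A small sketch of how one might check this, assuming only that the wrapper exposes the underlying psycopg2 cursor as .raw, as described above. The helper names describe_cursor and has_replication_api are hypothetical; the method names checked are the replication cursor methods mentioned in this thread and in the psycopg2 docs.

```python
REPLICATION_METHODS = ("start_replication", "read_message", "send_feedback")


def describe_cursor(cur):
    """Return (wrapper type name, underlying type name) for a cursor with .raw."""
    return type(cur).__name__, type(cur.raw).__name__


def has_replication_api(cur):
    """True if the wrapped (raw) cursor offers the replication methods."""
    return all(hasattr(cur.raw, name) for name in REPLICATION_METHODS)
```

Calling the raw cursor's methods directly bypasses aiopg's waiting logic, so this is only useful for inspection.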

isen-ng avatar Sep 09 '19 07:09 isen-ng