
Can't call a procedure with postgres_execute after retrieving an attached table in a transaction

Open · c-meier opened this issue 1 year ago • 4 comments

What happens?

Within a transaction, calling a procedure (via postgres_execute) after reading from an attached table fails with the following error:

duckdb.duckdb.Error: Failed to execute query "CALL test()": another command is already in progress

To Reproduce

Create a procedure and a table on the Postgres instance (db: postgresscanner):

create table log ( comment text );
create procedure test() language sql as $$ SELECT 'hello'; $$;

Then launch the following Python script:

import duckdb
con = duckdb.connect(database=':memory:')
tr = con.begin()
tr.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")
tr.sql("""SELECT * FROM s.public.log;""").fetchall()
tr.execute("CALL postgres_execute('s', 'CALL test()')")
tr.commit()
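
For contrast, the "in a transaction" qualifier in the title matters: the same statements in autocommit mode (no con.begin()) are a useful baseline. A minimal, untested sketch against the same postgresscanner database:

import duckdb

con = duckdb.connect(database=':memory:')
con.execute("ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);")
# Without an enclosing DuckDB transaction, each statement runs on its own,
# so the error reported above should not be triggered here (assumption).
con.sql("SELECT * FROM s.public.log;").fetchall()
con.execute("CALL postgres_execute('s', 'CALL test()')")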

OS:

Archlinux

PostgreSQL Version:

16.2

DuckDB Version:

0.10.2

DuckDB Client:

Python

Full Name:

Christopher Meier

Affiliation:

HEIG-VD

Have you tried this on the latest main branch?

  • [ ] I agree

Have you tried the steps to reproduce? Do they include all relevant data and configuration? Does the issue you report still appear there?

  • [X] I agree

c-meier · Apr 22 '24 16:04

After further testing, any call to postgres_execute after a SELECT in a transaction throws the same error:

import duckdb
con = duckdb.connect(database=':memory:')
con.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")
tr = con.begin()
tr.sql("""SELECT * FROM s.public.log;""").fetchall()
tr.execute("CALL postgres_execute('s', 'SELECT 1;')")
# tr.execute("CALL postgres_execute('s', 'CALL test()')")
# tr.execute("CALL postgres_execute('s', 'CREATE TABLE log2(id INT)')")
tr.commit()

Throws duckdb.duckdb.Error: Failed to execute query "SELECT 1;": another command is already in progress

c-meier · May 02 '24 11:05

Thanks for the report! Could you try running SET pg_connection_cache = false to see if that fixes the issue?

Mytherin · May 02 '24 11:05

The same issue occurs if I add this at the start:

con = duckdb.connect(database=':memory:')
con.execute("""SET pg_connection_cache  = false""")

c-meier · May 02 '24 11:05

I am also seeing the another command is already in progress error when trying to use transactions.

Use case: I have an attached Postgres database with a queue table and multiple workers running at once, reading, writing, updating, and deleting from the queue. I would like to interface directly between Postgres and DuckDB without having to pop out to the Python process that is executing the DuckDB commands.

My postgres table has been created with:

CREATE TABLE IF NOT EXISTS queues.entity_queue (
    id SERIAL PRIMARY KEY,
    entity_type TEXT NOT NULL,
    entity_id TEXT NOT NULL,
    processing BOOLEAN DEFAULT FALSE NOT NULL,
    locked_until TIMESTAMP WITH TIME ZONE DEFAULT NULL,
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

I attach the remote Postgres instance as warehouse using ATTACH.

My overall query flow looks like:

USE warehouse;
BEGIN;

This opens a transaction on the attached Postgres database.

SELECT *
FROM POSTGRES_QUERY('warehouse', $$
    SELECT DISTINCT entity_id
    FROM queues.entity_queue
    WHERE entity_type = '{entity_type}'
        AND (
            (processing = FALSE AND ((locked_until < CURRENT_TIMESTAMP) OR locked_until IS NULL))
            OR (processing = TRUE AND locked_until < CURRENT_TIMESTAMP)
        )
    LIMIT {num_entities}
    FOR UPDATE
$$)

This fetches num_entities rows of an entity_type from the queue and locks them with FOR UPDATE. We then pull the entity_ids out into a Python list of strings for use in the next query.

CALL POSTGRES_EXECUTE('warehouse', $$
    UPDATE queues.entity_queue
    SET
        processing = TRUE,
        locked_until = NOW() + INTERVAL '2 minutes'
    WHERE
        entity_id in ({','.join(queue_entity_ids)})
$$)

Using POSTGRES_EXECUTE to run the command produces another command is already in progress errors, with or without pg_connection_cache = false. Attempting the update outside of a POSTGRES_ function, with or without pg_experimental_filter_pushdown set to true, instead appears to copy the whole table to DuckDB as far as I can tell:

UPDATE warehouse.queues.entity_queue
    SET
        processing = TRUE,
        locked_until = NOW() + INTERVAL '2 minutes'
    WHERE
        entity_id in ({','.join(queue_entity_ids)})

This UPDATE issues the following scan against Postgres, pulling the whole table:

COPY (SELECT "id", ctid FROM "queues"."entity_queue" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);
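
Putting the pieces together, the intended flow driven from Python would look roughly like the sketch below (the ATTACH string and the entity_type/num_entities values are illustrative placeholders, not my actual configuration):

import duckdb

con = duckdb.connect(database=':memory:')
# Hypothetical connection string; substitute the real attach parameters.
con.execute("ATTACH 'dbname=warehouse' AS warehouse (TYPE postgres);")

entity_type = 'document'   # illustrative placeholder
num_entities = 10          # illustrative placeholder

con.execute("USE warehouse;")
con.execute("BEGIN;")

# Step 1: fetch and lock a batch of queue entries.
rows = con.sql(f"""
    SELECT * FROM POSTGRES_QUERY('warehouse', $$
        SELECT DISTINCT entity_id
        FROM queues.entity_queue
        WHERE entity_type = '{entity_type}'
            AND ((processing = FALSE AND ((locked_until < CURRENT_TIMESTAMP) OR locked_until IS NULL))
                OR (processing = TRUE AND locked_until < CURRENT_TIMESTAMP))
        LIMIT {num_entities}
        FOR UPDATE
    $$)
""").fetchall()
queue_entity_ids = [f"'{entity_id}'" for (entity_id,) in rows]  # assumes at least one row

# Step 2: mark the batch as processing. This is the call that currently
# fails with "another command is already in progress".
con.execute(f"""
    CALL POSTGRES_EXECUTE('warehouse', $$
        UPDATE queues.entity_queue
        SET processing = TRUE,
            locked_until = NOW() + INTERVAL '2 minutes'
        WHERE entity_id IN ({','.join(queue_entity_ids)})
    $$)
""")

con.execute("COMMIT;")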

I would be happy to hear that I am approaching this the wrong way, but this behavior seems like it should be supported.

clayton-cc · Jun 05 '24 21:06

This should now be fixed by https://github.com/duckdb/postgres_scanner/pull/258. Essentially, some odd internal state in libpq (the details of which are not entirely clear to me) leads to this error message; switching from PQsendQuery to PQexec fixes the issue.
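
For readers curious about what "another command is already in progress" means at the libpq level, the sketch below illustrates the general libpq behavior (not the extension's actual code). It assumes libpq.so.5 on Linux and a reachable postgresscanner database, and reproduces the message by sending a second query before the first one's results have been drained. PQexec, by contrast, submits a query and consumes all of its results before returning, so the connection is never left in this busy state.

import ctypes

# Assumes libpq.so.5 (Linux); the library name differs on other platforms.
pq = ctypes.CDLL("libpq.so.5")
pq.PQconnectdb.restype = ctypes.c_void_p
pq.PQconnectdb.argtypes = [ctypes.c_char_p]
pq.PQsendQuery.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
pq.PQerrorMessage.restype = ctypes.c_char_p
pq.PQerrorMessage.argtypes = [ctypes.c_void_p]
pq.PQfinish.argtypes = [ctypes.c_void_p]

conn = pq.PQconnectdb(b"dbname=postgresscanner")

# PQsendQuery is asynchronous: it returns immediately, and the connection
# stays busy until every result has been consumed via PQgetResult.
pq.PQsendQuery(conn, b"SELECT 1;")

# Sending another command without draining the first one fails ...
if pq.PQsendQuery(conn, b"SELECT 2;") == 0:
    # ... and libpq reports: "another command is already in progress"
    print(pq.PQerrorMessage(conn).decode())

pq.PQfinish(conn)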

Mytherin · Sep 03 '24 21:09