postgres_scanner
Can't call a procedure with postgres_execute after retrieving an attached table in a transaction
What happens?
Within a transaction, calling a procedure (via postgres_execute) after reading from an attached table raises the following error:
duckdb.duckdb.Error: Failed to execute query "CALL test()": another command is already in progress
To Reproduce
Create procedure and table on postgres instance (db: postgresscanner):
create table log ( comment text );
create procedure test() language sql as $$ SELECT 'hello'; $$;
Launch the following python script:
import duckdb
con = duckdb.connect(database=':memory:')
tr = con.begin()
tr.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")
tr.sql("""SELECT * FROM s.public.log;""").fetchall()
tr.execute("CALL postgres_execute('s', 'CALL test()')")
tr.commit()
OS:
Archlinux
PostgreSQL Version:
16.2
DuckDB Version:
0.10.2
DuckDB Client:
Python
Full Name:
Christopher Meier
Affiliation:
HEIG-VD
Have you tried this on the latest main branch?
- [ ] I agree
Have you tried the steps to reproduce? Do they include all relevant data and configuration? Does the issue you report still appear there?
- [X] I agree
After further testing, any call to postgres_execute after a SELECT within a transaction throws the same error:
import duckdb
con = duckdb.connect(database=':memory:')
con.execute("""ATTACH 'dbname=postgresscanner' AS s (TYPE postgres);""")
tr = con.begin()
tr.sql("""SELECT * FROM s.public.log;""").fetchall()
tr.execute("CALL postgres_execute('s', 'SELECT 1;')")
# tr.execute("CALL postgres_execute('s', 'CALL test()')")
# tr.execute("CALL postgres_execute('s', 'CREATE TABLE log2(id INT)')")
tr.commit()
Throws duckdb.duckdb.Error: Failed to execute query "SELECT 1;": another command is already in progress
Thanks for the report! Could you try running SET pg_connection_cache = false to see if that fixes the issue?
The same issue occurs if I add the following at the start:
con = duckdb.connect(database=':memory:')
con.execute("""SET pg_connection_cache = false""")
I am also seeing the another command is already in progress error when trying to use transactions.
Use case: I have an attached Postgres database with a queue table, and multiple workers running at once reading/writing/updating/deleting from the queue. I would like to interface directly between Postgres and DuckDB without having to pop out to the Python process that is executing the DuckDB commands.
My postgres table has been created with:
CREATE TABLE IF NOT EXISTS queues.entity_queue (
id SERIAL PRIMARY KEY,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
processing BOOLEAN DEFAULT FALSE NOT NULL,
locked_until TIMESTAMP WITH TIME ZONE DEFAULT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
I attach the remote postgres instance with ATTACH as warehouse.
My overall query flow looks like:
USE warehouse;
BEGIN;
Open a transaction on the attached postgres database
SELECT
*
FROM POSTGRES_QUERY('warehouse', $$
SELECT
DISTINCT entity_id
FROM
queues.entity_queue
WHERE
entity_type = '{entity_type}'
AND (
(processing = FALSE AND ((locked_until < CURRENT_TIMESTAMP) OR locked_until IS NULL))
OR (processing = TRUE AND locked_until < CURRENT_TIMESTAMP)
)
LIMIT {num_entities}
FOR UPDATE
$$)
Fetch num_entities rows of an entity_type from the queue and lock them with FOR UPDATE. The entity_ids are pulled out into a Python list of strings for use in the next query.
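The claim condition in that WHERE clause can be restated as a small Python predicate for clarity; this is a sketch only (the function name and arguments are illustrative), not part of the actual query flow:

```python
from datetime import datetime, timedelta, timezone

def is_claimable(processing: bool, locked_until, now: datetime) -> bool:
    # Mirrors the SQL WHERE clause: an entry can be claimed when it is not
    # marked as processing and its lock is absent or expired, or when it is
    # marked as processing but its lock has expired (e.g. a worker died).
    if not processing:
        return locked_until is None or locked_until < now
    # In SQL, NULL < CURRENT_TIMESTAMP is not true, so a processing row
    # with no lock timestamp is not claimable.
    return locked_until is not None and locked_until < now

now = datetime(2024, 5, 1, tzinfo=timezone.utc)
print(is_claimable(False, None, now))                       # fresh entry -> True
print(is_claimable(True, now - timedelta(minutes=5), now))  # expired lock -> True
print(is_claimable(True, now + timedelta(minutes=5), now))  # still locked -> False
```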
CALL POSTGRES_EXECUTE('warehouse', $$
UPDATE queues.entity_queue
SET
processing = TRUE,
locked_until = NOW() + INTERVAL '2 minutes'
WHERE
entity_id in ({','.join(queue_entity_ids)})
$$)
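A side note on the {','.join(queue_entity_ids)} interpolation above: since entity_id is TEXT, each id must be rendered as a quoted SQL string literal (with embedded single quotes doubled), or the generated statement is invalid. A minimal sketch of building that list in Python; the sql_quote helper is illustrative, not part of duckdb:

```python
def sql_quote(value: str) -> str:
    # Render a Python string as a SQL string literal, doubling any
    # embedded single quotes to keep the statement well-formed.
    return "'" + value.replace("'", "''") + "'"

queue_entity_ids = ["abc-1", "o'brien-2"]
in_list = ", ".join(sql_quote(eid) for eid in queue_entity_ids)
update_sql = (
    "UPDATE queues.entity_queue "
    "SET processing = TRUE, locked_until = NOW() + INTERVAL '2 minutes' "
    f"WHERE entity_id IN ({in_list})"
)
print(update_sql)
```

For anything beyond a trusted, internally generated id list, prepared statements or parameter binding would be a safer choice than string interpolation.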
Using POSTGRES_EXECUTE to run the command yields another command is already in progress errors, with or without pg_connection_cache = false. Attempting to do the update outside a POSTGRES_ function, with or without pg_experimental_filter_pushdown set to true, appears to copy the whole table to DuckDB as far as I can tell:
UPDATE warehouse.queues.entity_queue
SET
processing = TRUE,
locked_until = NOW() + INTERVAL '2 minutes'
WHERE
entity_id in ({','.join(queue_entity_ids)})
COPY (SELECT "id", ctid FROM "queues"."entity_queue" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);
I would be happy to hear that I am approaching this the wrong way, but this behavior seems like it should be supported.
This should now be fixed in https://github.com/duckdb/postgres_scanner/pull/258. Essentially, some odd internal state in libpq that is not entirely clear to me leads to this error message; switching from PQsendQuery to PQexec fixes the issue.