psycopg2
psycopg2.OperationalError: PQexec not allowed during COPY BOTH when running drop_replication_slot
This is a bug tracker
Please complete the following information:
- OS: Docker image - FROM python:3.9-slim-buster
- Psycopg version: psycopg2-binary==2.9.3
- Python version: 3.9
- PostgreSQL version: 11.13
- pip version: pip 22.0.4
Describe the bug
We are trying out logical replication on a Postgres RDS instance and run a Docker image on ECS to consume the wal2json data. We have a timeout condition that drops the replication slot before the process exits, so that we don't run out of storage on the RDS instance. We previously tried cur.consume_stream, but eventually the container would run for some time without printing anything, and storage on the RDS instance would explode.
When running
cur.drop_replication_slot('wal2json_test_slot')
We get
File "/usr/local/lib/python3.9/site-packages/psycopg2/extras.py", line 563, in drop_replication_slot
self.execute(command)
psycopg2.OperationalError: PQexec not allowed during COPY BOTH
The slot does seem to drop successfully, so I don't know why we get this error in our logs.
The script:
def consume(msg):
    print("Attempting to put to Kinesis")
    kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey="default")
    msg.cursor.send_feedback(flush_lsn=msg.data_start)
    print("Success")

cur = my_connection.cursor()
try:
    print("Attempting to drop old replication slot")
    cur.drop_replication_slot('wal2json_test_slot')
    print("Old replication slot dropped")
except:
    pass
cur.create_replication_slot('wal2json_test_slot', output_plugin = 'wal2json')
cur.start_replication(slot_name = 'wal2json_test_slot', options = {'pretty-print' : 0}, decode= True)
# cur.consume_stream(consume)
last_msg_time = datetime.now()
status_interval = 10.0
print("Starting Replication")
while True:
    print("Checking for messages")
    if datetime.now() - last_msg_time > timedelta(minutes=10):
        print("Not received a message in 10 minutes, dropping replication slot and exiting")
        cur.drop_replication_slot('wal2json_test_slot')
        sys.exit()
    msg = cur.read_message()
    if msg:
        last_msg_time = msg.send_time
        print(f"Message received at: {last_msg_time.isoformat()}")
        consume(msg)
    else:
        print("No message received, waiting")
        now = datetime.now()
        timeout = status_interval - (now - cur.feedback_timestamp).total_seconds()
        sel = select([cur], [], [], max(0, timeout))
Were you able to resolve it? I am having the same issues with dropping a slot.
I'm having the same problem.
Are you people using the same connection to run queries? As I understand it, Postgres doesn't like that, and you must use a separate connection that is not involved in the replication.
Or am I not understanding correctly?
I have a single connection, and it is used only for replication. I am not running any other queries on it.
Ah ok, so it's the internal query issued by cur.drop_replication_slot() that causes the problem.
Should I do it with another connection?
I am not familiar with the replication stack, but I think you have to stop the generator before you can run any command on the same connection.
You can try stopping the generator, or drop the slot from another connection, but the latter seems more brutal.
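For illustration, here is a minimal, untested-against-RDS sketch that combines both suggestions: it ends the streaming session by closing the replication connection, then drops the slot from an ordinary connection with the server-side function pg_drop_replication_slot(). The helper name drop_slot_after_streaming and the connect_plain parameter are hypothetical, not part of psycopg2; connect_plain is assumed to be any zero-argument callable that returns a regular (non-replication) connection, for example lambda: psycopg2.connect(dsn).

```python
def drop_slot_after_streaming(replication_conn, connect_plain, slot_name):
    """Hypothetical helper: stop streaming, then drop the slot elsewhere.

    replication_conn -- the connection on which start_replication() was called
    connect_plain    -- assumed zero-argument callable returning a regular
                        (non-replication) connection
    slot_name        -- name of the replication slot to drop
    """
    # Closing the replication connection ends the COPY BOTH session, so no
    # further command is sent on a connection that is still streaming.
    replication_conn.close()

    plain = connect_plain()
    try:
        # Run the drop as a single statement outside an explicit transaction.
        plain.autocommit = True
        with plain.cursor() as cur:
            # pg_drop_replication_slot() is the SQL-level counterpart of the
            # DROP_REPLICATION_SLOT replication command.
            cur.execute("SELECT pg_drop_replication_slot(%s)", (slot_name,))
    finally:
        plain.close()
```

In the script above this would replace the cur.drop_replication_slot(...) call inside the timeout branch, e.g. drop_slot_after_streaming(my_connection, lambda: psycopg2.connect(dsn), 'wal2json_test_slot'), where dsn is a placeholder. One caveat: the server refuses to drop a slot that is still active, so if the walsender has not exited yet the drop may fail and need a short retry.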
I am having the same problem. In my case the query connection and the replication connection are different.