pq
pq copied to clipboard
connection pool exhausted
Hi, I want to run multiple threads that work on jobs in a queue. If a job fails I want it to retry. This is my code:
pool = ThreadedConnectionPool(4, 4, connection_string)
pq = PQ(pool=pool)
try:
pq.create()
print("created queue table")
except:
print("queue table already exists")
job_queue = pq["queue"]
def worker():
while True:
try:
with job_queue:
for job in job_queue:
if job is None:
print("job is None")
break
# do stuff
except Exception as e:
print(f"worker {current_thread()} died because: {str(e)} -- restarting...")
for i in range(4):
Thread(target=worker).start()
However, I'm getting connection pool exhausted exceptions. My code probably throws exceptions but this ideally it would be able to retry without any issues.
I already looked at the tests but couldn't find the issue.
This does seem rather weird. Try using psql to introspect the active connections using e.g. select * from pg_stat_activity.
I ran \COPY (SELECT * from pg_stat_activity) TO 'pg_stat_activity.csv' CSV HEADER while my application was running (and the exceptions came in). Attached the output file.
Are the connections returned properly to the pool if my code throws an exception?