asyncpg
asyncpg copied to clipboard
FR: cluster connection pool with fail-over
-
Please consider including a cluster connection pool with automatic fail-over, e.g. https://gist.github.com/Andrei-Pozolotin/f5be7dec56840428c6245a1cef4a25eb
-
The basic idea is to run a connection ping task in the background for each cluster member and to select the next best available connection when the current database connection fails.
import psycopg2
from psycopg2 import OperationalError
import threading
import time
from typing import List, Dict, Optional
class ClusterConnectionPool:
    """Fail-over wrapper around one psycopg2 connection per cluster member.

    One connection is opened per connection string. A daemon thread per
    successfully opened connection pings it with ``SELECT 1`` every
    ``check_interval`` seconds. Exactly one connection is "current" at any
    time; when it breaks, the pool moves on to the next live member.

    Slots in ``self.connections`` hold either a live connection or ``None``
    (member unreachable at start-up, or retired after a failure). Retired
    members are not reconnected.

    NOTE: the pool never commits. Statements issued through
    ``execute_query`` run inside whatever transaction psycopg2 has open on
    the shared connection.
    """

    def __init__(self, connection_strings: List[str], check_interval: int = 10):
        """Open a connection to every member and start the health checks.

        Args:
            connection_strings: One libpq connection string per member.
            check_interval: Seconds between two pings of the same member.
        """
        self.connection_strings = connection_strings
        self.check_interval = check_interval
        self.lock = threading.Lock()
        self.connections = [self.create_connection(cs) for cs in connection_strings]
        self.current_connection_index = 0
        self.health_checks = []
        # Member 0 may have been unreachable at start-up; begin on the first
        # live member instead of reporting "no connection".
        if self.connections and self.connections[0] is None:
            self._switch_to_next_connection()
        self._start_health_checks()

    def create_connection(self, connection_string: str):
        """Return a new psycopg2 connection, or ``None`` if connecting fails."""
        try:
            return psycopg2.connect(connection_string)
        except OperationalError as e:
            print(f"Error creating connection: {e}")
            return None

    def _start_health_checks(self):
        """Start one daemon ping thread per live connection."""
        for conn in self.connections:
            if conn is None:
                continue
            thread = threading.Thread(
                target=self._check_connection, args=(conn,), daemon=True
            )
            thread.start()
            self.health_checks.append(thread)

    def _check_connection(self, conn):
        """Ping ``conn`` periodically; retire it and stop on the first failure."""
        while True:
            with self.lock:
                still_pooled = any(member is conn for member in self.connections)
            if not still_pooled:
                # Retired elsewhere (failed query or close()); nothing to watch.
                return
            try:
                with conn.cursor() as cursor:
                    cursor.execute('SELECT 1')
            except (OperationalError, psycopg2.InterfaceError):
                print("Connection failed. Switching to next available connection.")
                # Only fails over if this member was the current one; a
                # broken stand-by must not disturb a healthy current member.
                self._retire_connection(conn)
                return
            time.sleep(self.check_interval)

    def _advance_locked(self) -> bool:
        """Move the current index to the next live slot.

        The caller must hold ``self.lock``. The search visits every slot at
        most once (wrapping around, and ending on the current slot itself),
        so it terminates even when every slot is ``None`` or the pool is
        empty.

        Returns:
            True if a live slot was selected, False if there is none.
        """
        total = len(self.connections)
        for step in range(1, total + 1):
            candidate = (self.current_connection_index + step) % total
            if self.connections[candidate] is not None:
                self.current_connection_index = candidate
                return True
        return False

    def _retire_connection(self, conn):
        """Remove ``conn`` from the pool and fail over if it was current."""
        with self.lock:
            for index, member in enumerate(self.connections):
                if member is conn:
                    self.connections[index] = None
                    if index == self.current_connection_index:
                        self._advance_locked()
                    break
        try:
            conn.close()
        except psycopg2.Error:
            # Best effort: the connection is already considered unusable.
            pass

    def _switch_to_next_connection(self):
        """Make the next live connection current (no-op if none is live)."""
        with self.lock:
            self._advance_locked()

    def get_connection(self):
        """Return the current connection, or ``None`` if no member is live."""
        with self.lock:
            if not self.connections:
                return None
            if self.connections[self.current_connection_index] is None:
                self._advance_locked()
            return self.connections[self.current_connection_index]

    def execute_query(self, query: str, params: Optional[Dict] = None):
        """Run ``query`` on the current member, failing over on errors.

        At most one attempt is made per pool slot, so the call always
        terminates.

        Args:
            query: SQL text passed to ``cursor.execute``.
            params: Optional parameters passed to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchall()``.

        Raises:
            Exception: "No available connections." when no member is live.
            psycopg2.Error: the last connection-level error when every
                attempt failed but live members remain.
        """
        last_error = None
        for _ in range(max(1, len(self.connections))):
            conn = self.get_connection()
            if conn is None:
                break
            try:
                with conn.cursor() as cursor:
                    cursor.execute(query, params)
                    return cursor.fetchall()
            except (OperationalError, psycopg2.InterfaceError) as e:
                print(f"Error executing query: {e}")
                last_error = e
                if conn.closed:
                    # The link itself is gone; never hand it out again.
                    self._retire_connection(conn)
                    continue
                try:
                    # The connection survived (e.g. cancelled statement) but
                    # its transaction is aborted; reset it for later use.
                    conn.rollback()
                except psycopg2.Error:
                    self._retire_connection(conn)
                    continue
                self._switch_to_next_connection()
        if self.get_connection() is None:
            raise Exception("No available connections.") from last_error
        raise last_error

    def close(self):
        """Close every pooled connection; health-check threads then exit."""
        with self.lock:
            open_connections = [c for c in self.connections if c is not None]
            self.connections = [None] * len(self.connections)
        for conn in open_connections:
            try:
                conn.close()
            except psycopg2.Error:
                # Best effort during shutdown.
                pass
# Usage example: build a pool over three local cluster members and run a
# single query through it.
if __name__ == "__main__":
    # One connection string per cluster member; only the port differs.
    connection_strings = [
        "dbname=test user=postgres password=secret host=localhost port=5432",
        "dbname=test user=postgres password=secret host=localhost port=5433",
        "dbname=test user=postgres password=secret host=localhost port=5434",
    ]
    pool = ClusterConnectionPool(connection_strings=connection_strings)

    # Fetch every row of the sample table and show the result.
    result = pool.execute_query(query="SELECT * FROM my_table")
    print(result)