asyncpg icon indicating copy to clipboard operation
asyncpg copied to clipboard

FR: cluster connection pool with fail-over

Open Andrei-Pozolotin opened this issue 5 years ago • 1 comment

  1. Please consider including a cluster connection pool with automatic fail-over; for an example, see https://gist.github.com/Andrei-Pozolotin/f5be7dec56840428c6245a1cef4a25eb

  2. The basic idea is to run a connection ping task in the background for each cluster member and, when the current database connection fails, to select the next best available connection.

Andrei-Pozolotin avatar Jan 24 '20 16:01 Andrei-Pozolotin


import psycopg2
from psycopg2 import OperationalError
import threading
import time
from typing import List, Dict, Optional

class ClusterConnectionPool:
    """Hold one psycopg2 connection per cluster member and fail over between them.

    A connection is opened for every connection string at construction time.
    One daemon thread per successfully opened connection periodically runs
    ``SELECT 1`` on it. When the *active* connection fails (in a health check
    or inside :meth:`execute_query`) the pool advances to the next usable
    member.

    Connections that could not be opened are stored as ``None`` so that
    ``connections[i]`` always corresponds to ``connection_strings[i]``.
    Broken connections are not re-opened.

    NOTE(review): health checks run on the same connection objects that serve
    queries, and nothing in this class commits or rolls back. Presumably
    callers are expected to manage transactions (or enable autocommit) on the
    object returned by :meth:`get_connection` -- confirm.
    """

    def __init__(self, connection_strings: List[str], check_interval: int = 10):
        """Open all member connections and start their health-check threads.

        Args:
            connection_strings: One libpq connection string per cluster member.
            check_interval: Seconds to wait between two health checks of the
                same member.
        """
        self.connection_strings = connection_strings
        self.check_interval = check_interval
        self.lock = threading.Lock()
        # Set by close(); lets health-check threads leave their loop promptly.
        self._stop_event = threading.Event()
        self.connections = [self.create_connection(cs) for cs in connection_strings]
        self.current_connection_index = 0
        if not self._is_usable(self.get_connection()):
            # Member 0 failed to connect: start on the first member that did,
            # instead of reporting "no connections" while others are alive.
            self._switch_to_next_connection()
        self.health_checks = []
        self._start_health_checks()

    @staticmethod
    def _is_usable(conn) -> bool:
        """Return True if *conn* exists and is not flagged as closed/broken."""
        # psycopg2 connections expose ``closed`` (0 == open); objects without
        # the attribute are treated as open.
        return conn is not None and not getattr(conn, "closed", 0)

    def create_connection(self, connection_string: str):
        """Open a connection, returning ``None`` (after logging) on failure."""
        try:
            return psycopg2.connect(connection_string)
        except OperationalError as e:
            print(f"Error creating connection: {e}")
            return None

    def _start_health_checks(self):
        """Spawn one daemon health-check thread per opened connection."""
        for conn in self.connections:
            if conn is None:
                continue
            thread = threading.Thread(
                target=self._check_connection, args=(conn,), daemon=True
            )
            thread.start()
            self.health_checks.append(thread)

    def _check_connection(self, conn):
        """Ping *conn* every ``check_interval`` seconds until the pool closes.

        A failed ping triggers a fail-over only when *conn* is the active
        connection. The loop ends once *conn* is marked closed, because this
        class never re-opens connections.
        """
        while not self._stop_event.is_set():
            try:
                with conn.cursor() as cursor:
                    cursor.execute('SELECT 1')
            except (OperationalError, psycopg2.InterfaceError):
                # InterfaceError is what psycopg2 raises for an already-closed
                # connection; left uncaught it would silently kill this thread.
                if self._stop_event.is_set():
                    break  # failure caused by close(), not by the server
                print("Connection failed. Switching to next available connection.")
                self._switch_to_next_connection(conn)
                if getattr(conn, "closed", 0):
                    break  # permanently dead; stop pinging it
            # Event.wait doubles as an interruptible sleep.
            self._stop_event.wait(self.check_interval)

    def _switch_to_next_connection(self, failed_conn=None) -> bool:
        """Advance the active index to the next usable connection.

        Args:
            failed_conn: The connection the caller saw fail. When given and it
                is no longer the active connection, nothing is changed: either
                another thread already failed over, or a non-active member's
                health check failed and the active one must not be disturbed.

        Returns:
            True if the active slot holds a usable connection afterwards,
            False if the pool is empty or no member is usable (the index is
            then left unchanged).
        """
        with self.lock:
            total = len(self.connections)
            if total == 0:
                return False
            if failed_conn is not None:
                active = self.connections[self.current_connection_index]
                if active is not failed_conn:
                    return self._is_usable(active)
            # Bounded scan: visit every slot once, ending on the current one,
            # so an all-dead pool cannot spin forever while holding the lock.
            for offset in range(1, total + 1):
                candidate = (self.current_connection_index + offset) % total
                if self._is_usable(self.connections[candidate]):
                    self.current_connection_index = candidate
                    return True
            return False

    def get_connection(self):
        """Return the active connection, or ``None`` if there is none."""
        with self.lock:
            if not self.connections:
                return None
            return self.connections[self.current_connection_index]

    def execute_query(self, query: str, params: Optional[Dict] = None):
        """Run *query* on the active connection, failing over on errors.

        Each attempt uses the currently active member; after a connection
        error the pool fails over and retries, with at most one attempt per
        configured member.

        Args:
            query: SQL text passed to ``cursor.execute``.
            params: Optional query parameters passed to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchall()``.

        Raises:
            Exception: "No available connections." when no member could serve
                the query; the last connection error, if any, is chained as
                ``__cause__``.
        """
        last_error = None
        for _ in range(max(len(self.connections), 1)):
            conn = self.get_connection()
            if conn is None:
                if not self._switch_to_next_connection():
                    break
                continue
            try:
                with conn.cursor() as cursor:
                    cursor.execute(query, params)
                    return cursor.fetchall()
            except (OperationalError, psycopg2.InterfaceError) as e:
                print(f"Error executing query: {e}")
                last_error = e
                if not self._switch_to_next_connection(conn):
                    break
        raise Exception("No available connections.") from last_error

    def close(self):
        """Stop the health-check threads and close every opened connection."""
        self._stop_event.set()
        with self.lock:
            members = list(self.connections)
        for conn in members:
            if conn is None:
                continue
            try:
                conn.close()
            except psycopg2.Error as e:
                print(f"Error closing connection: {e}")

# Usage example: connect to three local cluster members and run one query.
if __name__ == "__main__":
    # The members share every setting except the port they listen on.
    dsn_template = "dbname=test user=postgres password=secret host=localhost port={}"
    connection_strings = [dsn_template.format(port) for port in (5432, 5433, 5434)]

    pool = ClusterConnectionPool(connection_strings)

    # Fetch all rows of the sample table through the pool and show them.
    result = pool.execute_query("SELECT * FROM my_table")
    print(result)

ljluestc avatar Sep 07 '24 19:09 ljluestc