
pyodbc not releasing GIL

Open jasperboekel opened this issue 9 months ago • 0 comments

I came across this issue while building a FastAPI application that uses aioodbc to fetch data from a SQL Server database, but I think it is fundamentally a pyodbc problem.

From my understanding, I/O operations should release the GIL, so parallelizing them with multithreading should lead to speed gains. The code snippets below, however, show otherwise.
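For reference, here is a minimal baseline sketch of my own (fake_io is a hypothetical stand-in for a GIL-releasing I/O call, not pyodbc code): time.sleep releases the GIL, so five 1-second waits on five threads finish in about 1 second, and this is the scaling I expected from pyodbc's network I/O as well.

import concurrent.futures
import time


def fake_io() -> None:
    time.sleep(1)  # time.sleep releases the GIL while blocked


start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    for _ in range(5):
        pool.submit(fake_io)
# the executor's __exit__ waits for all submitted tasks to finish
print(f"5 parallel 1s waits took {time.perf_counter() - start:.2f}s")  # ~1s, not ~5s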

Environment:

- Python 3.11
- pyodbc == 5.2.0
- unixODBC 2.3.12
- ODBC Driver 17 for SQL Server

import concurrent.futures
import datetime as dt
from typing import Dict, List

import pyodbc
import turbodbc

SQL_HOST = ""
SQL_USER = ""
SQL_PASSWORD = ""
SQL_DATABASE = ""


def execute_query(con_params: Dict[str, str], query: str) -> List[tuple]:
    """
    Executes a query on the database using pyodbc.
    Returns the results as a list of tuples.
    """
    connection_string = ";".join([f"{key}={value}" for key, value in con_params.items()])

    try:
        with pyodbc.connect(connection_string) as conn:
            with conn.cursor() as cursor:
                print("connection established")
                start = dt.datetime.now()
                cursor.execute(query)
                rows = cursor.fetchall()
                print(f"Query executed in {dt.datetime.now() - start}")
                return rows
    except Exception as e:
        print(f"Error executing query: {e}")
        return []


def fetch_trades(query):
    """Fetch trade data efficiently using `turbodbc`."""
    DATABASE_CONFIG = {
        "driver": "ODBC Driver 18 for SQL Server",
        "server": SQL_HOST,
        "database": SQL_DATABASE,
        "uid": rf"{{{SQL_USER}}}",
        "pwd": rf"{{{SQL_PASSWORD}}}",
        "TrustServerCertificate": "yes",
    }
    with turbodbc.connect(**DATABASE_CONFIG, use_async_io=True) as conn:
        cursor = conn.cursor()
        start = dt.datetime.now()
        cursor.execute(query)
        rows = cursor.fetchallnumpy()
        print(f"Time taken: {dt.datetime.now() - start}")
    return rows


def run_parallel_queries(con_params: Dict[str, str], query: str, num_queries: int = 5) -> List[List[tuple]]:
    """
    Executes multiple instances of the same query in parallel using ThreadPoolExecutor.
    Returns a list containing results from each query execution.
    """
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_queries) as executor:
        future_to_query = {executor.submit(execute_query, con_params, query): i for i in range(num_queries)}
        # future_to_query = {executor.submit(fetch_trades, query): i for i in range(num_queries)}

        for future in concurrent.futures.as_completed(future_to_query):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error fetching result: {e}")

    return results


if __name__ == "__main__":
    con_params: Dict[str, str] = {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "Server": SQL_HOST,
        "Database": SQL_DATABASE,
        "UID": rf"{{{SQL_USER}}}",
        "PWD": rf"{{{SQL_PASSWORD}}}",
        "UseFMTONLY": "Yes",
    }
    con_params.update(
        {
            "Authentication": "ActiveDirectoryPassword",
            "TrustServerCertificate": "Yes",
            "ColumnEncryption": "Enabled",
            "Encrypt": "Yes",
            "ApplicationIntent": "ReadOnly",  # "ReadWrite"
        }
    )

    query = """
            DECLARE @num_rows INT = 10000; -- Change this to your desired number of rows

            WITH Numbers AS (
                SELECT 1 AS id
                UNION ALL
                SELECT id + 1 FROM Numbers WHERE id < @num_rows
            )
            SELECT
                id,
                DATEADD(DAY, id, '2025-01-01') AS delivery,
                CAST(ROUND(RAND(CHECKSUM(NEWID())) * 100, 2) AS DECIMAL(10,2)) AS value
            FROM Numbers
            OPTION (MAXRECURSION 10000); -- Prevents SQL Server's default recursion limit (100)
    """

    start = dt.datetime.now()
    results = run_parallel_queries(con_params, query, num_queries=5)
    for i, res in enumerate(results):
        print(f"Result set {i+1}: {len(res)} rows")

    print(f" total time: {dt.datetime.now() - start}")

The above code snippet uses multithreading. Running it with num_queries = 5 gives the following output:

connection established
connection established
connection established
connection established
connection established
Query executed in 0:00:02.210422
Query executed in 0:00:02.210795
Query executed in 0:00:02.258706
Query executed in 0:00:02.263452
Query executed in 0:00:02.240482
Result set 1: 10000 rows
Result set 2: 10000 rows
Result set 3: 10000 rows
Result set 4: 10000 rows
Result set 5: 10000 rows
 total time: 0:00:03.798965

Whereas running this for num_queries = 1 gives me:

connection established
Query executed in 0:00:00.138177
Result set 1: 10000 rows
 total time: 0:00:01.669335

The query time for each of the 5 parallel queries (~2.2 s) is more than 5 times the query time of a single query (~0.14 s), which is what you would expect if the executions were serialized rather than running in parallel.
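To make the serialization visible directly, here is a minimal diagnostic sketch of my own (tick_rate is a hypothetical helper, not part of pyodbc): it counts pure-Python iterations in a background thread while a blocking call runs. If the call releases the GIL, the counter keeps its baseline rate; if it holds the GIL, the rate collapses.

import threading
import time


def tick_rate(during) -> float:
    """Background-thread iterations per second while during() runs."""
    stop = threading.Event()
    count = 0

    def spin() -> None:
        nonlocal count
        while not stop.is_set():
            count += 1  # pure-Python work, requires the GIL

    t = threading.Thread(target=spin)
    t.start()
    start = time.perf_counter()
    during()
    elapsed = time.perf_counter() - start
    stop.set()
    t.join()
    return count / elapsed


# Baseline: time.sleep releases the GIL, so the counter runs at full speed.
print(f"ticks/s during sleep: {tick_rate(lambda: time.sleep(2)):,.0f}")
# Suspect: if pyodbc held the GIL during fetch, this rate would collapse.
# print(f"ticks/s during query: {tick_rate(lambda: execute_query(con_params, query)):,.0f}")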

Furthermore, the following code snippet:

import datetime as dt
import multiprocessing as mp
import os
from typing import Dict, List

import pyodbc

SQL_HOST = ""
SQL_USER = ""
SQL_PASSWORD = ""
SQL_DATABASE = ""


def execute_query(con_params: Dict[str, str], query: str) -> List[tuple]:
    """
    Executes a query on the database using pyodbc in a separate process.
    Returns the results as a list of tuples.
    """
    connection_string = ";".join([f"{key}={value}" for key, value in con_params.items()])

    try:
        with pyodbc.connect(connection_string) as conn:
            with conn.cursor() as cursor:
                print(f"Process {os.getpid()}: Connection established")
                start = dt.datetime.now()
                cursor.execute(query)
                rows = cursor.fetchall()
                elapsed = dt.datetime.now() - start
                print(f"Process {os.getpid()}: Query executed in {elapsed.total_seconds()} seconds")
                return rows
    except Exception as e:
        print(f"Process {os.getpid()}: Error executing query: {e}")
        return []


def run_parallel_queries(con_params: Dict[str, str], query: str, num_queries: int = 5) -> List[List[tuple]]:
    """
    Executes multiple instances of the same query in parallel using multiprocessing.
    Returns a list containing results from each query execution.
    """
    start_time = dt.datetime.now()

    # Use multiprocessing Pool to spawn parallel processes
    with mp.Pool(processes=num_queries) as pool:
        results = pool.starmap(execute_query, [(con_params, query)] * num_queries)

    total_time = dt.datetime.now() - start_time
    print(f"Total execution time: {total_time.total_seconds()} seconds")

    return results


# Example usage
if __name__ == "__main__":
    con_params: Dict[str, str] = {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "Server": SQL_HOST,
        "Database": SQL_DATABASE,
        "UID": rf"{{{SQL_USER}}}",
        "PWD": rf"{{{SQL_PASSWORD}}}",
        "UseFMTONLY": "Yes",
    }
    con_params.update(
        {
            "Authentication": "ActiveDirectoryPassword",
            "TrustServerCertificate": "Yes",
            "ApplicationIntent": "ReadOnly",  # "ReadWrite"
        }
    )

    query = """
            DECLARE @num_rows INT = 10000; -- Change this to your desired number of rows

            WITH Numbers AS (
                SELECT 1 AS id
                UNION ALL
                SELECT id + 1 FROM Numbers WHERE id < @num_rows
            )
            SELECT
                id,
                DATEADD(DAY, id, '2025-01-01') AS delivery,
                CAST(ROUND(RAND(CHECKSUM(NEWID())) * 100, 2) AS DECIMAL(10,2)) AS value
            FROM Numbers
            OPTION (MAXRECURSION 10000); -- Prevents SQL Server's default recursion limit (100)
    """

    results = run_parallel_queries(con_params, query, num_queries=5)

    for i, res in enumerate(results):
        print(f"Result set {i+1}: {len(res)} rows")

The above code snippet uses multiprocessing instead and prints this:

Process 595651: Connection established
Process 595653: Connection established
Process 595654: Connection established
Process 595655: Connection established
Process 595652: Connection established
Process 595654: Query executed in 0.171187 seconds
Process 595651: Query executed in 0.222585 seconds
Process 595652: Query executed in 0.163341 seconds
Process 595653: Query executed in 0.228181 seconds
Process 595655: Query executed in 0.189986 seconds
Total execution time: 1.703746 seconds
Result set 1: 10000 rows
Result set 2: 10000 rows
Result set 3: 10000 rows
Result set 4: 10000 rows
Result set 5: 10000 rows

This is the output I would expect under multithreading as well: with multiprocessing, the five queries genuinely run in parallel.
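For my FastAPI use case I am falling back on processes for now. A hedged workaround sketch under that assumption (fetch_all is a hypothetical helper reusing execute_query from the snippet above, not an official API): since separate processes sidestep the GIL entirely, the blocking pyodbc calls can be offloaded from async code to a ProcessPoolExecutor via run_in_executor instead of the default thread pool.

import asyncio
import concurrent.futures


async def fetch_all(con_params, query, n: int = 5):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=n) as pool:
        # execute_query must be a picklable top-level function, as it is above
        futures = [loop.run_in_executor(pool, execute_query, con_params, query)
                   for _ in range(n)]
        return await asyncio.gather(*futures)


# results = asyncio.run(fetch_all(con_params, query))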

EDIT: All of the above was executed on WSL; when I run the code on Windows, however, I do get the parallelism I would expect.

jasperboekel · Jan 31 '25