
BUG: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited

Open dss010101 opened this issue 1 year ago • 3 comments

Pandas version checks

  • [X] I have checked that this issue has not already been reported.

  • [X] I have confirmed this bug exists on the latest version of pandas.

  • [X] I have confirmed this bug exists on the main branch of pandas.

Reproducible Example

I have the code below. `read_sql_async` is a method on my DBReader class, and it uses `pd.read_sql` internally.

import asyncio
import timeit

import pandas as pd
import psycopg

async def read_sql_async(self, sql, params = None):
  t1 = timeit.default_timer()

  async with await psycopg.AsyncConnection.connect(self.connect_str) as conn:
     #await asyncio.to_thread(pd.read_sql, query, con)
     res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)

  print(f'perf: {timeit.default_timer() - t1}')
  return res

The asyncio driver code is below:

async def test_thread_asyncio():
   db_reader = DataReader()
   sql = "select id, name from factor f where f.id = ANY(%s)"
   threads = 20
   id_partitions = np.array_split(list(range(1, 10000)), threads)
   id_partitions = [[p.tolist()] for p in id_partitions]
   tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}

   for task in asyncio.as_completed(tasks):
      try:
         df = await asyncio.shield(task)
      except Exception as exc:
         log.exception(f'error retrieving data')
      else:
         if df is not None:
            print(f'shape: {df.shape}')   

When I run this, I get the following error:

/usr/local/lib/python3.12/site-packages/pandas/io/sql.py:2674: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
...
...
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
    return f.result()  # May raise f.exception().
           ^^^^^^^^^^
  File "/workspaces/v8_pi_lab/src/v8_pi_lab/data/providers/db_provider.py", line 116, in read_sql_async
    res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 706, in read_sql
    return pandas_sql.read_query(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 2739, in read_query
    columns = [col_desc[0] for col_desc in cursor.description]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable
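
The traceback points at the likely root cause: `pd.read_sql` drives the DBAPI cursor synchronously, so when it is handed a psycopg `AsyncConnection`, the `cursor.execute(...)` call merely creates a coroutine that is never awaited. The query never runs, `cursor.description` stays `None`, and the column lookup raises `TypeError`. A minimal stand-in (a toy class, not psycopg itself) showing the mechanism:

```python
import inspect

# Toy stand-in for psycopg's AsyncCursor (NOT the real class):
# execute is a coroutine, and description is only set once it has run.
class ToyAsyncCursor:
    description = None

    async def execute(self, sql):
        self.description = [("id",), ("name",)]

def sync_read(cursor):
    # What pd.read_sql effectively does: call execute synchronously.
    coro = cursor.execute("select 1")     # just creates a coroutine object
    assert inspect.iscoroutine(coro)      # ...which is never awaited
    coro.close()                          # avoid the RuntimeWarning in this demo
    return cursor.description             # still None -> TypeError downstream

print(sync_read(ToyAsyncCursor()))  # → None
```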


### Issue Description

I need to parallelize my calls to the DB. Using multiprocessing works, but since these operations are mostly IO-bound rather than CPU-bound, I would prefer threads. When I wrote code using ```concurrent.futures.ThreadPoolExecutor```, that also works, but as you can see from the timings below, there seems to be some blocking. So I then attempted to use coroutines, but I get the above error.


### Expected Behavior

To be able to make parallelized threaded calls to pandas.read_sql and read_sql_query without blocking.

### Installed Versions

<details>
Cython                : None
pytest                : 8.2.0
hypothesis            : None
sphinx                : None
blosc                 : None
feather               : None
xlsxwriter            : None
lxml.etree            : None
html5lib              : None
pymysql               : None
psycopg2              : None
jinja2                : 3.1.3
IPython               : None
pandas_datareader     : None
adbc-driver-postgresql: None
adbc-driver-sqlite    : None
bs4                   : None
bottleneck            : None
dataframe-api-compat  : None
fastparquet           : None
fsspec                : None
gcsfs                 : None
matplotlib            : None
numba                 : None
numexpr               : None
odfpy                 : None
openpyxl              : 3.1.2
pandas_gbq            : None
pyarrow               : 16.0.0
pyreadstat            : None
python-calamine       : None
pyxlsb                : None
s3fs                  : None
scipy                 : None
sqlalchemy            : 2.0.29
tables                : None
tabulate              : None
xarray                : None
xlrd                  : None
zstandard             : None
tzdata                : 2024.1
qtpy                  : None
pyqt5                 : None

</details>

dss010101 avatar May 03 '24 19:05 dss010101

Here is the version that works with `ThreadPoolExecutor`, but seems to block:

def read_sql(self, sql, params = None):
   t1 = timeit.default_timer()
   with warnings.catch_warnings():
      warnings.simplefilter("ignore", UserWarning)
      with psycopg.connect(self.connect_str, autocommit=True) as conn:
         df = pd.read_sql(sql, con = conn, params = params)

      self.log.debug(f'perf: {timeit.default_timer() - t1}')
      return df

The concurrent.futures code is this:

import concurrent.futures as cf

def test_thread_pool():
   db_reader = DataReader()
   sql = "select id, name from factor f where f.id = ANY(%s)"
   threads = 20
   id_partitions = np.array_split(list(range(1, 10000)), threads)
   id_partitions = [[p.tolist()] for p in id_partitions]
   with cf.ThreadPoolExecutor(max_workers=threads) as exec:
      futures = {
         exec.submit(db_reader.read_sql, sql, params=p):
           p for p in id_partitions
      }
      
      for future in cf.as_completed(futures):
         ids = futures[future]
         try:
            df = future.result()
         except Exception as exc:
            log.exception(f'error retrieving data for: {ids}')
         else:
            if df is not None:
               print(f'shape: {df.shape}')   

The output of the debug line from read_sql looks like this:

perf: 0.7313497869981802
perf: 0.8116309550023288
perf: 3.401154975006648
perf: 5.22201336100261
perf: 6.325166654998611
perf: 6.338692951001576
perf: 6.573095380997984
perf: 6.5976604809984565
perf: 6.8282670119951945
perf: 7.291718505999597
perf: 7.4276196580030955
perf: 7.407097272000101
perf: 8.38801568299823
perf: 9.119963648998237

You'll notice that the times keep increasing - I'd have expected them all to be roughly the same - so it seems there is some SQL blocking. Also, the time gap between the first two threads and the 3rd is always about 2-3 seconds - why is that? I've also tried creating a new DbReader instance for each thread, but with the same effect.

Does anyone know whether the connection or pandas read_sql blocks, or how to solve this?
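
One pattern that sidesteps the async/sync mismatch entirely is to open a plain synchronous connection inside each worker and push the whole `pd.read_sql` call off the event loop with `asyncio.to_thread`. A minimal sketch, using a throwaway sqlite3 file as a stand-in for Postgres (the table, column names, and id ranges here are invented for the demo):

```python
import asyncio
import os
import sqlite3
import tempfile

import pandas as pd

DB = os.path.join(tempfile.gettempdir(), "factor_demo.db")

def setup():
    # Build a small demo table so the example is self-contained.
    with sqlite3.connect(DB) as conn:
        conn.execute("drop table if exists factor")
        conn.execute("create table factor (id integer, name text)")
        conn.executemany("insert into factor values (?, ?)",
                         [(i, f"n{i}") for i in range(100)])

def read_sql(sql, params):
    # One *synchronous* connection per call; pd.read_sql drives it normally.
    with sqlite3.connect(DB) as conn:
        return pd.read_sql(sql, conn, params=params)

async def main():
    setup()
    sql = "select id, name from factor where id between ? and ?"
    bounds = [(0, 49), (50, 99)]
    # to_thread runs each blocking read in the default thread pool.
    return await asyncio.gather(
        *(asyncio.to_thread(read_sql, sql, b) for b in bounds))

dfs = asyncio.run(main())
print([df.shape for df in dfs])  # → [(50, 2), (50, 2)]
```

With psycopg this would mean `psycopg.connect(...)` (not `AsyncConnection`) inside `read_sql`; the event loop stays free while each thread blocks on its own query.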

dss010101 avatar May 03 '24 19:05 dss010101

I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:

tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}

Because db_reader.read_sql_async is an async method.

Aloqeely avatar May 05 '24 21:05 Aloqeely

> I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:
>
> tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}
>
> Because db_reader.read_sql_async is an async method.

I am. You can see the await in the for/as_completed loop.

I think the problem is probably that an async connection or cursor doesn't work with pandas. If so, I'd like to understand whether pandas blocks - as per the 2nd example, why am I not able to achieve better concurrency?
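
That is correct - pandas has no async code path, so `pd.read_sql` can never drive an `AsyncCursor`. If you want to stay fully async, one option is to await the cursor yourself and hand the fetched rows to pandas. A sketch of such a helper (`read_sql_async` is an invented name, written against psycopg 3's async API):

```python
import pandas as pd

# Hypothetical helper: await the cursor yourself, then build the
# DataFrame from the fetched rows. Assumes a psycopg 3 AsyncConnection.
async def read_sql_async(conn, sql, params=None):
    async with conn.cursor() as cur:
        await cur.execute(sql, params)          # actually awaited this time
        cols = [d[0] for d in cur.description]  # column names per DBAPI
        rows = await cur.fetchall()
    return pd.DataFrame(rows, columns=cols)
```

Used as `df = await read_sql_async(conn, sql, params)` with `conn` from `await psycopg.AsyncConnection.connect(...)`. Note the fetch is async, but the `DataFrame` construction itself still runs on the event loop thread and holds the GIL, so very large results will still stall other tasks briefly.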

dss010101 avatar May 05 '24 23:05 dss010101