BUG: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
### Pandas version checks
- [X] I have checked that this issue has not already been reported.
- [X] I have confirmed this bug exists on the latest version of pandas.
- [X] I have confirmed this bug exists on the main branch of pandas.
### Reproducible Example
The method below is on my `DataReader` class and uses `pd.read_sql`:
import asyncio
import timeit

import pandas as pd
import psycopg

async def read_sql_async(self, sql, params=None):
    t1 = timeit.default_timer()
    async with await psycopg.AsyncConnection.connect(self.connect_str) as conn:
        # await asyncio.to_thread(pd.read_sql, query, con)
        res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)
    print(f'perf: {timeit.default_timer() - t1}')
    return res
The asyncio code that calls it is below:
async def test_thread_asyncio():
    db_reader = DataReader()
    sql = "select id, name from factor f where f.id = ANY(%s)"
    threads = 20
    id_partitions = np.array_split(list(range(1, 10000)), threads)
    id_partitions = [[p.tolist()] for p in id_partitions]
    tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}
    for task in asyncio.as_completed(tasks):
        try:
            df = await asyncio.shield(task)
        except Exception as exc:
            log.exception(f'error retrieving data')
        else:
            if df is not None:
                print(f'shape: {df.shape}')
When I run this, I get the following error:
/usr/local/lib/python3.12/site-packages/pandas/io/sql.py:2674: RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited
...
...
File "/usr/local/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
return f.result() # May raise f.exception().
^^^^^^^^^^
File "/workspaces/v8_pi_lab/src/v8_pi_lab/data/providers/db_provider.py", line 116, in read_sql_async
res = await asyncio.get_event_loop().run_in_executor(None, pd.read_sql, sql, conn, params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 706, in read_sql
return pandas_sql.read_query(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/pandas/io/sql.py", line 2739, in read_query
columns = [col_desc[0] for col_desc in cursor.description]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable
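The warning and the `TypeError` in the traceback are consistent with a synchronous caller invoking an async `execute` without awaiting it: the call only creates a coroutine object, the query never runs, and `cursor.description` stays `None`. The following is a minimal sketch of that mechanism, not the pandas or psycopg code itself. `FakeAsyncCursor`, `sync_read_query`, `async_read_query`, and `demo` are hypothetical names introduced for illustration.

```python
import asyncio


class FakeAsyncCursor:
    """Hypothetical stand-in for an async DB cursor (not psycopg's class)."""

    def __init__(self):
        # Only populated once execute() has actually run.
        self.description = None

    async def execute(self, sql, params=None):
        self.description = [("id",), ("name",)]


def sync_read_query(cursor, sql):
    """Mimics a synchronous caller: calls execute() without awaiting it."""
    cursor.execute(sql)  # creates a coroutine object and discards it
    # description is still None here, so iterating it raises TypeError.
    return [col[0] for col in cursor.description]


async def async_read_query(cursor, sql):
    """The same steps, but with execute() awaited so it really runs."""
    await cursor.execute(sql)
    return [col[0] for col in cursor.description]


def demo():
    try:
        sync_read_query(FakeAsyncCursor(), "select 1")
    except TypeError as exc:
        sync_error = str(exc)
    columns = asyncio.run(async_read_query(FakeAsyncCursor(), "select 1"))
    return sync_error, columns
```

Calling `demo()` should also make Python emit a "coroutine ... was never awaited" `RuntimeWarning` for the discarded coroutine in the synchronous path, while the awaited path returns the column names.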
### Issue Description
I need to parallelize my calls to the DB. Using multiple processes works, but since these operations are mostly I/O-bound rather than CPU-bound, I would prefer to use threads. A version using `concurrent.futures.ThreadPoolExecutor` also works, but as the timings below show, there seems to be some blocking. I then attempted to use coroutines, but I am getting the error above.
### Expected Behavior
To be able to call `pandas.read_sql` and `read_sql_query` from multiple threads in parallel without blocking.
### Installed Versions
<details>
Cython : None
pytest : 8.2.0
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.3
IPython : None
pandas_datareader : None
adbc-driver-postgresql: None
adbc-driver-sqlite : None
bs4 : None
bottleneck : None
dataframe-api-compat : None
fastparquet : None
fsspec : None
gcsfs : None
matplotlib : None
numba : None
numexpr : None
odfpy : None
openpyxl : 3.1.2
pandas_gbq : None
pyarrow : 16.0.0
pyreadstat : None
python-calamine : None
pyxlsb : None
s3fs : None
scipy : None
sqlalchemy : 2.0.29
tables : None
tabulate : None
xarray : None
xlrd : None
zstandard : None
tzdata : 2024.1
qtpy : None
pyqt5 : None
</details>
This is the version that works with `ThreadPoolExecutor` but seems to block:
import warnings

def read_sql(self, sql, params=None):
    t1 = timeit.default_timer()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)
        with psycopg.connect(self.connect_str, autocommit=True) as conn:
            df = pd.read_sql(sql, con=conn, params=params)
    self.log.debug(f'perf: {timeit.default_timer() - t1}')
    return df
The `concurrent.futures` code is this:
import concurrent.futures as cf

def test_thread_pool():
    db_reader = DataReader()
    sql = "select id, name from factor f where f.id = ANY(%s)"
    threads = 20
    id_partitions = np.array_split(list(range(1, 10000)), threads)
    id_partitions = [[p.tolist()] for p in id_partitions]
    with cf.ThreadPoolExecutor(max_workers=threads) as exec:
        futures = {
            exec.submit(db_reader.read_sql, sql, params=p): p
            for p in id_partitions
        }
        for future in cf.as_completed(futures):
            ids = futures[future]
            try:
                df = future.result()
            except Exception as exc:
                log.exception(f'error retrieving data for: {ids}')
            else:
                if df is not None:
                    print(f'shape: {df.shape}')
The output of the debug line from read_sql looks like this:
perf: 0.7313497869981802
perf: 0.8116309550023288
perf: 3.401154975006648
perf: 5.22201336100261
perf: 6.325166654998611
perf: 6.338692951001576
perf: 6.573095380997984
perf: 6.5976604809984565
perf: 6.8282670119951945
perf: 7.291718505999597
perf: 7.4276196580030955
perf: 7.407097272000101
perf: 8.38801568299823
perf: 9.119963648998237
You'll notice that the times keep increasing. I would have expected them all to be roughly the same, so it seems there is some SQL blocking. Also, the gap between the first two threads and the third is always about 2-3 seconds. Why is that? I've also tried creating a new `DataReader` instance for each thread, but it has the same effect.
Does anyone know whether the connection or pandas `read_sql` blocks, or how to solve this?
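Since the `perf` figure in `read_sql` starts before `psycopg.connect` and stops after `pd.read_sql`, it covers both opening the connection and running the query. One way to narrow down where the time goes might be to time the two steps separately. The sketch below shows only the timing pattern. `timed` and `read_with_breakdown` are hypothetical helpers, and `connect` and `query` are placeholder callables standing in for the real connection and `pd.read_sql` calls.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


def read_with_breakdown(connect, query):
    """Time the connect step and the query step separately.

    `connect` and `query` are hypothetical callables standing in for
    something like psycopg.connect(...) and pd.read_sql(...).
    """
    timings = {}
    with timed("connect", timings):
        conn = connect()
    with timed("query", timings):
        result = query(conn)
    return result, timings
```

If the "connect" share grows from thread to thread while "query" stays flat, that would point at connection setup rather than at pandas. The reverse would point at the query or at building the DataFrame.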
I have not had enough time to look at this, but from the issue title, shouldn't you `await` this to remove the warning:
tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}
Because `db_reader.read_sql_async` is an async method.
I am. You can see the await in the for/as_completed loop.
I think the problem is probably that an async connection or cursor doesn't work with pandas. If so, I'd like to understand whether pandas blocks: as per the second example, why am I not able to achieve better concurrency?
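If the async connection is indeed the problem, one possible workaround is to keep the database call fully synchronous and let asyncio push it onto worker threads with `asyncio.to_thread`. The sketch below assumes this and uses a stand-in for the blocking call. `blocking_read` is hypothetical and only sleeps. In real code it would open a synchronous `psycopg.connect(...)` connection and call `pd.read_sql(sql, conn, params=...)` inside the thread.

```python
import asyncio
import time


def blocking_read(ids):
    """Hypothetical stand-in for a blocking call such as pd.read_sql
    on a synchronous connection; here it only sleeps."""
    time.sleep(0.2)  # simulates waiting on the database
    return len(ids)


async def read_all(partitions):
    # Each blocking call runs in a worker thread of the default executor;
    # the event loop only awaits the results.
    tasks = [asyncio.to_thread(blocking_read, p) for p in partitions]
    return await asyncio.gather(*tasks)


def run_demo():
    partitions = [list(range(i, i + 10)) for i in range(0, 50, 10)]
    start = time.perf_counter()
    sizes = asyncio.run(read_all(partitions))
    elapsed = time.perf_counter() - start
    return sizes, elapsed
```

With five partitions, `run_demo()` should finish in roughly the time of one sleep rather than five. This only overlaps time spent waiting on I/O. Any CPU-bound work done while holding the GIL, such as building the DataFrame, would still run one thread at a time, which may be part of why the thread-pool timings climb.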