asynch
asynch copied to clipboard
ProgrammingError on async race condition when using pool execute many
when using execute many to insert multiple entries into clickhouse with the following code to rapidly insert records:
while values_batch in value_batches:
async with self._db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.executemany(ch_query, values_batch)
we get the following exeption: ProgrammingError("records have not fetched, fetch all before execute next")
probably also an issue with a single execute
I encountered the same issue.
Same issue
same
I'm hitting this during ClickHouse server updates, when the connection gets dropped by the database. Always have to restart my application to recover from this.
Can confirm it happens with normal execute, not just executemany.
For the time being, while these changes haven't been released, you can check this snippet of code
pool: Pool | None = None
@contextlib.asynccontextmanager
async def with_clickhouse(**kwargs):
global pool
pool = await asynch.create_pool(**kwargs)
try:
yield pool
finally:
pool.close()
await pool.wait_closed()
pool = None
@contextlib.asynccontextmanager
async def session_scope() -> DictCursor:
if pool is None:
raise RuntimeError('out of `with_clickhouse()` scope')
async with pool.acquire() as conn:
try:
async with conn.cursor() as cursor:
yield cursor
finally:
if conn._connection.is_query_executing:
await conn._connection.disconnect()
Fix for correct checking disconnected connections: https://github.com/long2ice/asynch/pull/98