py-questdb-client
feat: HTTP SenderPool with asyncio support
Overview
A new API that makes it easier to use the sender asynchronously, with true parallelism.
```python
import asyncio

from questdb.ingress.pool import SenderPool


async def main():
    with SenderPool('http::addr=localhost:9000;') as pool:
        # Buffers can be safely constructed independently,
        # also on separate threads.
        buf1 = pool.transaction('tbl1')
        buf1.row(...)
        buf2 = pool.transaction('tbl2')
        buf2.dataframe(...)

        # Parallelism: both commits are in flight at the same time.
        fut1 = buf1.commit()
        fut2 = buf2.commit()
        await fut1
        await fut2


asyncio.run(main())
```
Details
- Each buffer can only accumulate rows for a single table.
- Each `flush` represents an atomic database transaction.
- Flush operations can run in parallel (network operations release the GIL).
- Ownership of the buffer is "moved" to the pool.
- Introducing parallelism alleviates the performance penalty of using ILP/HTTP: network round-trip time.
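The bullets above can be modelled with a small, self-contained toy. `ToyPool`, `ToyTransaction`, the `latency` parameter and the thread-pool design are hypothetical and are not the actual `SenderPool` implementation; `time.sleep` stands in for the HTTP round trip. The sketch shows a buffer bound to a single table, ownership passing to the pool on `commit()`, and two commits overlapping so that the total wait is roughly one round trip rather than two.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


class ToyTransaction:
    """Hypothetical per-table buffer: rows for one table, committed once."""

    def __init__(self, pool, table):
        self._pool = pool
        self.table = table     # fixed at creation; no other table allowed
        self._rows = []
        self._moved = False    # True once ownership has passed to the pool

    def row(self, **columns):
        if self._moved:
            raise RuntimeError('buffer already committed (moved to the pool)')
        self._rows.append(columns)
        return self

    def commit(self) -> Future:
        if self._moved:
            raise RuntimeError('buffer already committed (moved to the pool)')
        self._moved = True
        rows, self._rows = self._rows, []  # hand the rows over to the pool
        return self._pool._submit(self.table, rows)


class ToyPool:
    """Hypothetical pool: each commit is flushed on a worker thread."""

    def __init__(self, workers=2, latency=0.3):
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._latency = latency
        self._lock = threading.Lock()
        self.tables = {}

    def transaction(self, table):
        return ToyTransaction(self, table)

    def _submit(self, table, rows):
        return self._executor.submit(self._flush, table, rows)

    def _flush(self, table, rows):
        time.sleep(self._latency)  # fake round trip; sleeping releases the GIL
        with self._lock:
            # All rows of one commit are applied together, like one transaction.
            self.tables.setdefault(table, []).extend(rows)
        return len(rows)

    def close(self):
        self._executor.shutdown(wait=True)


pool = ToyPool(workers=2, latency=0.3)
t1 = pool.transaction('tbl1').row(x=1).row(x=2)
t2 = pool.transaction('tbl2').row(y=3)
start = time.monotonic()
f1, f2 = t1.commit(), t2.commit()
results = (f1.result(), f2.result())
elapsed = time.monotonic() - start  # about one latency, not two
pool.close()
print(results)  # (2, 1)
```

Because the rows are handed over on `commit()`, any later `t1.row(...)` call raises instead of silently writing into a buffer the pool may already be sending.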
API downsides
- This is a new "parallel" API for more advanced use cases, which creates an API split:
  - Server-style applications written in Python would use this new API.
  - Simpler "Jupyter notebook"-style code would continue using the existing API.
- Both APIs would continue being supported (since this new one is just a wrapper around the other anyway).
- Shoe-horning these features into the regular API would be a struggle.
- This API drops auto-flushing completely, since auto-flushing creates silent network-blocking operations in the API.
Thread safety and Parallelism
- Once a pool object is created, it can be shared between threads.
- The `pool.next_buffer()` and `pool.flush()` methods are thread safe.
- This allows for N:M concurrency:
  - N buffer-writer threads.
  - M threads responsible for concurrently writing to the database (inside the pool).
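To make the N:M arrangement concrete, here is a minimal toy model using only the standard library. The `ToyPool` class, the list-based buffers and the thread counts are hypothetical stand-ins that only reuse the method names `next_buffer()` and `flush()` from this description; the real pool's internals may differ. N writer threads each build buffers and hand them to `flush()`, while at most M executor threads inside the pool perform the simulated network writes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

N_WRITERS = 4   # buffer-writer threads (arbitrary example value)
M_FLUSHERS = 2  # database-writer threads inside the pool (arbitrary example value)


class ToyPool:
    """Hypothetical pool whose next_buffer() and flush() may be called from any thread."""

    def __init__(self, workers):
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._lock = threading.Lock()
        self.rows_written = 0
        self.flusher_threads = set()

    def next_buffer(self):
        # Each caller gets its own buffer, so no shared state is touched here.
        return []

    def flush(self, buf):
        # ThreadPoolExecutor.submit() is safe to call from several threads.
        return self._executor.submit(self._send, buf)

    def _send(self, buf):
        time.sleep(0.05)  # simulated network round trip (releases the GIL)
        with self._lock:
            self.rows_written += len(buf)
            self.flusher_threads.add(threading.current_thread().name)

    def close(self):
        self._executor.shutdown(wait=True)


def writer(pool, writer_id, futures):
    # One of the N writer threads: build three buffers of ten rows each.
    for batch in range(3):
        buf = pool.next_buffer()
        buf.extend(f'w{writer_id}-b{batch}-r{i}' for i in range(10))
        futures.append(pool.flush(buf))


pool = ToyPool(M_FLUSHERS)
futures_per_writer = [[] for _ in range(N_WRITERS)]
threads = [
    threading.Thread(target=writer, args=(pool, i, futures_per_writer[i]))
    for i in range(N_WRITERS)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
for futures in futures_per_writer:
    for f in futures:
        f.result()  # re-raises any error from the flusher thread
pool.close()
print(pool.rows_written)  # 120
```

Each writer appends only to its own list of futures, so the main thread can collect them after `join()` without extra locking.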
Tasks
- [ ] Review the API. Is this even a good idea?
- [ ] Split out `SenderPool` into a new `questdb.ingress.pool` module.
- [ ] Improve test coverage, including `TransactionalBuffer.dataframe(..)`.
- [ ] Triple-check the thread safety of the `pool.next_buffer()` and `pool.flush()` implementations.
- [ ] Skip the indirection and call `asyncio.wrap_future` in `def flush()` directly, since that's how it's implemented anyway; see https://github.com/python/cpython/blob/8ad88984200b2ccddc0a08229dd2f4c14d1a71fc/Lib/asyncio/base_events.py#L896. This allows implementing `.flush()` in terms of `.flush_to_future()` and cutting code duplication.
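The `wrap_future` refactor in the last task can be sketched as follows. The linked CPython code appears to be `loop.run_in_executor()`, which ends by wrapping the `concurrent.futures.Future` from `executor.submit()` with `futures.wrap_future`; calling `asyncio.wrap_future` on the future returned by `flush_to_future()` gives the same kind of awaitable without the extra hop. `ToyPool` and its `_send` method are hypothetical, with `time.sleep` standing in for the HTTP request.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


class ToyPool:
    """Hypothetical pool illustrating flush() built on flush_to_future()."""

    def __init__(self, workers=2):
        self._executor = ThreadPoolExecutor(max_workers=workers)

    def _send(self, rows):
        time.sleep(0.05)  # simulated blocking network I/O
        return len(rows)

    def flush_to_future(self, rows) -> Future:
        # Thread-based API: usable without an event loop.
        return self._executor.submit(self._send, rows)

    def flush(self, rows) -> asyncio.Future:
        # asyncio API: wrap the concurrent.futures.Future directly instead of
        # going through loop.run_in_executor(), which does the same wrapping.
        return asyncio.wrap_future(self.flush_to_future(rows))

    def close(self):
        self._executor.shutdown(wait=True)


async def main():
    pool = ToyPool()
    try:
        counts = await asyncio.gather(pool.flush([1, 2, 3]), pool.flush([4, 5]))
    finally:
        pool.close()
    return counts


print(asyncio.run(main()))  # [3, 2]
```

With this shape, thread-based callers use `flush_to_future()` and asyncio callers use `flush()`, while only one code path performs the actual send.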
Closes https://github.com/questdb/py-questdb-client/issues/64