influxdb3-python

Provide support for flushing the write buffer without closing `InfluxDBClient3`

Open caspian-ireland opened this issue 1 month ago • 0 comments

Use Case

A long-running application often manages its database client as a singleton: one instance is shared between multiple services and used for the lifetime of the application. For time-series data, batch inserts are often used to improve write performance: records are buffered locally and written to the database in batches, which reduces the number of network requests. InfluxDBClient3 supports batch inserts, but does not expose an API to flush the write buffer.
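To make the buffering behaviour concrete, here is a minimal, library-independent sketch of size-triggered batching. `BatchingBuffer` and its `send` callback are hypothetical names used only for illustration; this is not how InfluxDBClient3 is implemented, just the general pattern described above.

```python
from typing import Callable, List


class BatchingBuffer:
    """Hypothetical helper: collects records and hands them to `send` in batches."""

    def __init__(self, send: Callable[[List[str]], None], batch_size: int) -> None:
        self._send = send
        self._batch_size = batch_size
        self._pending: List[str] = []

    def add(self, record: str) -> None:
        # Buffer locally; only a full batch triggers a (simulated) network request.
        self._pending.append(record)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is pending, even if the batch is not full.
        if self._pending:
            batch, self._pending = self._pending, []
            self._send(batch)


requests: List[List[str]] = []  # each entry stands in for one network request
buffer = BatchingBuffer(send=requests.append, batch_size=3)
for i in range(7):
    buffer.add(f"cpu value={i}")

sent_before_flush = sum(len(batch) for batch in requests)
buffer.flush()
sent_after_flush = sum(len(batch) for batch in requests)
```

With seven records and a batch size of three, the last record stays in the buffer until something calls `flush()`. That trailing partial batch is the data this issue is concerned with.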

For applications that need to take some action when data has been successfully inserted, there is currently no way to ensure the write buffer has completely flushed, other than to close the database client.

For example, I'm working with an application that inserts data into Influx, and then performs queries on that data. To ensure that all the data is returned in the query, I have to close the database client (forcing the write buffer to flush), and then open a new client to run the query.

Take the following pseudo-code. I've omitted closing InfluxDBClient3 for the sake of simplicity:

from influxdb_client_3 import WriteOptions, write_client_options, InfluxDBClient3

write_options = WriteOptions(batch_size=5000)
wco = write_client_options(write_options=write_options)

client = InfluxDBClient3(
    host=host,
    database=database,
    token=influx_write_key,
    write_client_options=wco
)

def process():
    for record in record_producer():
        point = to_point(record)
        client.write(record=point)

    # No guarantee that the records have all been inserted
    signal_records_inserted()

When signal_records_inserted is called, there is no guarantee that the records have all been inserted into the database. I could call client.close(), which will flush the write buffer, but I then need to construct a new InfluxDBClient3 in order to perform further operations against the database.
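For reference, the close-and-reopen workaround can be wrapped up roughly as follows. This is only a sketch: `ReopeningClient`, `client_factory`, and `FakeClient` are hypothetical names, and the only thing assumed about the real client is that `close()` drains the write buffer, as described above.

```python
from typing import Any, Callable, List


class ReopeningClient:
    """Hypothetical holder that emulates flush() by closing and recreating a client."""

    def __init__(self, client_factory: Callable[[], Any]) -> None:
        self._factory = client_factory
        self._client = client_factory()

    @property
    def client(self) -> Any:
        return self._client

    def flush_by_reopening(self) -> None:
        # close() is the only operation that drains the buffer, so close and rebuild.
        self._client.close()
        self._client = self._factory()


class FakeClient:
    """Stand-in for a batching client whose buffer is only drained by close()."""

    def __init__(self) -> None:
        self.buffer: List[str] = []
        self.sent: List[str] = []
        self.closed = False

    def write(self, record: str) -> None:
        self.buffer.append(record)

    def close(self) -> None:
        self.sent.extend(self.buffer)
        self.buffer.clear()
        self.closed = True


holder = ReopeningClient(FakeClient)
first = holder.client
first.write("cpu value=1")
holder.flush_by_reopening()
second = holder.client
```

The drawback is visible in the sketch: any service still holding a reference to `first` now has a closed client, so every consumer must go through the holder. That is awkward when the client is shared as a singleton.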

Expected behavior

It would be great if InfluxDBClient3 offered a method that flushes the write buffer. For example, my previous example would then become:

def process():
    for record in record_producer():
        point = to_point(record)
        client.write(record=point)

    # Flush the write buffer to ensure all pending records have been inserted
    client.flush()
    signal_records_inserted()
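To illustrate the semantics being requested, here is a rough, library-independent sketch of a writer whose `flush()` blocks until all queued records have been handled, without shutting the writer down. `BackgroundWriter` is a hypothetical class built on the standard library `queue.Queue`; it is not the InfluxDBClient3 implementation and says nothing about how the maintainers would implement such a method.

```python
import queue
import threading
from typing import List


class BackgroundWriter:
    """Hypothetical writer: records are written by a worker thread."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        self.written: List[str] = []
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        while True:
            record = self._queue.get()
            try:
                if record is None:  # sentinel: stop the worker
                    return
                self.written.append(record)  # stand-in for the network write
            finally:
                self._queue.task_done()

    def write(self, record: str) -> None:
        self._queue.put(record)

    def flush(self) -> None:
        # Block until every queued record has been processed; the writer stays usable.
        self._queue.join()

    def close(self) -> None:
        self._queue.put(None)
        self._worker.join()


writer = BackgroundWriter()
for i in range(100):
    writer.write(f"cpu value={i}")
writer.flush()
count_after_flush = len(writer.written)

writer.write("cpu value=100")  # still usable after flush()
writer.close()
count_after_close = len(writer.written)
```

The point of the sketch is that `flush()` and `close()` are separate operations: callers can wait for pending writes at any time, and the shared instance is only closed at application shutdown.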

Actual behavior

The write buffer cannot be flushed on demand; the only way to force a flush is to call client.close(). For an application that manages InfluxDBClient3 as a singleton, used throughout the application's lifetime and only disposed of on shutdown, services have no way to know when a particular Point has been inserted into the database.

Additional info

No response

caspian-ireland, Sep 23 '25 09:09