influxdb3-python

Batch write example doesn't wait for callbacks and batch client doesn't support async/await

Open jstirnaman opened this issue 1 year ago • 4 comments

Specifications

  • Client Version: v0.7.0
  • InfluxDB Version: Cloud Dedicated
  • Platform: MacOS

Code sample to reproduce problem

Using the example in the README, I now need to insert a time.sleep call of 2-3 seconds to make sure the callbacks have run. I didn't encounter this until recently (using v0.5.0) so I don't know what's changed--maybe the InfluxDB instance is slower.

However, if I try to use async/await with the client, then I get the following error:

'InfluxDBClient3' object does not support the asynchronous context manager protocol

Example using async/await:

from influxdb_client_3 import(InfluxDBClient3,
                              write_client_options,
                              WriteOptions,
                              InfluxDBError)

class BatchingCallback(object):

    def __init__(self):
        self.status = None

    def success(self, conf: tuple, data=None):
        self.status = f"Success: Data has been successfully written: {data}"
        print(self.status)

    def error(self, conf: tuple, data: str, err: InfluxDBError):
        self.status = f"{err}: Error writing batch: config: {conf}, data: {data}"
        print(self.status)

    def retry(self, conf: tuple, data: str, err: InfluxDBError):
        self.status = f"{err}: Retry error writing batch: config: {conf}, data: {data}"
        print(self.status)

import asyncio

async def main():
    callback = BatchingCallback()

    # Instantiate default WriteOptions for batching
    write_options = WriteOptions()
    wco = write_client_options(success_callback=callback.success,
                               error_callback=callback.error,
                               retry_callback=callback.retry,
                               write_options=write_options)

    # Use the with...as statement to ensure the file is properly closed and resources
    # are released.
    async with InfluxDBClient3(host="your_influxdb_host",
                               database="DATABASE_NAME",
                               token="DATABASE_TOKEN",
                               write_client_options=wco) as client:
        client._client.debug = True  # Enable debug mode
        await client.write_file(file='./data/home-sensor-data.csv',
                                timestamp_column='time',
                                tag_columns=["room"],
                                write_precision='s')

        assert callback.status.startswith("Success"), f"{callback.status}"

# Run the main function
asyncio.run(main())

Expected behavior

Using with and a batch write client, the client should wait for the response and for callbacks to fire.

Actual behavior

Execution continues without callbacks having run, and the client doesn't support async/await.
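The async error above can be reproduced without InfluxDB at all. The following is a minimal sketch, assuming a hypothetical `SyncOnlyClient` class (not part of influxdb3-python) that, like the client in this report, defines only the synchronous context manager methods. It also shows one possible way to call a blocking write from async code with the standard library's `asyncio.to_thread`, which is a general Python facility rather than anything the client provides.

```python
import asyncio


class SyncOnlyClient:
    """Hypothetical stand-in for a client that only defines __enter__/__exit__."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False

    def write(self, record):
        # Placeholder for a blocking write call.
        return f"wrote {record}"


async def try_async_with():
    """Show that `async with` rejects an object lacking __aenter__/__aexit__."""
    try:
        async with SyncOnlyClient():
            return "entered"
    except (TypeError, AttributeError) as err:
        # TypeError on recent Python versions, AttributeError on older ones.
        return type(err).__name__


async def write_from_async_code():
    """Run the blocking write in a worker thread so the event loop stays free."""
    with SyncOnlyClient() as client:
        return await asyncio.to_thread(client.write, "home,room=Kitchen temp=21.0")


failure = asyncio.run(try_async_with())
result = asyncio.run(write_from_async_code())
print(failure, result)
```

Wrapping the call in a thread does not make callbacks fire any earlier; it only keeps the event loop responsive while a synchronous call runs.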

Additional info

No response

jstirnaman, Aug 01 '24 22:08

The client does not support async. #53 exists for this topic.

Without the async do you still have these issues?

powersj, Aug 01 '24 22:08

Yes, sorry, that's what I meant about using the batch example from the README--to make sure the callback completes, I need to run something after client.write_file. Testing yesterday on Dedicated with a seemingly "normal" network, the required sleep was 2-3 seconds.
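As an alternative to a fixed sleep, the callback object itself can signal completion. This is a minimal sketch, not the library's API: `fake_background_write` is a hypothetical stand-in for a batching writer that invokes the callback from another thread, and the `done` event is an addition to the `BatchingCallback` class from the report.

```python
import threading


class BatchingCallback:
    """Records the batch outcome and signals when a final callback has fired."""

    def __init__(self):
        self.status = None
        self.done = threading.Event()

    def success(self, conf, data=None):
        self.status = f"Success: Data has been successfully written: {data}"
        self.done.set()

    def error(self, conf, data, err):
        self.status = f"{err}: Error writing batch: config: {conf}, data: {data}"
        self.done.set()


def fake_background_write(callback, delay=0.2):
    """Hypothetical writer: reports success from a timer thread after `delay` seconds."""
    timer = threading.Timer(
        delay, callback.success, args=(("db", "s"), "home,room=Kitchen temp=21.0")
    )
    timer.start()
    return timer


callback = BatchingCallback()
fake_background_write(callback)

# Block until a callback fires, or give up after 5 seconds.
finished = callback.done.wait(timeout=5)
print(finished, callback.status)
```

The event is set by the first callback only, so a write that is split into several batches would need a counter or a per-batch event instead.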

jstirnaman, Aug 02 '24 12:08

Also, #53, as written, is specific to queries and it says writes use async inherited from the v2 client, but I wondered whether that was still the case, and if so, how to access it. Do we need to rewrite that issue?

jstirnaman, Aug 02 '24 12:08

@jstirnaman

> Also, #53, as written, is specific to queries and it says writes use async inherited from the v2 client, but I wondered whether that was still the case, and if so, how to access it.

Currently, that is not true. The support was removed in this PR: https://github.com/InfluxCommunity/influxdb3-python/pull/62.

Your code should look something like what is shown in https://github.com/InfluxCommunity/influxdb3-python/blob/main/Examples/batching_example.py. You can avoid using asyncio.

> Do we need to rewrite that issue?

I will review the status of asyncio in pyarrow and summarize the status in #53.

bednar, Aug 06 '24 04:08

@bednar

That's my point, though:

> Using the example in the README, I now need to insert a time.sleep call of 2-3 seconds to make sure the callbacks have run.

...the example code doesn't wait for the callbacks to fire.

jstirnaman, Aug 20 '24 14:08

@jstirnaman, oh, sorry for the misunderstanding; now I understand the issue. I'll take a look...

bednar, Aug 21 '24 12:08

I finally had time to look at this. The problem is caused by the way the client uses WriteApi internally: it never sets up a context manager for WriteApi (https://docs.python.org/3/reference/compound_stmts.html#the-with-statement), so when ingestion ends, no code blocks to wait for the pending writes to complete.

We will have to find a solution that keeps the simplicity of the v3 client, i.e. client.write(record=data), while also supporting a context manager as in the v2 client:

with client.write_api() as write_api:
    write_api.write(record=data)
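The blocking behavior described above can be illustrated with a small sketch. `BlockingBatchWriter` is a hypothetical class written only for illustration; it is not how the v2 or v3 client is implemented. The point is that `__exit__` joins the background worker, so every queued record has been handled (and its callback has run) before the `with` block ends.

```python
import queue
import threading


class BlockingBatchWriter:
    """Hypothetical batching writer whose context manager waits for queued records."""

    def __init__(self, on_success):
        self._queue = queue.Queue()
        self._on_success = on_success
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        while True:
            record = self._queue.get()
            if record is None:  # Sentinel: no more records will arrive.
                break
            self._on_success(record)  # A real writer would send the record here.

    def write(self, record):
        self._queue.put(record)  # Returns immediately; delivery happens later.

    def close(self):
        self._queue.put(None)
        self._worker.join()  # Block until every queued record has been handled.

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False


written = []
with BlockingBatchWriter(on_success=written.append) as writer:
    writer.write("home,room=Kitchen temp=21.0")
    writer.write("home,room=Hall temp=20.5")

# __exit__ has joined the worker, so both callbacks have already run here.
print(len(written))
```

Without the join in `close()`, code after the `with` block could run before the callbacks, which matches the symptom reported in this issue.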

bednar, Aug 23 '24 13:08

@jstirnaman, I've added an item to our backlog, and @karel-rehor will take a look at it.

bednar, Sep 10 '24 07:09

@jstirnaman, I'm creating PR #112 to cover this issue and the items in item 108.

I've added two integration tests covering client.close() at the end of the context manager. I've also updated batching_example.py to make calls after the context manager has completed. These calls report the final status of class fields updated in the callbacks.

Feedback on these changes would be appreciated. Documentation updates still need to be added to the PR.

karel-rehor, Sep 17 '24 12:09

closed by https://github.com/InfluxCommunity/influxdb3-python/pull/112

bednar, Sep 24 '24 11:09