Batch write example doesn't wait for callbacks and batch client doesn't support async/await
Specifications
- Client Version: v0.7.0
- InfluxDB Version: Cloud Dedicated
- Platform: MacOS
Code sample to reproduce problem
Using the example in the README, I now need to insert a time.sleep call of 2-3 seconds to make sure the callbacks have run.
I didn't encounter this until recently (using v0.5.0) so I don't know what's changed--maybe the InfluxDB instance is slower.
However, if I try to use async/await with the client, then I get the following error:
'InfluxDBClient3' object does not support the asynchronous context manager protocol`
Example using async/await:
from influxdb_client_3 import(InfluxDBClient3,
write_client_options,
WriteOptions,
InfluxDBError)
class BatchingCallback(object):
def __init__(self):
self.status = None
def success(self, conf: tuple, data=None):
self.status = f"Success: Data has been successfully written: {data}"
print(self.status)
def error(self, conf: tuple, data: str, err: InfluxDBError):
self.status = f"{err}: Error writing batch: config: {conf}, data: {data}"
print(self.status)
def retry(self, conf: tuple, data: str, err: InfluxDBError):
self.status = f"{err}: Retry error writing batch: config: {conf}, data: {data}"
print(self.status)
import asyncio
async def main():
callback = BatchingCallback()
# Instantiate default WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
write_options=write_options)
# Use the with...as statement to ensure the file is properly closed and resources
# are released.
async with InfluxDBClient3(host="your_influxdb_host",
database="DATABASE_NAME",
token="DATABASE_TOKEN",
write_client_options=wco) as client:
client._client.debug = True # Enable debug mode
await client.write_file(file='./data/home-sensor-data.csv',
timestamp_column='time',
tag_columns=["room"],
write_precision='s')
assert callback.status.startswith("Success"), f"{callback.status}"
# Run the main function
asyncio.run(main())
Expected behavior
Using with and a batch write client, the client should wait for the response and for callbacks to fire.
Actual behavior
Execution continues without callbacks having run, and the client doesn't support async/await.
Additional info
No response
The client does not support async. #53 exists for this topic.
Without the async do you still have these issues?
The client does not support async. #53 exists for this topic.
Without the async do you still have these issues
Yes, sorry, that's what I meant about using the batch example from the README--to make sure the callback completes, I need to run something after client.write_file. Testing yesterday on Dedicated with a seemingly "normal" network, the required sleep was 2-3 seconds.
Also, #53, as written, is specific to queries and it says writes use async inherited from the v2 client, but I wondered whether that was still the case, and if so, how to access it. Do we need to rewrite that issue?
@jstirnaman
Also, #53, as written, is specific to queries and it says writes use async inherited from the v2 client, but I wondered whether that was still the case, and if so, how to access it.
Currently, that is not true. The support was removed in this PR: https://github.com/InfluxCommunity/influxdb3-python/pull/62.
Your code should look something like what is shown in https://github.com/InfluxCommunity/influxdb3-python/blob/main/Examples/batching_example.py. You can avoid using asyncio.
Do we need to rewrite that issue?
I will review the status of asyncio in pyarrow and summarize the status in #53.
@jstirnaman
Also, #53, as written, is specific to queries and it says writes use async inherited from the v2 client, but I wondered whether that was still the case, and if so, how to access it.
Currently, that is not true. The support was removed in this PR: #62.
Your code should look something like what is shown in https://github.com/InfluxCommunity/influxdb3-python/blob/main/Examples/batching_example.py. You can avoid using asyncio.
@bednar
That's my point, though:
Using the example in the README, I now need to insert a time.sleep call of 2-3 seconds to make sure the callbacks have run.
...the example code doesn't wait for the callbacks to fire.
@jstirnaman, oh, sorry for the misunderstanding; now I understand the issue. I'll take a look...
Finally, I have enough time to look at this. The problem is caused by the way the client internally uses WriteApi; there isn't an initialized ContextManager for WriteApi (https://docs.python.org/3/reference/compound_stmts.html#the-with-statement), so at the end of ingesting, there is no blocking code to wait for the completion of the ingestion.
We will have to find a solution that maintains the simplicity of using the v3 client, a.k.a. client.write(record=data), along with using a context manager as in the v2 client:
with client.write_api() as write_api:
write_api.write(record=data)
@jstirnaman, I've added an item to our backlog, and @karel-rehor will take a look at it.
@jstirnaman, I'm creating PR #112 to cover this issue and the items in item 108.
I've added two integration tests covering client.close() at the end of the context manager. I've also updated batching_example.py to make calls after the context manager has completed. These calls report the final status of class fields updated in the callbacks.
Feedback on these changes would be appreciated. Still to be added to the PR are documentation updates.
closed by https://github.com/InfluxCommunity/influxdb3-python/pull/112