elasticsearch-py
Helpers for the `bulk` method, such as `async_bulk`, sleep in a blocking manner, preventing graceful shutdown
We tried using the `async_bulk` and `async_streaming_bulk` helpers to ingest data into Elasticsearch. They work great, but we've found that they prevent our code from shutting down gracefully when CTRL+C is pressed.
Example code that sleeps: https://github.com/elastic/elasticsearch-py/blob/e1603f4f25888ca7fee03ee977abafb00d512601/elasticsearch/_async/helpers.py#L249-L251
It would be great to have a way to either:
- Define how the sleep happens by passing a sleep function into the client, or
- Make the Elasticsearch client internally cancel all pending sleeps when the client is closed
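The second option could look roughly like the minimal sketch below. It makes no assumptions about the client's internals: `ShutdownAwareSleeper` and `demo` are hypothetical names. Instead of cancelling tasks, each sleep waits on an `asyncio.Event` with a timeout, so a single `close()` call ends every pending sleep early.

```python
import asyncio


class ShutdownAwareSleeper:
    """Hypothetical helper: sleeps that end early once close() is called."""

    def __init__(self):
        # Create this inside a running event loop (see demo below).
        self._closed = asyncio.Event()

    async def sleep(self, seconds):
        """Sleep up to `seconds`; return True if woken early by close()."""
        try:
            await asyncio.wait_for(self._closed.wait(), timeout=seconds)
            return True
        except asyncio.TimeoutError:
            return False

    def close(self):
        # Wakes every pending sleep() at once; no task cancellation involved.
        self._closed.set()


async def demo(close_after, sleep_for):
    sleeper = ShutdownAwareSleeper()
    loop = asyncio.get_running_loop()
    if close_after is not None:
        # Simulate a shutdown request arriving while a backoff sleep is pending.
        loop.call_later(close_after, sleeper.close)
    start = loop.time()
    interrupted = await sleeper.sleep(sleep_for)
    return interrupted, loop.time() - start


if __name__ == "__main__":
    # A 60 s backoff sleep that a "shutdown" interrupts after 0.1 s.
    print(asyncio.run(demo(close_after=0.1, sleep_for=60)))
```

A helper that received `sleeper.sleep` could check the returned flag and stop retrying, instead of having a `CancelledError` surface in the middle of a bulk request.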
Example code that our product calls when using the client: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py (see the comment at the top of the file for how we collect and ingest data).
In short, we have a SyncOrchestrator class that internally creates instances of two classes:
- Extractor. This class is responsible for providing a generator that yields documents from a third-party system and puts them into the MemQueue.
- Sink. This class is responsible for picking up the data from the MemQueue and sending it to Elasticsearch in batches. Right now it just sends it with a regular bulk request: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py#L149, but ideally we'd love to switch to a helper from the Python client.
- MemQueue itself provides backpressure by limiting both the number of items that can be in the queue AND the total size of the items in the queue. This way we can, to some extent, control the memory usage of the framework.
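The Extractor, MemQueue and Sink pipeline described above can be sketched as an asyncio producer/consumer pair. This is a minimal illustration, not the connectors implementation: `BoundedMemQueue`, `extractor`, `sink` and `run` are hypothetical names, item sizes are estimated with `sys.getsizeof` (a shallow estimate, which is an assumption), and batches are collected in a list where the real Sink would send a bulk request.

```python
import asyncio
import sys
from collections import deque


class BoundedMemQueue:
    """Hypothetical MemQueue-like sketch with two limits: item count and
    total size. Sizes are estimated with sys.getsizeof (shallow)."""

    def __init__(self, maxsize, maxmemsize):
        self._items = deque()  # (item, size) pairs
        self._total = 0        # sum of sizes currently queued
        self._maxsize = maxsize
        self._maxmemsize = maxmemsize
        self._cond = asyncio.Condition()
        self.peak = 0          # largest queue length observed, for inspection

    def _fits(self, size):
        if not self._items:
            return True  # always admit one item so an oversized item cannot block forever
        return (len(self._items) < self._maxsize
                and self._total + size <= self._maxmemsize)

    async def put(self, item):
        size = sys.getsizeof(item)
        async with self._cond:
            # Backpressure: the producer waits until the item fits both limits.
            await self._cond.wait_for(lambda: self._fits(size))
            self._items.append((item, size))
            self._total += size
            self.peak = max(self.peak, len(self._items))
            self._cond.notify_all()  # wake consumers waiting for data

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: bool(self._items))
            item, size = self._items.popleft()
            self._total -= size
            self._cond.notify_all()  # wake producers waiting for space
            return item


async def extractor(queue, docs):
    for doc in docs:
        await queue.put(doc)
    await queue.put(None)  # sentinel: no more documents


async def sink(queue, batch_size):
    batches, batch = [], []
    while (doc := await queue.get()) is not None:
        batch.append(doc)
        if len(batch) == batch_size:
            batches.append(batch)  # a real Sink would send a bulk request here
            batch = []
    if batch:
        batches.append(batch)
    return batches


async def run(docs, batch_size=2, maxsize=3):
    # The queue is created inside the running event loop.
    queue = BoundedMemQueue(maxsize=maxsize, maxmemsize=10_000)
    _, batches = await asyncio.gather(extractor(queue, docs), sink(queue, batch_size))
    return batches, queue.peak


if __name__ == "__main__":
    print(asyncio.run(run([{"id": i} for i in range(5)])))
```

Admitting one item into an empty queue regardless of its size is a design choice in this sketch: it keeps a single oversized document from blocking the producer forever.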
Sorry for the delay, Artem. I would be happy to implement the first version, allowing the sleep function to be user-defined. Silently cancelling all sleeps/bulks isn't something we'd want in the general case.
Hi, @girolamo-giordano and I are working on this. Just to understand better: you want a function that is passed as input when the client is instantiated, which handles CTRL+C presses and, in general, how the client terminates?
Hey @LorenzoFasolino and @girolamo-giordano, please tell me if this snippet clarifies things:
```python
import asyncio


async def fake_bulk(sleep=asyncio.sleep):
    print("start bulk, sleep 5")
    await sleep(5)


async def silently_cancelled_sleep(seconds):
    try:
        await asyncio.sleep(seconds)
    except asyncio.CancelledError:
        print("silently ignoring cancellation")


async def main():
    print("Calling with silently_cancelled_sleep, Ctrl-C will not raise")
    await fake_bulk(silently_cancelled_sleep)
    print("Calling with defaults, Ctrl-C will raise")
    await fake_bulk()
    print("Done")


asyncio.run(main())
```
The idea is to add this new `sleep` parameter to `async_bulk` to allow using a custom sleep function. Users could then define a function like `silently_cancelled_sleep` that does not raise when cancelled. That said, if cancellation happens outside the sleep function, the helper will still raise, so I'm not sure how useful it is.
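To illustrate how an injectable `sleep` could be threaded through a retry loop, here is a minimal sketch. It is not the `async_bulk` implementation: `send_with_retries`, `send_batch` and `demo` are hypothetical names, and retrying on `ConnectionError` is purely for illustration (the real helpers decide what to retry on their own terms). The backoff doubles from `initial_backoff` and is capped at `max_backoff`.

```python
import asyncio


async def send_with_retries(send_batch, batch, *, max_retries=3,
                            initial_backoff=2.0, max_backoff=600.0,
                            sleep=asyncio.sleep):
    """Hypothetical retry loop with an injectable sleep function."""
    for attempt in range(max_retries + 1):
        if attempt:
            # Exponential backoff: 2, 4, 8, ... seconds, capped at max_backoff.
            await sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))
        try:
            return await send_batch(batch)
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries: let the caller see the failure


async def demo():
    calls = {"n": 0}
    delays = []

    async def flaky_send(batch):
        # Fails twice, then succeeds and reports how many docs were sent.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("simulated transient failure")
        return len(batch)

    async def recording_sleep(seconds):
        delays.append(seconds)  # record the delay instead of actually waiting

    result = await send_with_retries(flaky_send, ["doc1", "doc2"],
                                     sleep=recording_sleep)
    return result, delays


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the sleep is injected, callers can swap in whatever waiting strategy they need; here it only records the requested delays, which also makes the backoff schedule easy to check without waiting.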
@artem-shelkovnikov Would this actually solve your problem? Or do you need something else?
It should solve our problem, yes :)