elasticsearch-py

Helpers for the `bulk` method, such as `async_bulk`, sleep in a blocking manner, preventing graceful shutdown

Status: Open. artem-shelkovnikov opened this issue 1 year ago (5 comments).

We tried using the async_bulk and async_streaming_bulk helpers to ingest data into Elasticsearch, and they work great, but we found that they prevent our code from shutting down gracefully when CTRL+C is pressed.

Example code that sleeps: https://github.com/elastic/elasticsearch-py/blob/e1603f4f25888ca7fee03ee977abafb00d512601/elasticsearch/_async/helpers.py#L249-L251
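
For context, here is a minimal sketch of the kind of loop those lines appear to sit in. As we understand the `async_streaming_bulk` docstring, the helper retries chunks that receive HTTP 429 responses up to `max_retries` times, waiting `initial_backoff * 2**(attempt - 1)` seconds (capped at `max_backoff`) before each retry. `send_with_retries` and `backoff_delays` are hypothetical names, not part of elasticsearch-py, and the real helper is structured differently.

```python
import asyncio
from typing import Awaitable, Callable, List


def backoff_delays(initial_backoff: float, max_backoff: float, max_retries: int) -> List[float]:
    """Delays before retries 1..max_retries: exponential, capped at max_backoff."""
    return [
        min(max_backoff, initial_backoff * 2 ** (attempt - 1))
        for attempt in range(1, max_retries + 1)
    ]


async def send_with_retries(
    send: Callable[[], Awaitable[bool]],  # hypothetical: returns True on success
    initial_backoff: float = 2.0,
    max_backoff: float = 600.0,
    max_retries: int = 3,
) -> bool:
    delays = backoff_delays(initial_backoff, max_backoff, max_retries)
    for attempt in range(max_retries + 1):
        if attempt:
            # The kind of sleep this issue is about: hard-coded to asyncio.sleep,
            # so the caller cannot customize how (or whether) it waits.
            await asyncio.sleep(delays[attempt - 1])
        if await send():
            return True
    return False
```

With the defaults, the waits grow as 2, 4, 8, ... seconds, so a long backoff can only be interrupted by cancelling the whole task.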

It would be great to have a way to either:

  • define how the sleep happens, by passing a sleep function into the client, or
  • have the Elasticsearch client internally cancel all pending sleeps when the client is closed.
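
To illustrate the second option, here is a minimal sketch of a sleep that a client could end early when it is closed. `ShutdownAwareSleeper` is a hypothetical class, not something elasticsearch-py provides; it waits on an `asyncio.Event` with a timeout, so a single `close()` call wakes every pending sleep at once.

```python
import asyncio
from typing import Tuple


class ShutdownAwareSleeper:
    """Hypothetical helper: sleeps that return early once close() is called."""

    def __init__(self) -> None:
        self._closed = asyncio.Event()

    async def sleep(self, seconds: float) -> bool:
        """Wait up to `seconds`; return True if close() ended the wait early."""
        try:
            await asyncio.wait_for(self._closed.wait(), timeout=seconds)
            return True
        except asyncio.TimeoutError:
            return False

    def close(self) -> None:
        # Wakes every coroutine currently waiting in sleep(), and makes
        # later sleep() calls return immediately.
        self._closed.set()


async def demo() -> Tuple[bool, float]:
    # Create the sleeper inside the running loop (also safe on older Pythons).
    sleeper = ShutdownAwareSleeper()
    loop = asyncio.get_running_loop()
    start = loop.time()
    loop.call_later(0.1, sleeper.close)  # simulate closing the client mid-backoff
    woken_early = await sleeper.sleep(60)
    return woken_early, loop.time() - start
```

Returning a flag instead of raising is a deliberate choice in this sketch: a retry loop could check it and stop retrying cleanly rather than relying on task cancellation.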

artem-shelkovnikov, Mar 22 '24 14:03

Here is the code our product runs when using the client: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py (see the comment at the top of the file for how we collect and ingest data).

In short, we have a SyncOrchestrator class that internally creates two components, which exchange data through a MemQueue:

  • Extractor. This class is responsible for providing a generator that fetches documents from a third-party system and puts them into the MemQueue.
  • Sink. This class is responsible for picking up the data from the MemQueue and sending it to Elasticsearch in batches. Right now it just sends a regular bulk request: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py#L149, but ideally we'd love to switch to one of the Python client's helpers.
  • MemQueue provides backpressure by limiting both the number of items in the queue AND their total size; this way we can, to some extent, control the memory usage of the framework.
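
A minimal sketch of the backpressure idea described above, assuming a queue that bounds both the item count and the total size in bytes. `BoundedMemQueue` and its `peak_*` attributes are hypothetical and are not the actual `MemQueue` from the connectors repository; the extractor and sink are toy stand-ins (the sink only collects IDs instead of calling Elasticsearch).

```python
import asyncio
from collections import deque
from typing import Any, Deque, List, Tuple


class BoundedMemQueue:
    """Hypothetical queue bounded by item count AND total byte size."""

    def __init__(self, max_items: int, max_bytes: int) -> None:
        self.max_items = max_items
        self.max_bytes = max_bytes
        self._items: Deque[Tuple[Any, int]] = deque()
        self._bytes = 0
        self._cond = asyncio.Condition()
        self.peak_items = 0  # high-water marks, for illustration only
        self.peak_bytes = 0

    def _has_room(self, size: int) -> bool:
        # An empty queue always admits an item, so a single oversized
        # document cannot deadlock the producer.
        if not self._items:
            return True
        return len(self._items) < self.max_items and self._bytes + size <= self.max_bytes

    async def put(self, item: Any, size: int) -> None:
        async with self._cond:
            # Backpressure: the producer waits here until the consumer frees room.
            await self._cond.wait_for(lambda: self._has_room(size))
            self._items.append((item, size))
            self._bytes += size
            self.peak_items = max(self.peak_items, len(self._items))
            self.peak_bytes = max(self.peak_bytes, self._bytes)
            self._cond.notify_all()

    async def get(self) -> Any:
        async with self._cond:
            await self._cond.wait_for(lambda: bool(self._items))
            item, size = self._items.popleft()
            self._bytes -= size
            self._cond.notify_all()  # wake producers waiting for room
            return item


async def demo(max_items: int, max_bytes: int) -> Tuple[List[int], BoundedMemQueue]:
    queue = BoundedMemQueue(max_items, max_bytes)
    received: List[int] = []

    async def extractor() -> None:
        for doc_id in range(10):
            await queue.put({"_id": doc_id}, size=100)
        await queue.put(None, size=0)  # sentinel: extraction finished

    async def sink() -> None:
        while (doc := await queue.get()) is not None:
            received.append(doc["_id"])

    await asyncio.gather(extractor(), sink())
    return received, queue
```

Whichever limit is hit first makes `put()` wait, which is what keeps memory usage bounded when the sink is slower than the extractor.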

artem-shelkovnikov, Mar 22 '24 14:03

Sorry for the delay, Artem. I would be happy to implement the first option, allowing the sleep function to be user-defined. Silently cancelling all sleeps/bulk requests isn't something we'd want in the general case.

pquentin, May 17 '24 06:05

Hi, @girolamo-giordano and I are working on this. Just to make sure we understand:

you want a function, passed in when the client is instantiated, that handles CTRL+C presses and, more generally, how the client terminates?

LorenzoFasolino, Nov 25 '24 15:11

Hey @LorenzoFasolino and @girolamo-giordano, please tell me if this snippet clarifies things:

```python
import asyncio

# Stand-in for a bulk helper that accepts a pluggable sleep function
async def fake_bulk(sleep=asyncio.sleep):
    print("start bulk, sleep 5")
    await sleep(5)

# Custom sleep that swallows cancellation instead of propagating it
async def silently_cancelled_sleep(seconds):
    try:
        await asyncio.sleep(seconds)
    except asyncio.CancelledError:
        print("silently ignoring cancellation")

async def main():
    print("Calling with silently_cancelled_sleep, Ctrl-C will not raise")
    await fake_bulk(silently_cancelled_sleep)
    print("Calling with defaults, Ctrl-C will raise")
    await fake_bulk()
    print("Done")

asyncio.run(main())
```

The idea is to add this new `sleep` parameter to `async_bulk` so that a custom sleep function can be used. Users could then define a function like `silently_cancelled_sleep` that does not raise when cancelled. That said, if the cancellation happens outside of the sleep function, the helper will still raise, so I'm not sure how useful this is.
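
A minimal sketch of that caveat. `hypothetical_bulk` and its `sleep` parameter are assumptions standing in for the proposed `async_bulk` change, and the second `asyncio.sleep` stands in for the HTTP request; the timings are illustrative only. A cancellation that lands during the injected sleep is swallowed and the work continues, while one that lands during the "request" still propagates.

```python
import asyncio
from typing import List, Tuple


async def swallowing_sleep(seconds: float) -> None:
    """Like silently_cancelled_sleep: swallows cancellation during the sleep."""
    try:
        await asyncio.sleep(seconds)
    except asyncio.CancelledError:
        pass


async def hypothetical_bulk(log: List[str], sleep=asyncio.sleep) -> None:
    # Stand-in for a helper with the proposed (hypothetical) `sleep` parameter.
    log.append("backoff")
    await sleep(0.5)          # only this wait goes through the custom function
    log.append("request")
    await asyncio.sleep(0.5)  # stand-in for network I/O; not covered by `sleep`
    log.append("done")


async def cancel_after(delay: float) -> Tuple[str, List[str]]:
    log: List[str] = []
    task = asyncio.create_task(hypothetical_bulk(log, sleep=swallowing_sleep))
    await asyncio.sleep(delay)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled", log
    return "finished", log
```

Cancelling at 0.1 s (during the backoff) lets the task finish, while cancelling at 0.75 s (during the "request") still raises `CancelledError`.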

@artem-shelkovnikov Would this actually solve your problem? Or do you need something else?

pquentin, Dec 05 '24 13:12

It should solve our problem, yes :)

artem-shelkovnikov, Dec 05 '24 14:12