elasticsearch-dsl-py

TransportError 429 in Search.scan() / rate-limit scan iteration

Open sgasse opened this issue 3 years ago • 1 comment

Hi!

I have run into a TransportError 429 when trying to fetch around 110k documents by iterating through them with Search.scan(). My workaround is using a wait-time in the iteration.

Here is the code I am using currently:

from time import sleep
from elasticsearch_dsl import Search
from tqdm import tqdm

search = Search(index="my-index").query(project_query)  # project_query is defined elsewhere
search = search.source(["field1", "field2"])  # source() returns a copy, so reassign

docs = []
for hit in tqdm(search.params(scroll="5m", size=5000).scan(), total=search.count()):
    docs.append(hit.to_dict())
    sleep(3e-3)  # magic number in my environment to avoid TransportError

Without the wait-time, the exact error is:

TransportError(429, '429 Too Many Requests /_search/scroll')

Using the wait-time is a viable workaround for me right now since I am backfilling some data and the job can run for a longer time. In my real code, I actually submit processing jobs to a ThreadPoolExecutor directly in the loop where I retrieve documents (roughly as sketched below), so I am not losing much time.
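
For context, the real loop looks roughly like this, reusing the search object from the snippet above; process_doc and max_workers=8 are just placeholders for my actual setup:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def process_doc(doc):
    ...  # stand-in for the actual per-document processing


with ThreadPoolExecutor(max_workers=8) as executor:
    futures = []
    for hit in search.params(scroll="5m", size=5000).scan():
        futures.append(executor.submit(process_doc, hit.to_dict()))
        sleep(3e-3)  # same workaround wait-time as above
    for future in futures:
        future.result()  # re-raise any exceptions from the worker threads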

Looking at the tqdm progress bar, I can see bursts of progress followed by some waiting, presumably when scan() needs to fetch the next batch of data before it can yield another hit. If rate-limiting with a wait-time is really the right approach here, it would probably be better to place it in the code that fetches the batches rather than in the code that iterates over the results of each batch.

Since I am not that deep into Elasticsearch, I can well imagine that there is a better way though. Do you have a suggestion for how to avoid the error in a cleaner way?

sgasse avatar Oct 15 '21 08:10 sgasse

I took another look and added the sleep() inside the scan() function itself instead. This works well for me and is a lot faster. To avoid having to change the code in site-packages, I copied and adapted the scan() function:

from time import sleep

from elasticsearch_dsl.connections import get_connection


def scan(query, index, scroll="5m", size=5000, raise_on_error=True):
    scroll_kwargs = {"params": {"__elastic_client_meta": (("h", "s"),)}}
    client = get_connection()

    # initial search
    resp = client.search(
        body=query,
        scroll=scroll,
        size=size,
        request_timeout=None,
        index=[index],
    )
    scroll_id = resp.get("_scroll_id")

    try:
        while scroll_id and resp["hits"]["hits"]:
            for hit in resp["hits"]["hits"]:
                yield hit

            # Default to 0 if the value isn't included in the response
            shards_successful = resp["_shards"].get("successful", 0)
            shards_skipped = resp["_shards"].get("skipped", 0)
            shards_total = resp["_shards"].get("total", 0)

            # check if we have any errors
            if (shards_successful + shards_skipped) < shards_total:
                shards_message = "Scroll request has only succeeded on %d (+%d skipped) shards out of %d."
                print(
                    shards_message
                    % (shards_successful, shards_skipped, shards_total)
                )
                if raise_on_error:
                    raise RuntimeError(
                        scroll_id,
                        shards_message
                        % (
                            shards_successful,
                            shards_skipped,
                            shards_total,
                        ),
                    )

            # Rate limit: wait between scroll requests to avoid TransportError 429
            sleep(0.5)

            resp = client.scroll(
                body={"scroll_id": scroll_id, "scroll": scroll}, **scroll_kwargs
            )
            scroll_id = resp.get("_scroll_id")

    finally:
        if scroll_id:
            client.clear_scroll(
                body={"scroll_id": [scroll_id]},
                ignore=(404,),
                params={"__elastic_client_meta": (("h", "s"),)},
            )
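
For completeness, this is roughly how I call the copied function, reusing the Search object from the first snippet to build the raw query body (the hits here are plain dicts, so I read _source directly):

query_body = search.to_dict()  # same query and _source filter as above

docs = []
for hit in scan(query_body, index="my-index", scroll="5m", size=5000):
    docs.append(hit["_source"])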

Is there a better way to solve this? If not, I'd open an issue on the elasticsearch-py project to suggest adding an opt-in parameter that introduces such a sleep (roughly as sketched below). Otherwise I am happy to use another method :)
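
To make the suggestion concrete, here is a minimal sketch of what I mean; scan_with_delay and scroll_delay are made-up names (not existing elasticsearch-py parameters), and I left out the shard error handling from the full function above to keep it short:

from time import sleep

from elasticsearch_dsl.connections import get_connection


def scan_with_delay(query, index, scroll="5m", size=5000, scroll_delay=0.0):
    # Minimal sketch: scan with an opt-in delay between scroll requests.
    # scroll_delay=0.0 keeps the current behaviour; setting it > 0 opts in.
    client = get_connection()
    resp = client.search(body=query, scroll=scroll, size=size, index=[index])
    scroll_id = resp.get("_scroll_id")
    try:
        while scroll_id and resp["hits"]["hits"]:
            for hit in resp["hits"]["hits"]:
                yield hit
            if scroll_delay:
                sleep(scroll_delay)  # opt-in rate limit before the next scroll request
            resp = client.scroll(body={"scroll_id": scroll_id, "scroll": scroll})
            scroll_id = resp.get("_scroll_id")
    finally:
        if scroll_id:
            client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))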

sgasse avatar Oct 15 '21 14:10 sgasse