
how can I use search_after in elasticsearch-dsl 7.1.0

Open linhuisheng opened this issue 4 years ago • 5 comments

How can I use search_after in elasticsearch-dsl 7.1.0? Can you give me an example?

linhuisheng avatar Mar 23 '20 06:03 linhuisheng

s = Index('index_name').search().query().sort('_id').extra(**{"search_after": ["QeliI24By1Zd8KjZthR9"], "size": 10})

or you can use from_dict

s = Search.from_dict({"query": {"match": {"title": "python"}}})
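Since from_dict takes the raw request body, the sort, size, and search_after keys can also go straight into that dict. A small sketch, reusing the search_after value from above (the index name is a placeholder):

from elasticsearch_dsl import Search

# sort/size/search_after end up in the request body just like query does
s = Search.from_dict({
    "query": {"match": {"title": "python"}},
    "sort": ["_id"],
    "size": 10,
    "search_after": ["QeliI24By1Zd8KjZthR9"],
})
response = s.index("index_name").execute()  # index name is a placeholder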

cj564932957 avatar Apr 14 '20 05:04 cj564932957

The function I wrote below will work.

from __future__ import annotations

from typing import Iterator

from elasticsearch_dsl import Search


def search_after(s: Search, *, sort_values: list[str] | None = None, unique_field: str = "_id", size: int = 1000) -> Iterator:    
    def execute(s: Search) -> Iterator:
        es_response = s.execute()
        if hits := es_response["hits"]["hits"]:
            last_document = hits[-1]

            yield from es_response
            yield from execute(s.update_from_dict({"search_after": last_document["sort"]}))

    if sort_values is None:
        assert s.to_dict().get("sort") is not None, "sort_values not found in search, please pass the sort_values parameter or sort your search."  # noqa: E501

    sort_values = [unique_field] if sort_values is None else sort_values + [unique_field]
    sort_values.extend(s.to_dict().get("sort", []))
    s = s.sort(*sort_values).update_from_dict({"size": size})

    yield from execute(s)
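A hypothetical usage sketch for the helper above (the index name and timestamp sort field are placeholders, not part of the original snippet):

s = Search(index="my-index").query("match", title="python")

# Pages through every matching hit in batches of 500, sorted by timestamp with _id as a tiebreaker.
for hit in search_after(s, sort_values=["timestamp"], size=500):
    print(hit.meta.id)

One caveat: execute() recurses once per page, so for very deep result sets an iterative loop (like the ones in the comments below) avoids Python's recursion limit.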

hakancelikdev avatar Oct 22 '21 14:10 hakancelikdev

I didn't do the fancy iterator thing, but I did use Point in Time so the query would be unaffected by refreshes.

from typing import List

from elasticsearch_dsl import Search
from elasticsearch_dsl.utils import AttrDict

# Set up paginated query with search_after and a fixed point_in_time.
# get_default_elastic_conn, NEWS_INDEX and filter_ come from the surrounding application code.
elasticsearch_py = get_default_elastic_conn()
pit = elasticsearch_py.open_point_in_time(index=NEWS_INDEX, keep_alive="5m")
pit_id = pit["id"]

query_size = 500
search_after = [0]
hits: List[AttrDict] = []
while query_size:
    if hits:
        search_after = hits[-1].meta.sort

    search = (
        Search()
        .extra(size=query_size)
        .extra(pit={"id": pit_id, "keep_alive": "5m"})
        .extra(search_after=search_after)
        .filter(filter_)
        .sort("url.keyword")  # Note you need a unique field to sort on or it may never advance
    )
    response = search.execute()
    hits = [hit for hit in response]

    pit_id = response.pit_id
    query_size = len(hits)
    for hit in hits:
        ...  # Do work with hits
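One thing this sketch leaves out is releasing the point in time once the loop finishes. Assuming the same elasticsearch-py client object, something along these lines works:

# Release the PIT so the server can free its resources early.
# Newer elasticsearch-py clients take id=pit_id instead of a body dict.
elasticsearch_py.close_point_in_time(body={"id": pit_id})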

karlkovaciny avatar Mar 29 '22 07:03 karlkovaciny

I would like to open a PR implementing the suggestions from the comments above. I plan to add this method to the Search class.

def page(self):
    """
    Turn the search into a paged search utilizing Point in Time (PIT) and search_after.
    Returns a generator that will iterate over all the documents matching the query.
    """
    search = self._clone()

    # A sort is required to page search results. We use the optimized default if sort is None.
    # https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
    if not search._sort:
        search._sort = ['_shard_doc']

    keep_alive = search._params.pop("keep_alive", "30s")
    es = get_connection(search._using)
    pit = es.open_point_in_time(index=search._index, keep_alive=keep_alive)
    pit_id = pit['id']

    # The index is carried by the Point in Time (PIT), so it is cleared on the search itself.
    search._index = None
    search._extra.update(pit={"id": pit_id, "keep_alive": keep_alive})

    response = es.search(body=search.to_dict(), **search._params)
    while hits := response["hits"]["hits"]:
        for hit in hits:
            yield self._get_result(hit)

        # If we have fewer hits than our batch size, we know there are no more results.
        if len(hits) < search._params.get('size', 0):
            break

        last_document = hits[-1]
        pit_id = response['pit_id']
        search._extra.update(
            pit={"id": pit_id, "keep_alive": keep_alive}, 
            search_after=last_document["sort"]
        )
        response = es.search(body=search.to_dict(), **search._params)

    # Try to close the PIT unless it is already closed.
    try:
        es.close_point_in_time(body={"id": pit_id})
    except NotFoundError:
        pass
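Assuming that method lands on Search, usage would look roughly like this (the index name, filter, and per-batch size are placeholders):

from elasticsearch_dsl import Search

s = (
    Search(index="my-index")
    .filter("term", status="published")
    .params(keep_alive="1m", size=500)  # keep_alive is popped by page(); size bounds each batch
)

for doc in s.page():
    ...  # the generator opens the PIT, pages with search_after, and closes the PIT at the end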

reese-allison avatar Sep 23 '22 20:09 reese-allison

I opened a PR https://github.com/elastic/elasticsearch-dsl-py/pull/1623

reese-allison avatar Sep 24 '22 14:09 reese-allison

Would any of these methods help paginate past the 10,000-result limit?

lucasvc avatar Jan 20 '23 13:01 lucasvc

Would any of these methods help paginate past the 10,000-result limit?

I regularly use the code on PR #1623 to page millions of objects.

reese-allison avatar Jan 20 '23 13:01 reese-allison

I regularly use the code on PR #1623 to page millions of objects.

Then I must be doing something wrong with your code, because for a query that matches ~19k documents (which I can't otherwise read in full), the response with your code is:

{
    "pit_id": <omitted>,
    "took": 168,
    "timed_out": false,
    "_shards": {
        "total": 3292,
        "successful": 2618,
        "skipped": 2618,
        "failed": 674,
        "failures": [
            {
                "shard": 0,
                "index": <omitted>,
                "node": <omitted>,
                "reason": {
                    "type": "illegal_argument_exception",
                    "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [18993]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
                }
            },
....
},
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": 0.0,
        "hits": []
    }
}

All failures look the same, and there are 0 hits. Any query with fewer than 10k results works as expected. Is this caused by my Elasticsearch server?

lucasvc avatar Jan 20 '23 14:01 lucasvc

All failures look the same, and there are 0 hits. Any query with fewer than 10k results works as expected. Is this caused by my Elasticsearch server?

Can you post your code? It looks like your size might be set too high for the shards to return results.

reese-allison avatar Jan 23 '23 14:01 reese-allison

Can you post your code? It looks like your size might be set too high for the shards to return results.

You are completely right. Before doing the paging I had these lines:

    total = search.count()
    search = search[0:total]

where I was trying to fetch everything without paging :) Thanks for your help.
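For anyone else who hits this: the fix is simply to drop that slice and let the paging code fetch fixed-size batches, e.g. (the size here is illustrative):

# Slicing to the full hit count sets from + size past index.max_result_window.
# Keep the default window and let page() drive the batches instead.
for doc in search.params(size=500).page():
    ...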

lucasvc avatar Jan 24 '23 17:01 lucasvc