elasticsearch-dsl-py
how can I use search_after in elasticsearch-dsl 7.1.0
How can I use search_after in elasticsearch-dsl 7.1.0? Can you give me an example!
s = Index('index_name').search().query().sort('_id').extra(**{"search_after": ["QeliI24By1Zd8KjZthR9"], "size": 10})
Or you can use from_dict:
s = Search.from_dict({"query": {"match": {"title": "python"}}})
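Putting the two suggestions together, a rough sketch of what was asked (the query, index name, and sort value are placeholders, not verified against 7.1.0 specifically):

s = Search.from_dict({
    "query": {"match": {"title": "python"}},
    "sort": ["_id"],
    "search_after": ["QeliI24By1Zd8KjZthR9"],
    "size": 10,
})
response = s.index("index_name").execute()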
The function I wrote below will work.
from __future__ import annotations

from typing import Iterator

from elasticsearch_dsl import Search


def search_after(s: Search, *, sort_values: list[str] | None = None, unique_field: str = "_id", size: int = 1000) -> Iterator:
    def execute(s: Search) -> Iterator:
        es_response = s.execute()
        if hits := es_response["hits"]["hits"]:
            last_document = hits[-1]
            # Yield this page, then recurse with search_after set to the last document's sort values.
            yield from es_response
            yield from execute(s.update_from_dict({"search_after": last_document["sort"]}))

    if sort_values is None:
        assert s.to_dict().get("sort") is not None, "sort_values not found in search, please pass the sort_values parameter or sort your search."  # noqa: E501
    sort_values = [unique_field] if sort_values is None else sort_values + [unique_field]
    sort_values.extend(s.to_dict().get("sort", []))
    s = s.sort(*sort_values).update_from_dict({"size": size})
    yield from execute(s)
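A quick usage sketch for the helper above (the index name and query are made-up placeholders):

s = Search(index="my-index").query("match", title="python")
for hit in search_after(s, size=500):
    print(hit.meta.id)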
I didn't do the fancy iterator thing, but I did use Point in Time so the query would be unaffected by refreshes.
# Set up paginated query with search_after and a fixed point_in_time
elasticsearch_py = get_default_elastic_conn()
pit = elasticsearch_py.open_point_in_time(index=NEWS_INDEX, keep_alive="5m")
pit_id = pit["id"]
query_size = 500
search_after = [0]
hits: List[AttrDict[str, Any]] = []
while query_size:
    if hits:
        search_after = hits[-1].meta.sort
    search = (
        Search()
        .extra(size=query_size)
        .extra(pit={"id": pit_id, "keep_alive": "5m"})
        .extra(search_after=search_after)
        .filter(filter_)
        .sort("url.keyword")  # Note you need a unique field to sort on or it may never advance
    )
    response = search.execute()
    hits = [hit for hit in response]
    pit_id = response.pit_id
    query_size = len(hits)
    for hit in hits:
        ...  # Do work with hits
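One detail not shown above: once paging is finished, it is worth closing the point in time so the cluster can release it early instead of waiting for the keep_alive to expire. A minimal sketch, reusing the same client and pit_id variables:

# Explicitly release the PIT when done paging
elasticsearch_py.close_point_in_time(body={"id": pit_id})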
I would like to open a PR to implement both of the suggestions in the comments above. I plan to add this to the Search class.
def page(self):
    """
    Turn the search into a paged search utilizing Point in Time (PIT) and search_after.
    Returns a generator that will iterate over all the documents matching the query.
    """
    search = self._clone()
    # A sort is required to page search results. We use the optimized default if sort is None.
    # https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
    if not search._sort:
        search._sort = ['_shard_doc']
    keep_alive = search._params.pop("keep_alive", "30s")
    pit = search._using.open_point_in_time(index=search._index, keep_alive=keep_alive)
    pit_id = pit['id']
    # The index is passed with Point in Time (PIT), so it must not also be set on the request.
    search._index = None
    search._extra.update(pit={"id": pit['id'], "keep_alive": keep_alive})
    es = get_connection(search._using)
    response = es.search(body=search.to_dict(), **search._params)
    while hits := response["hits"]["hits"]:
        for hit in hits:
            yield self._get_result(hit)
        # If we have fewer hits than our batch size, we know there are no more results.
        if len(hits) < search._params.get('size', 0):
            break
        last_document = hits[-1]
        pit_id = response['pit_id']
        search._extra.update(
            pit={"id": pit_id, "keep_alive": keep_alive},
            search_after=last_document["sort"]
        )
        response = es.search(body=search.to_dict(), **search._params)
    # Try to close the PIT unless it is already closed.
    try:
        search._using.close_point_in_time(body={"id": pit_id})
    except NotFoundError:
        pass
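Assuming the method above gets added to Search, usage would look roughly like this (the index, filter, and sizes are placeholder assumptions):

s = (
    Search(index="my-index")
    .filter("term", status="published")
    .params(size=1000, keep_alive="2m")  # read by page() for the batch size and PIT keep_alive
)
for hit in s.page():
    process(hit)  # hypothetical per-document processing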
I opened a PR https://github.com/elastic/elasticsearch-dsl-py/pull/1623
Would any of these methods help to paginate past the 10000 limit?
I regularly use the code on PR #1623 to page millions of objects.
Then I am doing something wrong with your code, because for a query that should return 19k objects (and can't be read), the response with your code is:
{
  "pit_id": <omitted>,
  "took": 168,
  "timed_out": false,
  "_shards": {
    "total": 3292,
    "successful": 2618,
    "skipped": 2618,
    "failed": 674,
    "failures": [
      {
        "shard": 0,
        "index": <omitted>,
        "node": <omitted>,
        "reason": {
          "type": "illegal_argument_exception",
          "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [18993]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
        }
      },
      ....
    ]
  },
  "hits": {
    "total": {
      "value": 0,
      "relation": "eq"
    },
    "max_score": 0.0,
    "hits": []
  }
}
All the failures look the same, and there are 0 hits. Any query with fewer than 10k results works as expected.
Is this caused by my Elastic server?
Can you post your code? It looks like your size might be set too high for the shards to return results.
You are completely right; before doing the paging I had the lines
total = search.count()
search = search[0:total]
where I was trying to fetch everything without paging :) Thanks for your help.
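For anyone hitting the same error: that slice sets from + size to the full result count, which trips the 10000 result-window limit on the first request. The fix is just to drop it and let the paging control the batch size, roughly like this (assuming the page() method from PR #1623; the size is a placeholder):

search = search.params(size=1000)  # per-page batch size, well under the 10000 window
for hit in search.page():
    ...  # work with each hit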