elasticsearch-py
Preserve input actions when yielding in bulk helpers
This PR fixes the streaming_bulk and parallel_bulk helpers discarding the original input actions while exhausting the generator. It is somewhat related to #940 and probably resolves that issue as well.
Use case
Let's say you have a lot of items, too many to hold in memory, that you want to index and report on. For example, an XML stream-parsing generator or an unevaluated Django queryset.
To index those items with the current API, you pass that generator as the actions argument, along with an expand_action_callback function that constructs the actual documents.
The problem arises when you want to do something further with each item, such as reporting, after it has been indexed (or has failed). The bulk helpers do not yield the original input item (the XML element or Django model in this example) back to the caller. The generator ends up exhausted, the items are lost in translation, and there is nothing left to report on.
The bulk helpers therefore need to yield each input item along with its ES action result.
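The idea can be sketched in isolation. This is a minimal illustration, not the PR's implementation: attach_inputs is a hypothetical name, the response items are hand-written stand-ins for a bulk response, and the sketch relies on the bulk API returning items in the same order as the submitted actions. The "action" key mirrors the result["index"]["action"] access used in the example.

```python
def attach_inputs(chunk_inputs, response_items):
    """Pair each bulk response item with the input that produced it.

    Hypothetical helper: assumes response_items is in request order.
    """
    for original, item in zip(chunk_inputs, response_items):
        # Each item is a single-key dict such as {"index": {...}}.
        op_type, info = next(iter(item.items()))
        ok = 200 <= info.get("status", 500) < 300
        # Copy the result and add the original input under "action".
        yield ok, {op_type: dict(info, action=original)}


# Hand-written stand-ins for parsed inputs and a bulk response.
inputs = ["<article id='1'/>", "<article id='2'/>"]
items = [
    {"index": {"_id": "1", "status": 201}},
    {"index": {"_id": "2", "status": 400, "error": "mapping error"}},
]
results = list(attach_inputs(inputs, items))
```

Each yielded pair carries the success flag plus the original input, so a downstream generator can report on the item without re-reading the source.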
Example
import collections

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch()  # assumes a cluster reachable with default settings


def parse(filename):
    # open file, iter parse xml and yield elements
    ...


def make_document(element):
    # Build the ES action for a single parsed element
    return {
        "_index": "articles",
        "_type": "article",
        "_id": element["id"],
        "_source": {
            "title": element["title"],
            # ...
        },
    }


def index(elements):
    for ok, result in parallel_bulk(client, elements, expand_action_callback=make_document):
        yield ok, result["index"]["action"]  # Yielding original input element for further actions


def report(elements):
    for ok, element in elements:
        # report on parsed and indexed element
        yield element


elements = parse("data.xml")
elements = index(elements)
elements = report(elements)
# elements = something_more ...
# maxlen=0 drains the pipeline without keeping the items in memory
collections.deque(elements, maxlen=0)
Additionally, this PR adds two new low-level bulk helpers, streaming_chunks and parallel_chunks, which allow the chunking function to be customised.
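To give a feel for what a custom chunking function might look like, here is a rough sketch. The name chunk_pairs, its parameters, and the (original, action) pair format are assumptions made for illustration; they are not the signatures of streaming_chunks or parallel_chunks, which this description does not spell out.

```python
import json


def chunk_pairs(pairs, chunk_size=500, max_chunk_bytes=1024 * 1024):
    """Group (original, action) pairs into chunks bounded by count and size.

    Hypothetical helper: each chunk is a list of (original, serialized_action)
    tuples, so the original input travels with its action.
    """
    chunk, size = [], 0
    for original, action in pairs:
        line = json.dumps(action)
        line_size = len(line.encode("utf-8")) + 1  # +1 for the newline
        # Flush the current chunk before it would exceed either limit.
        if chunk and (len(chunk) >= chunk_size or size + line_size > max_chunk_bytes):
            yield chunk
            chunk, size = [], 0
        chunk.append((original, line))
        size += line_size
    if chunk:
        yield chunk


pairs = [(i, {"n": i}) for i in range(5)]
chunks = list(chunk_pairs(pairs, chunk_size=2))
```

Because the function is a generator, it pulls from pairs only as chunks are requested, which keeps memory use proportional to one chunk.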
Note: Current versions of streaming_bulk and parallel_bulk use map() to expand actions. In Python 2, map() returns a list, so it exhausts the generator at once and loads every item into memory. This PR also fixes that as an implementation bonus.
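The difference can be shown with a small generator-based sketch. The name expand_lazily is hypothetical and not taken from the PR; the point is only that a generator pulls one input at a time on both Python 2 and Python 3, whereas Python 2's map() would consume the whole source first.

```python
import itertools


def expand_lazily(actions, expand_action):
    """Yield (original, expanded) pairs one at a time, without building a list."""
    for action in actions:
        yield action, expand_action(action)


pulled = []  # records which source items have actually been consumed


def source():
    for i in range(1000):
        pulled.append(i)
        yield {"id": i}


pairs = expand_lazily(source(), lambda a: {"_id": a["id"]})
first_two = list(itertools.islice(pairs, 2))
```

After taking two pairs, only two items have been pulled from the source; the remaining 998 are untouched until someone asks for them.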
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?