
Compatibility with Python multiprocessing

Open redbaron4 opened this issue 2 years ago • 4 comments

We have a custom Python script that performs some calculations on the elements of an index. During the course of the calculations, it is necessary to fetch a list of timestamps matching a search criterion from a backing index. This is done using the helpers.scan() paradigm.

Since a search can take a long time (we are searching among millions of documents), our idea was to create a multiprocessing.Pool and then use map() so that the searches run in parallel (we use 3 workers).

The scheme worked up to Elasticsearch 7.17. After upgrading to Elasticsearch 8.1.0, we updated the script's dependency (elasticsearch-py to 8.1.0) and noticed that random searches began failing with an "Unable to deserialize" error:

Unable to deserialize as JSON: b'{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHpsWQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxY1dUhYV1FXeVN1LVZEWVM3TUN3ZmRnAAAAAAAAIWgWZ0lhNDIycXRSeW1DMnlDZ1VYMEJSZxZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKToWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4245,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":1609,"relation":"eq"},"max_score":null,"hits":[LONG HIT]}}HTTP/1.1 200 OK\r\nX-elastic-product: Elasticsearch\r\ncontent-type: application/vnd.elasticsearch+json;compatible-with=8\r\ncontent-length: 140256\r\n\r\n{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHp0WQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxZSWjBnV0haWFJ5U2Q5Tm5wd3BBbzhBAAAAAAAAFHsWN1Z5clJ3WTdSc2FOc0c5S3VfNG1IQRZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKTsWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4434,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":3243,"relation":"eq"},"max_score":null,"hits":[{"_index":"conn-2022.03.12","_id":"MXv3en8BEUCwvNkwmp_B","_score":null,"_source":{"@timestamp":"2022-03-11T21:51:54.599Z"},"sort":[562]},{"_index":"conn-2022.03.12","_id":"K2r8en8Bn-M7Q9BPMNt3","_score":null,"_source":{"@timestamp":"2022-03-11T21:56:54.730Z"},"sort":[732]},{"_index":"conn-2022.03.12","_id":"f3Ipe38Bn-M7Q

Notice that the response appears to have the body of another response tacked onto it (which is probably what is causing the error).

There is no error if we set the number of workers to 1, which makes me suspect that the transport is not playing well with multiple workers spawned using multiprocessing.

We initialize the Elasticsearch instance once globally, and then each "worker" uses that instance to perform the search. Any ideas how we can make this scheme play well with the transport library?

UPDATE

We modified the script so that each worker creates its own Elasticsearch() instance at spawn time (in addition to the one created by the script). The workers only ever use their own instance, and now the script is working correctly.
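A minimal sketch of this fix, using a multiprocessing.Pool initializer so that each worker process builds its own client instead of inheriting the parent's connection. The client class below is a stand-in for Elasticsearch() so the pattern can run without a cluster; the host URL, index, and function names are illustrative, not taken from the original script:

```python
import os
from multiprocessing import Pool


class StandInClient:
    """Placeholder for elasticsearch.Elasticsearch (hypothetical).

    In the real script this would be the actual client, and the worker
    would page through results with helpers.scan(client, query=...).
    """

    def search(self, query):
        # Return the worker's PID so we can see which process served the call.
        return {"query": query, "pid": os.getpid()}


_worker_client = None  # one client per worker process


def init_worker():
    """Runs once in each worker process when the pool spawns it."""
    global _worker_client
    # Real version (assumed): _worker_client = Elasticsearch("http://localhost:9200")
    _worker_client = StandInClient()


def run_search(query):
    # Each worker uses only the client created in its own process,
    # so no connection is ever shared across processes.
    return _worker_client.search(query)


def run_demo(queries, workers=3):
    with Pool(processes=workers, initializer=init_worker) as pool:
        return pool.map(run_search, queries)


if __name__ == "__main__":
    for result in run_demo(["q1", "q2", "q3"]):
        print(result)
```

The key point is the `initializer=` argument: it runs after fork/spawn, so the client (and its underlying sockets) is created inside each worker, avoiding the interleaved responses seen when all workers read from the parent's shared connection.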

redbaron4 avatar Mar 14 '22 09:03 redbaron4