elastic-transport-python
Compatibility with Python multiprocessing
We have a custom Python script that is used to perform some calculations on elements of an index. During the course of these calculations, it is necessary to fetch a list of timestamps matching a search criterion from a backing index. This is done using the helpers.scan() paradigm.
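For each criterion, the fetch looks roughly like the sketch below (the connection string, index name and query are simplified placeholders, not our real values):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")  # placeholder connection string
# Scroll through every matching document, keeping only the timestamp field
query = {
    "query": {"range": {"@timestamp": {"gte": "now-7d"}}},
    "_source": ["@timestamp"],
}
timestamps = [hit["_source"]["@timestamp"]
              for hit in helpers.scan(es, query=query, index="conn-*")]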
Since a search can take a long time (we are searching among millions of documents), our idea was to create a multiprocessing.Pool and then use map so that the searches can be performed in parallel (we use 3 workers).
The scheme worked up to Elasticsearch 7.17. After upgrading to Elasticsearch 8.1.0, we updated the script dependency (elasticsearch-py to 8.1.0) and noticed that random searches began failing with an "unable to deserialize" error.
Unable to deserialize as JSON: b'{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHpsWQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxY1dUhYV1FXeVN1LVZEWVM3TUN3ZmRnAAAAAAAAIWgWZ0lhNDIycXRSeW1DMnlDZ1VYMEJSZxZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKToWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4245,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":1609,"relation":"eq"},"max_score":null,"hits":[LONG HIT]}}HTTP/1.1 200 OK\r\nX-elastic-product: Elasticsearch\r\ncontent-type: application/vnd.elasticsearch+json;compatible-with=8\r\ncontent-length: 140256\r\n\r\n{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHp0WQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxZSWjBnV0haWFJ5U2Q5Tm5wd3BBbzhBAAAAAAAAFHsWN1Z5clJ3WTdSc2FOc0c5S3VfNG1IQRZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKTsWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4434,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":3243,"relation":"eq"},"max_score":null,"hits":[{"_index":"conn-2022.03.12","_id":"MXv3en8BEUCwvNkwmp_B","_score":null,"_source":{"@timestamp":"2022-03-11T21:51:54.599Z"},"sort":[562]},{"_index":"conn-2022.03.12","_id":"K2r8en8Bn-M7Q9BPMNt3","_score":null,"_source":{"@timestamp":"2022-03-11T21:56:54.730Z"},"sort":[732]},{"_index":"conn-2022.03.12","_id":"f3Ipe38Bn-M7Q
Notice that the response seems to have the body of another response tacked onto it (which is probably what is causing the error).
There is no error if we set the number of workers to 1, which makes me suspect that the transport is not playing well with multiple workers spawned using multiprocessing.
We initialize the Elasticsearch instance once globally and then each "worker" uses that instance to perform the search. Any ideas how we can make this scheme play well with the transport library?
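To make the failing setup concrete, here is a stripped-down sketch of the pattern (names and query are placeholders; the real script is larger):

import multiprocessing
from elasticsearch import Elasticsearch, helpers

# Created once in the parent process; forked workers inherit it
ES_CLIENT = Elasticsearch("http://localhost:9200")  # placeholder

def fetch_timestamps(criteria):
    # Every worker talks through the same inherited client and sockets
    query = {"query": {"term": criteria}, "_source": ["@timestamp"]}
    return [hit["_source"]["@timestamp"]
            for hit in helpers.scan(ES_CLIENT, query=query, index="conn-*")]

with multiprocessing.Pool(3) as pool:
    results = pool.map(fetch_timestamps, criteria_list)  # criteria_list: placeholder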
UPDATE
We modified the script so that each worker creates its own Elasticsearch() instance at spawn time (in addition to the one created by the script). The workers only ever use their own instance, and the script now works correctly.
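For anyone else hitting this, the change amounts to creating the client inside a Pool initializer so each worker builds its own instance after the fork (continuing the placeholder sketch from above):

import multiprocessing
from elasticsearch import Elasticsearch, helpers

WORKER_CLIENT = None  # set separately inside each worker process

def init_worker(connection_string):
    # Runs once per worker, after the fork, so every process
    # gets its own connection pool and sockets
    global WORKER_CLIENT
    WORKER_CLIENT = Elasticsearch(connection_string)

def fetch_timestamps(criteria):
    query = {"query": {"term": criteria}, "_source": ["@timestamp"]}
    return [hit["_source"]["@timestamp"]
            for hit in helpers.scan(WORKER_CLIENT, query=query, index="conn-*")]

with multiprocessing.Pool(3, initializer=init_worker,
                          initargs=("http://localhost:9200",)) as pool:
    results = pool.map(fetch_timestamps, criteria_list)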
This library is thread-safe but isn't safe to fork/access from multiple different processes, so your approach of creating a separate instance per forked process is correct.
What configuration were you using in 7.17 that was working but now isn't in 8.0+?
@sethmlarson The earlier approach was to have a single global Elasticsearch() instance and then fork workers, all of which accessed that same global instance. This worked well up to 7.17 but broke after the upgrade to 8.1. To fix this, we changed to one instance per worker (in addition to the global instance).
Would it be possible to add information regarding thread/process safety to the manual?
@redbaron4 I understand. Could you copy and paste the code you were using in 7.17 so I can see how the client was configured and try to reproduce the problem?
Sorry, I misunderstood your earlier comment.
Here's how I configure the client:
ES.Elasticsearch(
    connection_string, sniff_on_connection_fail=True,
    sniff_on_start=True, sniffer_timeout=5 * timeout,
    timeout=timeout, sniff_timeout=timeout // 2,
    max_retries=retries, retry_on_timeout=True)
where timeout, retries and connection_string are user-supplied options. The configuration line is the same for both versions of elasticsearch.
The usage pattern is that this is part of a function init_es which is called once at program startup. The resulting instance is stored in a global variable and is accessible via another function get_es:
_ESINSTANCE = None


def init_es(connection_string, timeout=60, retries=5):
    timeout = util.cast_int(timeout, lower=10, upper=3600, default=60)
    retries = util.cast_int(retries, lower=1, upper=10, default=5)
    global _ESINSTANCE
    if _ESINSTANCE is None:
        _ESINSTANCE = ES.Elasticsearch(
            connection_string, sniff_on_connection_fail=True,
            sniff_on_start=True, sniffer_timeout=5 * timeout,
            timeout=timeout, sniff_timeout=timeout // 2,
            max_retries=retries, retry_on_timeout=True)


def get_es():
    global _ESINSTANCE
    if _ESINSTANCE is None:
        raise ValueError("Global ES not initialized. Call init_es first")
    return _ESINSTANCE
Any function that needs to use the client calls get_es() and then uses the obtained client. Specifically, each worker may call the following function repeatedly (ESD is elasticsearch_dsl):
def lookup_details(hit_dict, index_name, summ_cols, search_cols,
                   timestamp_col="@timestamp", fetch_cols=None):
    """
    Function that is used to look up event details (from the original index)

    :param hit_dict: A dictionary consisting of one "hit"
    :param index_name: The name of the index to search for details
    :param summ_cols: A list of columns that will be picked up from
                      "hit" to be searched in the index
    :param search_cols: A list of columns that will be searched. The length
                        should be the same as `summ_cols`
    :param timestamp_col: The name of the column in the index which has
                          timestamp info. Defaults to "@timestamp". Is used
                          to create a filter based on the first_seen and
                          last_seen entries in "hit"
    :param fetch_cols: A list of attributes to fetch from the index for
                       matches. Defaults to None, which means all attributes
                       are fetched.

    Returns a list of "hits" from the searched index
    """
    if len(summ_cols) != len(search_cols):
        raise ValueError("summ_cols should be same length as search_cols")
    es = get_es()
    fseen = hit_dict["first_seen"]
    lseen = hit_dict["last_seen"]
    # Collect values of columns to be queried
    qry_cols = [hit_dict[x] for x in summ_cols]
    src_srch = ESD.Search(using=es, index=index_name)
    if fetch_cols:
        src_srch = src_srch.source(includes=fetch_cols)
    src_fltr = src_srch.filter(
        "range", **{timestamp_col: {"lte": lseen, "gte": fseen}})
    # Build the rest of the search filter: one "terms" filter per
    # (search_col, value) pair, chained onto the range filter
    entry_dict = {}
    for col, val in zip(search_cols, qry_cols):
        entry_dict[col] = [val]
    entry_fltr = None
    for d in entry_dict:
        if entry_fltr is None:
            exist = src_fltr.filter
        else:
            exist = entry_fltr.filter
        entry_fltr = exist("terms", **{d: entry_dict[d]})
    return [x.to_dict() for x in entry_fltr.scan()]
This is the function that receives the garbled response when scan() is called while long-running searches (5-10 s) are being run on 2 or more workers.
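With the fix from the update above, the only change needed around this function is to (re)initialize the global client inside each worker at spawn time. Note that a forked child inherits the parent's _ESINSTANCE, so it must be cleared first, or init_es() becomes a no-op and the inherited shared client keeps being used (a sketch, not our exact code):

import multiprocessing

def worker_init(connection_string, timeout, retries):
    # The forked worker inherits the parent's client, so drop it and
    # build a fresh, process-local one before any search runs
    global _ESINSTANCE
    _ESINSTANCE = None
    init_es(connection_string, timeout=timeout, retries=retries)

pool = multiprocessing.Pool(3, initializer=worker_init,
                            initargs=(connection_string, timeout, retries))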