elasticsearch-py
CLOSE_WAIT connections left unclosed when using library with dataflow
Elasticsearch version (bin/elasticsearch --version): 7.9
elasticsearch-py version (elasticsearch.__versionstr__): 7.9.1
Description of the problem including expected versus actual behavior:
We use this library with Google Dataflow (Apache Beam). After the Dataflow job has been running for a couple of hours, we start to see a lot of connection timeouts. After SSHing into the VM and running the netstat command, it turns out there are multiple connections stuck in CLOSE_WAIT and elasticsearch-py is running out of connections in its pool. Once a VM gets into this state it doesn't matter how long I wait: the connections stay in CLOSE_WAIT and the job stays stuck.
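For reference, this is roughly the check I run on the worker VM to count the stuck sockets (shown here with psutil purely for illustration; plain netstat shows the same thing, and whether psutil is available on the worker image is an assumption on my part):

import psutil

# Count sockets this VM has left in CLOSE_WAIT.
close_wait = [c for c in psutil.net_connections(kind='inet')
              if c.status == psutil.CONN_CLOSE_WAIT]
print('%d connections in CLOSE_WAIT' % len(close_wait))
for c in close_wait:
    print(c.laddr, '->', c.raddr)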
Two things I wish would happen:
- elasticsearch-py closes connections properly, so sockets don't sit in CLOSE_WAIT for long periods of time
- elasticsearch-py somehow detects that a connection is stuck and requests a new one (not ideal, but better than the job getting stuck; a crude sketch of what I mean is right below this list)
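To illustrate that second point, the only client-side workaround I can come up with is something crude like the following, wrapping the search call in the DoFn shown further down and throwing the client away on a timeout. Whether re-creating the Elasticsearch client actually gets rid of the stuck sockets is exactly what I don't know, so treat this as a sketch:

from elasticsearch.exceptions import ConnectionTimeout

def search_or_recreate(self, **kwargs):
    # Crude workaround sketch (not what I'd like the library to do for me):
    # if a request times out, discard the client and build a fresh one
    # instead of reusing a connection that may be stuck in CLOSE_WAIT.
    # init_elasticsearch is the same helper shown in the code below.
    try:
        return self.elasticsearch.search(**kwargs)
    except ConnectionTimeout:
        self.elasticsearch = self.init_elasticsearch()
        return self.elasticsearch.search(**kwargs)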
Steps to reproduce: For us this only becomes a problem in production, so it might be related to the volume of data you process with Dataflow. To be honest, I'm not sure whether it's a bug in elasticsearch-py or in Dataflow.
Here is a simplified version of the Dataflow code we're running:
import argparse
import logging
import time

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.shared import Shared

from elasticsearch import Elasticsearch
class PredictionFn(beam.DoFn):
    def __init__(self, project_id, es_handle):
        self.es_handle = es_handle

    def setup(self):
        logging.getLogger('elasticsearch').setLevel(logging.WARN)
        # One Elasticsearch client is shared across DoFn instances on this worker.
        self.elasticsearch = self.es_handle.acquire(self.init_elasticsearch)

    def init_elasticsearch(self):
        es_host = fetch_host()
        http_auth = fetch_auth()
        return Elasticsearch([es_host], http_auth=http_auth,
                             timeout=300, sniff_on_connection_fail=True,
                             retry_on_timeout=True, max_retries=2,
                             maxsize=5)  # 5 connections per client

    def process(self, element):
        (client_id, device_id, user_id) = element
        start = time.time()

        # Fetching data from elasticsearch
        res = self.elasticsearch.search(index="sessions", body=get_query1())
        if len(res["hits"]["hits"]) == 0:
            logging.info('no data found for user %s, skipping', user_id)
            return []

        # some more processing here...

        # count number of userids per device
        res = self.elasticsearch.search(index="sessions", body=build_query2(res))

        # some more work here
class AddTimestampFn(beam.DoFn):
    def process(self, element):
        yield beam.window.TimestampedValue(element, int(time.time()))
def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--subscription',
        required=True,
        help=(
            'Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    # We use the save_main_session option because DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(WorkerOptions).use_public_ips = False
    project_id = pipeline_options.view_as(GoogleCloudOptions).project

    with beam.Pipeline(options=pipeline_options) as p:
        users = (p
                 | "ReadEvents" >> beam.io.ReadFromPubSub(subscription=known_args.subscription, with_attributes=False)
                 | "ExtractUsers" >> beam.ParDo(ExtractUserId())
                 | beam.ParDo(AddTimestampFn())
                 | beam.WindowInto(beam.window.FixedWindows(5, 0))
                 | beam.Distinct())

        predictions = (users
                       | 'DoSomeWork' >> beam.ParDo(PredictionFn(project_id, Shared())))

        _ = (predictions
             | beam.ParDo(PrintFn('WorkResult')))


if __name__ == '__main__':
    run()
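One mitigation I'm planning to try for the first wish, though I'm not sure it's the intended API, is to close the client's pooled connections explicitly in the DoFn's teardown(). I'm assuming transport.close() is the right call for that, and it's awkward anyway because the client lives in a Shared() handle, so other DoFn instances on the same worker may still be using it:

    # Added to PredictionFn from the listing above (sketch only, not tested):
    def teardown(self):
        # Assumes Transport.close() closes the underlying connection pool,
        # hopefully releasing the CLOSE_WAIT sockets when the worker
        # tears this DoFn down.
        if getattr(self, 'elasticsearch', None) is not None:
            self.elasticsearch.transport.close()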