parallel_bulk does not seem to respect chunk_size + parallel_bulk memory leak?

Open nicolaipre opened this issue 5 years ago • 7 comments

  • Elasticsearch server version: 7.5.2
  • Elasticsearch python version: 7.5.1

I am trying to parse files containing millions of lines, and I am using the helpers.parallel_bulk function for indexing data.

However, it seems that parallel_bulk does not respect the chunk_size parameter, and instead fills up my memory with all the data before it starts insertion.

Code excerpt (full script can be found here):

    # Generate data for Elasticsearch to index
    def generator(self):
        yields = 0
        with open(self.file_name, encoding="utf-8") as fp:
            for line in fp:
                match = self.regex.search( line.strip() )

                if match:
                    if not self.dry_run: # expensive to have dry run check here?
                        entry = match.groupdict()
                        entry['leak'] = self.leak_name # use _type instead? save space... / bad for search performance?

                        doc = dict()
                        doc['_index']  = self.index
                        doc['_type']   = self.doc_type # or should this be self.leak_name maybe ?
                        doc['_id']     = hashlib.md5( json.dumps(entry, sort_keys=True).encode("utf-8") ).hexdigest() # Create md5sum based on the entry, and add it to the document we want to index # expensive operation
                        doc['_source'] = entry
                        #pprint(doc)
                        yields += 1
                        print('Yields: ', yields)
                        yield doc

                    self.good += 1
                else:
                    self.bad += 1
                    self.ignored.write("%s" % line)

                status = "[Good: {}, bad: {}]".format(self.good, self.bad)
                Utils.progressbar( (self.good+self.bad), self.total, suffix=status)



    # Initialize
    def run(self):

        # Very good documentation here: https://bluesock.org/~willkg/blog/dev/elasticsearch_part1_index.html
        # and here: https://elasticsearch-py.readthedocs.io/en/master/helpers.html
        # Create Elasticsearch connection
        es = Elasticsearch(
            host               = self.host,
            port               = self.port,
            timeout            = 10,
            request_timeout    = 10,
            max_retries        = 10,
            retry_on_timeout   = True
        )
        es.cluster.health(wait_for_status='yellow')

        try:
            for success, info in helpers.parallel_bulk(
                client             = es,
                actions            = self.generator(),
                #thread_count       = 2,
                chunk_size         = self.chunk_size#,
                #max_chunk_bytes    = 1 * 1024 * 1024,
                #queue_size         = 2,
                #raise_on_exception = True
            ):
                #print(success)
                print('INSERTED!')
                #es.cluster.health(wait_for_status='yellow') # During insertion, wait for cluster to be in good state?
                if not success:
                    #print('[INDEXER]: A document failed:', info)
                    self.failedfile.write("A document failed: %s" % info) # TODO: replace stuff like this with logger

        except Exception as error:
            raise error

The "yields" variable counts every document yielded by the generator loop. With a chunk_size of 500, it was my understanding that parallel_bulk should start indexing as soon as chunk_size documents have been collected. Instead, it keeps reading without inserting anything until all input has been read (or until the count reaches roughly 400,000). I have confirmed this by printing on each success.

Perhaps I am missing something here, or is this expected behavior?

nicolaipre avatar Feb 05 '20 13:02 nicolaipre

I found the cause of the chunk_size parameter not being respected: I had forgotten to cast the chunk_size input to an integer, so it was passed as a string. Unfortunately, I did not get an error about this.

As for the memory issue: it works fine with streaming_bulk, where memory usage is stable at around 50 MB.

With parallel_bulk, however, memory seems to either leak or fill up.

nicolaipre avatar Feb 05 '20 14:02 nicolaipre
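
A minimal sketch of how the string-typed chunk_size described above could be caught at the command line, before it ever reaches the bulk helper. The `--chunk-size` flag and the `positive_int` and `build_parser` names are hypothetical and not taken from the original script. In Python an `int` never compares equal to a `str`, so if the helper checks its action counter against chunk_size with an equality test (an assumption about its internals), a string value would silently never match, which would fit the lack of an error.

```python
import argparse


def positive_int(value: str) -> int:
    """Parse a command-line value as a strictly positive integer."""
    number = int(value)  # ValueError here is reported by argparse as a usage error
    if number <= 0:
        raise argparse.ArgumentTypeError(
            f"expected a positive integer, got {value!r}"
        )
    return number


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Bulk-index a file")
    # "--chunk-size" is a hypothetical flag name used for illustration only.
    parser.add_argument("--chunk-size", type=positive_int, default=500)
    return parser


# Parse an explicit argument list so the sketch does not depend on sys.argv.
args = build_parser().parse_args(["--chunk-size", "500"])
print(type(args.chunk_size).__name__, args.chunk_size)
```

With `type=positive_int`, a value such as `abc` or `0` makes argparse exit with a usage message instead of being passed on as a string.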

I have had similar memory issues with parallel_bulk, and they were also resolved by using streaming_bulk in my own process pool instead.

#1077

AntonFriberg avatar Feb 14 '20 11:02 AntonFriberg

Same issue here with parallel_bulk: memory usage kept going up in every Python process that ran parallel_bulk. Each process reached about 2 GB (I have 10 child processes) and the system ran out of memory. I had to switch back to streaming_bulk and increase the worker count. After that, each process consumed only 88 MB and stayed stable.

theSeanZhang avatar May 30 '20 11:05 theSeanZhang
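
A rough sketch of the workaround mentioned in the comments above: run streaming_bulk inside your own process pool, with each worker reading a separate byte range of the input file. This is not the code used by any commenter. The helper names (`byte_ranges`, `lines_in_range`, `index_range`, `main`), the `leaks` index name, the `line` field, and the `http://localhost:9200` address are all assumptions made for illustration.

```python
import os
from multiprocessing import Pool


def byte_ranges(path, parts):
    """Split a file into at most `parts` contiguous (path, start, end) tasks."""
    size = os.path.getsize(path)
    step = max(1, -(-size // parts))  # ceiling division
    return [(path, start, min(start + step, size))
            for start in range(0, size, step)]


def lines_in_range(path, start, end):
    """Yield every line whose first byte lies in [start, end)."""
    with open(path, "rb") as fp:
        if start > 0:
            # Move to the first line that begins at or after `start`.
            fp.seek(start - 1)
            fp.readline()
        while fp.tell() < end:
            line = fp.readline()
            if not line:
                break
            yield line.decode("utf-8")


def index_range(task):
    """Worker: stream one byte range into Elasticsearch, return failure count."""
    # Imported here so each worker process builds its own client.
    from elasticsearch import Elasticsearch, helpers

    path, start, end = task
    es = Elasticsearch(hosts=["http://localhost:9200"])  # assumed address
    actions = (
        {"_index": "leaks", "_source": {"line": line.rstrip("\n")}}  # assumed schema
        for line in lines_in_range(path, start, end)
    )
    failed = 0
    for ok, _info in helpers.streaming_bulk(
        es, actions, chunk_size=500, raise_on_error=False
    ):
        failed += not ok
    return failed


def main(path, workers=4):
    """Index `path` with `workers` processes and return the total failure count."""
    with Pool(processes=workers) as pool:
        return sum(pool.imap_unordered(index_range, byte_ranges(path, workers)))
```

Call `main("input.txt")` from under an `if __name__ == "__main__":` guard so that worker processes can be spawned safely. Because each worker consumes its own generator through streaming_bulk, only about one chunk per process should be held in memory at a time.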

It does look like all the documents are loaded into memory before being distributed to workers. It would be better to use memory channels to distribute tasks.

sethmlarson avatar May 30 '20 14:05 sethmlarson
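
One way to picture the "memory channel" idea from the comment above is a bounded queue between the producer and the workers: the producer blocks once a fixed number of chunks is waiting, so the input can never be read far ahead of indexing. The sketch below uses only the standard library and is not how parallel_bulk is implemented. `bounded_parallel`, `chunked`, and the `handle_chunk` callback (a stand-in for whatever sends one chunk to Elasticsearch) are hypothetical names.

```python
import queue
import threading
from itertools import islice

_DONE = object()  # sentinel telling a worker to stop


def chunked(iterable, size):
    """Yield lists of up to `size` items without materializing the input."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def bounded_parallel(actions, handle_chunk, chunk_size=500, workers=4,
                     max_pending=8):
    """Feed chunks to worker threads through a bounded queue."""
    tasks = queue.Queue(maxsize=max_pending)
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            chunk = tasks.get()
            if chunk is _DONE:
                return
            try:
                outcome = handle_chunk(chunk)
            except Exception as exc:  # keep the worker alive, record the error
                outcome = exc
            with results_lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for chunk in chunked(actions, chunk_size):
        tasks.put(chunk)  # blocks while `max_pending` chunks are already queued
    for _ in threads:
        tasks.put(_DONE)
    for thread in threads:
        thread.join()
    return results
```

With this layout, at most `max_pending + workers + 1` chunks exist at any moment (queued, being handled, or being assembled), regardless of how large the input is.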

I have the same problem of memory filling up when using parallel_bulk. I see in the source code that parallel_bulk uses multiprocessing.pool.ThreadPool. When I did some experiments, I found that the problem doesn't occur when using multiprocessing.Pool instead. So it might be a good idea to use multiprocessing.Pool in parallel_bulk, especially since ThreadPool is not documented.

bzijlema avatar Oct 14 '20 08:10 bzijlema

Could you share a sample implementation showing how you solved this issue? Using streaming_bulk instead of parallel_bulk doesn't seem to solve my memory issues. I am trying to index 60 million documents.

mathijsfr avatar Nov 11 '22 11:11 mathijsfr