
Bulk helper semaphore handling results in hanging await for long-running requests (exceeding flushInterval)

Open brentmjohnson opened this issue 3 years ago • 6 comments

🐛 Bug Report

The bulk helper hangs forever when the flushInterval is exceeded while the iterator is already awaiting the semaphore.

To Reproduce

Steps to reproduce the behavior:

  1. Run the code below against a test cluster; it should complete successfully.
  2. Simulate a long-running server-side operation that exceeds the configured flushInterval. There are multiple ways to do this, but one is to modify the compiled Helpers.js to delay the bulk callback by one flushInterval:

client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions, async (err, { body }) => {
  // artificial delay simulating a slow server-side bulk operation
  await new Promise(resolve => setTimeout(resolve, flushInterval));
  if (err) return callback(err, null)
  // ...

  3. Re-run the code below and watch the hanging await, caused by onFlushTimeout() being invoked on a payload that is already awaiting semaphore().
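
For context, the hang can be pictured with a simplified single-waiter semaphore: if only one pending resolver is tracked, a second concurrent caller (the flush timer) silently overwrites the first (the iterator), whose await then never settles. A minimal sketch with illustrative names, not the actual Helpers.js source:

function buildSemaphore () {
  let running = 0
  let resolveNext = null // tracks at most ONE pending waiter

  function release () {
    if (resolveNext !== null) {
      const resolve = resolveNext
      resolveNext = null
      resolve(release)
    } else {
      running -= 1
    }
  }

  return function semaphore () {
    if (running < 1) {
      running += 1
      return Promise.resolve(release)
    }
    // A second concurrent caller overwrites the first resolver here,
    // so the first caller's promise never settles.
    return new Promise(resolve => { resolveNext = resolve })
  }
}

(async () => {
  const semaphore = buildSemaphore()
  const release = await semaphore() // an in-flight bulk request holds the slot
  semaphore().then(() => console.log('iterator resumed'))    // never logs
  semaphore().then(() => console.log('flush timer resumed')) // overwrites the waiter
  release() // wakes only the flush timer; the iterator hangs forever
})()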

Paste your code here:

'use strict'
const util = require("util")
const { Readable } = require('stream');
const { Client } = require('@elastic/elasticsearch');

async function* generator() {
    let i = 0
    while (i < 10) {
        await new Promise(resolve => setTimeout(resolve, 1 * 1000));
        yield { i: i++ };
    }
}

const readableStream = Readable.from(generator());

const elasticClient = new Client({
    [TESTCLUSTER]
});

(async () => {
    const bulkHelper = elasticClient.helpers.bulk({
        flushBytes: 43,
        concurrency: 1,
        datasource: readableStream,
        onDocument(doc) {
            console.log(doc)
            return {
                index: { _index: 'semaphoretest' }
            }
        },
        onDrop(doc) {
            console.log(doc);
        }
    }).catch((err) => {
        console.error(err);
    });

    while (util.inspect(bulkHelper).includes('pending')) {
        await new Promise(resolve => setTimeout(resolve, 1 * 1000));
        console.log('...waiting');
    }

    console.log(await bulkHelper);
})();

Expected behavior

The bulk helper should wait gracefully for queued requests to complete, error, or time out.

Paste the results here:

{ i: 0 }
...waiting
{ i: 1 }
...waiting
...waiting
...waiting
...[forever]

Your Environment

  • node version: v14.17.6
  • @elastic/elasticsearch version: >=7.15.0
  • os: Linux

brentmjohnson avatar Oct 05 '21 20:10 brentmjohnson

Yes, I can confirm this. In my case it tends to happen only when I push for larger concurrency and/or flushBytes (which the process's performance should allow for). I'm streaming data from large MySQL tables concurrently, and the bulk helper hangs seemingly at random during the 40+ minute indexing process.

  • node version: v16.13.1
  • @elastic/elasticsearch version: 7.13.0
  • os: macOS/RHEL 8 (confirmed on both)

p1x44r avatar Dec 31 '21 15:12 p1x44r

A workaround that's working for me is to set the flushInterval to Math.pow(2, 31) - 1 and rely only on the flushBytes mechanism. It's not pretty, though.
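
For reference, the workaround looks roughly like this in the bulk-helper options (a sketch; the index name, sizes, and concurrency are illustrative):

// Inside an async function, with `client` an already-constructed Client.
const result = await client.helpers.bulk({
  datasource,
  flushBytes: 5000000, // flush on accumulated size only
  flushInterval: Math.pow(2, 31) - 1, // 2147483647 ms (~24.8 days), setTimeout's maximum delay
  concurrency: 4,
  onDocument (doc) {
    return { index: { _index: 'my-index' } } // 'my-index' is illustrative
  }
})

Anything above 2^31 - 1 overflows Node's setTimeout (which then fires almost immediately), so this is the largest usable value.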

YoannMa avatar Mar 30 '22 13:03 YoannMa

We seem to be experiencing the same issue

rprovodenko avatar Apr 07 '22 14:04 rprovodenko

We understand that this might be important for you, but this issue has been automatically marked as stale because it has not had recent activity from either our end or yours. It will be closed if no further activity occurs; please write a comment if you would like to keep this going.

Note: in the past months we have built a new client, which has just landed in master. If you want to open an issue or a PR for the legacy client, you should do that in https://github.com/elastic/elasticsearch-js-legacy

stale[bot] avatar Apr 27 '22 19:04 stale[bot]

(comment to avoid stale)

YoannMa avatar Apr 27 '22 21:04 YoannMa

Heya, I wrote this to reproduce the issue:

// Assumed imports: 'tap' for the test runner, Node's stream module, and the
// client; buildServer is the repo's test utility (path hypothetical).
const { test } = require('tap')
const { Readable } = require('stream')
const { Client } = require('@elastic/elasticsearch')
const { buildServer } = require('../utils') // hypothetical path to the test helpers
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const dataset = [
  { user: 'jon', age: 23 },
  { user: 'arya', age: 18 },
  { user: 'tyrion', age: 39 }
]

test('issue #1562', async t => {
  async function handler (req, res) {
    console.log(req.url)
    setTimeout(() => {
      res.writeHead(200, { 'content-type': 'application/json' })
      res.end(JSON.stringify({ errors: false, items: [{}] }))
    }, 1000)
  }

  const [{ port }, server] = await buildServer(handler)
  const client = new Client({ node: `http://localhost:${port}` })

  async function * generator () {
    const data = dataset.slice()
    for (const doc of data) {
      await sleep(1000)
      yield doc
    }
  }

  const result = await client.helpers.bulk({
    datasource: Readable.from(generator()),
    flushBytes: 1,
    flushInterval: 1000,
    concurrency: 1,
    onDocument (doc) {
      return {
        index: { _index: 'test' }
      }
    },
    onDrop (doc) {
      t.fail('This should never be called')
    }
  })

  t.type(result.time, 'number')
  t.type(result.bytes, 'number')
  t.match(result, {
    total: 3,
    successful: 3,
    retry: 0,
    failed: 0,
    aborted: false
  })

  server.stop()
})

My observations:

  • if the flushInterval is exactly the same as the server timeout, the third request is never sent
  • if the flushInterval differs from the server timeout (whether higher or lower), the code works as expected.

It happens both in v7 and v8. This is weird, we'll investigate. Thank you for reporting!
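
A plausible reading of the exact-timing sensitivity (an assumption on my part, not a confirmed root cause): Node runs timers with identical delays back-to-back in scheduling order within the same timers phase, so when the flush timer and the mocked server response share the same delay, the flush-timeout path and the request-completion path race for the semaphore slot in the same event-loop turn. The ordering itself is easy to see:

// Two timers with the same delay fire in the same timers phase,
// in the order they were scheduled.
setTimeout(() => console.log('flush timer fires first'), 1000)
setTimeout(() => console.log('response handler fires second'), 1000)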

delvedor avatar May 23 '22 16:05 delvedor