threads.js icon indicating copy to clipboard operation
threads.js copied to clipboard

Question: how to handle long running processes?

Open Christilut opened this issue 4 years ago • 3 comments

I'm just wondering what the recommended way is to tackle the following problem:

I have a long running external process (python) that waits for stdin and outputs on stdout. It doesn't restart every time because the overhead would be far longer than the execution time. I want to launch this process 16 times with child_process.spawn.

Now I'm not sure how I achieve this with Threads, because so far I've only used it with short running processes that exit when done.

I tried to create the process outside the scope in the file where my expose function is, but that ends up with all workers sharing the same child process globally.

Any suggestions?

Christilut avatar Nov 10 '20 13:11 Christilut

Hmm, I'm not even sure threads.js is the right tool for the task you describe. threads.js purpose is to make it easy to run JS/TS code on multiple threads. child_process.spawn() is out of its scope and the question is: What benefit should threads.js provide in this scenario, given that the actual work is already delegated from the main JS thread to child processes.

I can only guess that you would like to be able to call your python program transparently using just function calls and maybe re-use the thread pool.

andywer avatar Nov 10 '20 17:11 andywer

Yeah exactly, I'm using the thread pool for other short tasks and it works great for that. That's why I was hoping to re-use it for these tasks too.

I'll do some experiments and see if I can figure something out.

Christilut avatar Nov 10 '20 18:11 Christilut

This works pretty well:

export class BackgroundThreadManager {
  private readonly pool: Pool<any>
  private readonly workers: cp.ChildProcessWithoutNullStreams[] = []

  constructor(workerPath: string, concurrency: number = -1) {
    if (concurrency === -1) {
      concurrency = cpus().length
    }

    for (let i = 0; i < concurrency; i++) {
      this.workers.push(cp.spawn(workerPath))
    }

    this.pool = Pool(() => null as any, { // Don't load a worker, we only use the thread pool here with our own workers
      concurrency: 1,
      size: concurrency
    })

    log.verbose(`created background thread pool with ${concurrency} workers`)
  }

  async start(data: any[]): Promise<any[]> {
    log.verbose(`started background thread pool execution with ${data.length} messages`)

    const results: any[] = []
    let amountDone: number = 0

    try {
      for (const d of data) {
       // The executor is not used at all, the executor worker is null
        this.pool.queue(async executor => { // eslint-disable-line
          Assert.truthy(this.workers.length) // Should always be at least one available since the last worker that finished pushed it back in

          const worker = this.workers.splice(0, 1)[0]

          const result = await sendDataToWorker(d, worker) // This sends data to the stdin and waits for stdout/stderr

          this.workers.push(worker)
        })
      }

      await this.pool.completed()

      try {
        await this.pool.terminate()
      } catch (err) {
        if (err.message !== `Cannot read property 'Symbol(thread.terminate)' of null`) { // Because our worker is null
          throw err
        }
      }
    } finally {
      await this.killWorkers()
    }

    log.verbose('background thread pool execution complete')

    return results
  }

  async killWorkers() { // pool.terminate() doesn't know about our workers, so kill them manually
    this.workers.forEach(w => w.kill())
  }
}

Sending in null instead of a worker causes an error on terminate but other than that it works really well.

I'm thinking I could create a new worker class that holds all workers the with a terminate function so that the above error doesn't happen and that I don't need to manually kill the workers.

Christilut avatar Nov 11 '20 09:11 Christilut