threads.js
threads.js copied to clipboard
Question: how to handle long running processes?
I'm just wondering what the recommended way is to tackle the following problem:
I have a long running external process (python) that waits for stdin and outputs on stdout. It doesn't restart every time because the overhead would be far longer than the execution time. I want to launch this process 16 times with child_process.spawn.
Now I'm not sure how I achieve this with Threads, because so far I've only used it with short running processes that exit when done.
I tried to create the process outside the scope in the file where my expose function is, but that ends up with all workers sharing the same child process globally.
Any suggestions?
Hmm, I'm not even sure threads.js is the right tool for the task you describe. threads.js purpose is to make it easy to run JS/TS code on multiple threads. child_process.spawn() is out of its scope and the question is: What benefit should threads.js provide in this scenario, given that the actual work is already delegated from the main JS thread to child processes.
I can only guess that you would like to be able to call your python program transparently using just function calls and maybe re-use the thread pool.
Yeah exactly, I'm using the thread pool for other short tasks and it works great for that. That's why I was hoping to re-use it for these tasks too.
I'll do some experiments and see if I can figure something out.
This works pretty well:
export class BackgroundThreadManager {
private readonly pool: Pool<any>
private readonly workers: cp.ChildProcessWithoutNullStreams[] = []
constructor(workerPath: string, concurrency: number = -1) {
if (concurrency === -1) {
concurrency = cpus().length
}
for (let i = 0; i < concurrency; i++) {
this.workers.push(cp.spawn(workerPath))
}
this.pool = Pool(() => null as any, { // Don't load a worker, we only use the thread pool here with our own workers
concurrency: 1,
size: concurrency
})
log.verbose(`created background thread pool with ${concurrency} workers`)
}
async start(data: any[]): Promise<any[]> {
log.verbose(`started background thread pool execution with ${data.length} messages`)
const results: any[] = []
let amountDone: number = 0
try {
for (const d of data) {
// The executor is not used at all, the executor worker is null
this.pool.queue(async executor => { // eslint-disable-line
Assert.truthy(this.workers.length) // Should always be at least one available since the last worker that finished pushed it back in
const worker = this.workers.splice(0, 1)[0]
const result = await sendDataToWorker(d, worker) // This sends data to the stdin and waits for stdout/stderr
this.workers.push(worker)
})
}
await this.pool.completed()
try {
await this.pool.terminate()
} catch (err) {
if (err.message !== `Cannot read property 'Symbol(thread.terminate)' of null`) { // Because our worker is null
throw err
}
}
} finally {
await this.killWorkers()
}
log.verbose('background thread pool execution complete')
return results
}
async killWorkers() { // pool.terminate() doesn't know about our workers, so kill them manually
this.workers.forEach(w => w.kill())
}
}
Sending in null instead of a worker causes an error on terminate but other than that it works really well.
I'm thinking I could create a new worker class that holds all workers the with a terminate function so that the above error doesn't happen and that I don't need to manually kill the workers.