threads.js
Tasks stuck and pool blocked
Hello, I'm seeing some weird behaviour where tasks are queued but the function itself is not running, and runningTasks just ends up piling up. Keep in mind that I have not changed anything in my code.
console.log('1')
this[pool].queue(async worker => {
  console.log('2')
  let m = worker[method]
  console.log(m)
  if (!m) return "Method doesn't exist";
  console.log('3')
  q = await m(item)
  console.log('4')
  return q;
})
console.log('5')
await this[pool].completed()
console.log('6')
return q;
and from the worker file:
const workerFunctions = {
  async gen(c) {
    console.log('FUNCTION RUNNING')
    ..................
  }
  ...........
}
expose(workerFunctions)
Behaviour: 1 2 [Function] 3 5
The thing is, the exposed function is not being executed at all (FUNCTION RUNNING is never logged) and the pool just ends up with more and more runningTasks piling up.
Sorry, closed wrong issue 🙊
Hey @aya1337. Have you tried to enable debug logging?
https://threads.js.org/usage-advanced#debug-logging
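The log lines quoted later in this thread carry namespaces such as threads:pool:PoolOne and threads:master:spawn. Assuming the logging is switched on with a DEBUG-style pattern (for example DEBUG=threads:* for everything, or DEBUG=threads:pool:* for the pool only), the sketch below shows roughly how such a pattern selects namespaces. The helper matchesDebugPattern is hypothetical and only imitates the wildcard matching; it is not the code the logging package actually uses.

```javascript
// Simplified imitation of DEBUG-style namespace matching.
// Hypothetical helper, NOT the real implementation of the logging package.
function matchesDebugPattern(namespace, debugValue) {
  return debugValue
    .split(",")
    .map(pattern => pattern.trim())
    .filter(Boolean)
    .some(pattern => {
      // Escape regex metacharacters, then turn "*" into ".*".
      const source = pattern
        .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
        .replace(/\*/g, ".*");
      return new RegExp(`^${source}$`).test(namespace);
    });
}

console.log(matchesDebugPattern("threads:pool:PoolOne", "threads:pool:*")); // true
console.log(matchesDebugPattern("threads:master:spawn", "threads:pool:*")); // false
console.log(matchesDebugPattern("threads:master:spawn", "threads:*"));      // true
```

A pool-only pattern hides the threads:master:* lines, which is why the first log excerpt in this thread is shorter than the second.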
I didn't try debug logging; no idea how that went past my head. I had to separate the exposed functions into 3 types of pools and it's now working just fine. I couldn't wait because the service was down for so many people haha
I might try to find the reason when I have time. I'll let you know if it's anything serious. Thank you!
Let’s close this issue for now then. Would be great if you can share any news on this you might come up with 👍
Hello, it has happened again... I used debugging and this is what is happening (keep in mind that the same code as above is used and the same scenario is occurring).
Debugging just the pool:
threads:pool:PoolOne Queueing task #1... +0ms
threads:pool:PoolOne Attempt de-queueing a task in order to run it... +1ms
threads:pool:PoolOne Running task #1 on worker #1... +1ms
and it's stuck after that.
If I forcefully terminate the entire pool, make a new one and then queue a task (debugging everything):
threads:master:thread-utils Terminating worker +0ms
threads:master:spawn Initializing new thread +17m
threads:master:messages Message from worker before finishing initialization: { type: 'init', exposed: { type: 'module', methods: [ 'testfunc' ] } } +17m
threads:pool:PoolOne Queueing task #1... +0ms
threads:pool:PoolOne Attempt de-queueing a task in order to run it... +1ms
threads:pool:PoolOne Running task #1 on worker #1... +1ms
threads:master:messages Sending command to run function to worker: { type: 'run', uid: 3, method: 'testfunc', args: [ [ [Object] ] ] } +19m
This is the pool:
WorkerPool {
eventSubject: MulticastSubject {
_subscriber: [Function],
_observers: Set { [SubscriptionObserver] }
},
initErrors: [],
isClosing: false,
nextTaskID: 2,
taskQueue: [],
debug: [Function: debug],
options: { name: 'PoolOne', size: 1 },
workers: [ { init: [Promise], runningTasks: [Array] } ], //runningTasks.length = 1
eventObservable: Observable { _subscriber: [Function] }
}
Hey @aya1337. Sorry, I just don't have a lot of time right now and I don't have a hot lead yet on why that happens.
Maybe someone else can help 🙋♂️
Hey @aya1337. Last weekend I just released a pool task completion bug fix (see #271). Maybe your issue was related to that one? Maybe you can try again with the latest version :)
Any updates, @aya1337?
@andywer I was actually typing it haha. Thanks for the update. Unfortunately, it did not solve the issue. Tasks keep piling up endlessly, and the workers all have runningTasks that never execute. No exposed function ever even gets called (the first thing it does is console.log the pid).
I tried to find code that reproduces it 100% of the time, but I got some weird results. If I await anything before queueing (even if I'm not passing the result to the queue), the behaviour occurs. If there's no awaiting before queueing, it works as expected (unless I wrap the queueing in a function?). Perhaps there's a certain concept I'm unaware of?
poolOne.terminate(true)
const { spawn, Pool, Worker } = require("threads");
poolOne = Pool(() => spawn(new Worker("./poolOne"), { timeout: 120000 }), { name: "poolOne", size: 1 });
const asyncStuff = [...(await asyncFunction())] //Unused
const suppliedArray = [] //Or even: const suppliedArray = asyncStuff
const res = [];
poolOne.queue(async worker => {
  try {
    console.log('1')
    const result = await worker.exposedFunction(suppliedArray)
    console.log('2')
    return res.push(result);
  } catch(e) {
    return false;
  }
});
await poolOne.completed();
return res;
- Only 1 is logged to the console.
- runningTasks start piling up endlessly.
- 2 is never logged and poolOne never completes.
I tried wrapping the whole queueing and waiting for completion in an async function. Surprisingly, it worked when testing (not too sure how it would behave in production).
poolOne.terminate(true)
const { spawn, Pool, Worker } = require("threads");
poolOne = Pool(() => spawn(new Worker("./poolOne"), { timeout: 120000 }), { name: "poolOne", size: 1 });
const asyncStuff = [...(await asyncFunction())]
const wrapped = async (suppliedArray) => {
  const res = [];
  poolOne.queue(async worker => {
    try {
      console.log('1')
      const result = await worker.exposedFunction(suppliedArray)
      console.log('2')
      return res.push(result);
    } catch(e) {
      return false;
    }
  });
  await poolOne.completed();
  return res;
}
return wrapped(asyncStuff) //or even: return wrapped([])
- 1 and 2 were logged.
- Exposed function was executed and finished
- runningTasks was emptied after
Super weird.
Damn race conditions. But I might just have had a valuable train of thought:
Maybe the message to run the function in the worker is sent before the worker has set up its message handler. Just had a quick look at the code and we don't make sure that the spawn() calls have actually finished before dispatching tasks.
Without a pool that's not an issue as you need the async result of spawn() to do anything with the worker, so you have to wait. The pool constructor, however, needs to be synchronous and thus it initializes the workers in the background.
We need to make sure that's finished before we start dispatching tasks. I will prepare a PR later!
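To make the idea concrete, here is a minimal sketch of a pool whose constructor stays synchronous but which awaits each worker's init promise before running a task on it. TinyPool and fakeSpawn are hypothetical names made up for illustration; this is not the threads.js pool implementation.

```javascript
// Hypothetical miniature pool, NOT the threads.js implementation.
class TinyPool {
  constructor(spawnWorker, size) {
    // The constructor stays synchronous: it only stores the pending promises.
    this.workers = Array.from({ length: size }, () => ({
      init: spawnWorker(),
      runningTasks: []
    }));
  }

  queue(task) {
    // Pick the least busy worker slot.
    const slot = this.workers.reduce((a, b) =>
      a.runningTasks.length <= b.runningTasks.length ? a : b
    );
    const run = (async () => {
      const worker = await slot.init; // wait for initialization before dispatching
      return task(worker);
    })();
    slot.runningTasks.push(run);
    // Drop the task from runningTasks once it settles, either way.
    const remove = () => {
      slot.runningTasks = slot.runningTasks.filter(t => t !== run);
    };
    run.then(remove, remove);
    return run;
  }
}

// Stand-in for spawn(): resolves with a "worker" after a short delay.
const fakeSpawn = () =>
  new Promise(resolve =>
    setTimeout(() => resolve({ double: async n => n * 2 }), 20)
  );

const pool = new TinyPool(fakeSpawn, 1);
const pending = pool.queue(worker => worker.double(21));
pending.then(result => console.log("task result:", result)); // task result: 42
```

In this sketch, a worker whose init promise never settles would leave its task sitting in runningTasks indefinitely, which resembles the symptom reported above.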
On a second thought… We do wait for every worker to finish initialization before dispatching tasks, though:
https://github.com/andywer/threads.js/blob/9fa13af4a99d11a844899f022590139d6fb101ac/src/master/pool.ts#L173
Will need to keep thinking…
Edit: It is unlikely, but maybe it is the way that the workers initialize. We send the init message first and then we actually subscribe to incoming messages. I mean, those two steps happen only microseconds apart from each other in the same function – I would assume message passing between threads takes longer than that, but maybe I am wrong and this is the problem.
Let me prepare a PR later 😉
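The ordering concern can be illustrated without any threads, using a plain Node.js EventEmitter as a stand-in for a message channel that does not buffer. This is only a sketch of the hypothesis: whether a real worker channel drops or queues early messages depends on the worker implementation in use.

```javascript
const { EventEmitter } = require("events");

// Stand-in for a message channel that does not buffer: anything emitted
// while nobody is listening is lost.
const channel = new EventEmitter();
const received = [];

channel.emit("message", { type: "run", uid: 1 }); // sent too early, no listener yet

channel.on("message", message => received.push(message));

channel.emit("message", { type: "run", uid: 2 }); // delivered

console.log(received); // [ { type: 'run', uid: 2 } ]
```

If a run command were lost this way, the sender would keep waiting for a result that never arrives.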
I believe you are right. I just got confused about how the same behaviour happened even when I didn't use the variable that I awaited, const asyncStuff = [...(await asyncFunction())]. At one point, I thought it was unrelated to this package (because why would a variable that's never used when queueing cause such a thing?).
As for spawning, I actually wait about 2 seconds, using ms => new Promise(res => setTimeout(res, ms)), after initializing the pool (I just didn't include it in the code), but you might be correct. I'll wait for the PR and will let you know.
Thanks for giving me a bit of your time to answer. Really appreciate it
Wanted to write the potential fix when I realized that we do set up the critical message handlers in the worker before we send the init message. So that can't be it then.
I have some more questions about the test scenario you posted in the beginning, though:
- Are you using the latest version of threads.js?
- Did you run it using the tiny-worker fallback?
- How many workers did the pool spawn – the default (= number of CPU threads)? That would help with reading the debug log.
@andywer
- Latest version https://github.com/andywer/threads.js/commit/9fa13af4a99d11a844899f022590139d6fb101ac
- Yes, I'm using tiny-worker fallback (sorry for failing to mention that)
- I tried the default (4), tried 8, even tried just 1 (for the debugging above, I used only 1, i.e. size: 1)
@andywer is there a fix for this? I am having the same (or perhaps a similar) issue as @anna-rmrf.
Here is my code (from a mocha test):
it('RUN - Can create multiple concurrent players', async () => {
  console.log(`\x1b[34m Creating player pool \x1b[0m`);
  playersPool = Pool(() => spawn(new Worker(`../scripts/player-setup`)), MAXPLAYERS);
  console.log(`\x1b[34m Player pool created \x1b[0m`);
  //const playerThread = await spawn(new Worker(`../scripts/player-setup`));
  let battleMessage = await new Promise(async (resolve, reject) => {
    const ws = new WSClient(WebSocket, socketUrl, ['GiveUp']);
    ws.on('msg', async (message) => {
      // const message = JSON.parse(e.data);
      console.log(`WS Message:`, message);
      if (message.subject === 'GiveUp') {
        ws.close();
        await playersPool.completed();
        resolve(message);
      }
    });
    console.log(`\x1b[34m Submitting worker method \x1b[0m`);
    playersPool.queue(async worker => {
      await worker.executeRandomTest({
        playerName: 'TommyBoy',
        environment: environment
      });
      console.log(`\x1b[34m executeRandomTest invoked \x1b[0m`);
    });
    console.log(`\x1b[34m Worker method submitted \x1b[0m`);
    /* await playerThread.executeRandomTest({
      playerName: 'TommyBoy',
      environment: environment
    }); */
  });
  console.log(`${battleMessage}`);
  await playersPool.terminate();
  expect(1).to.equal(1);
});
The stuff I have commented out (spawning a single thread and then running executeRandomTest) is what works when a pool is not used. So I know that executeRandomTest works, just not with the thread pool.
Here is the output (DEBUG was on). It got stuck after printing the final message:
...
Player setup completed successfully
✓ RUN - Can be deployed (Mock Only)
threads:master:spawn Initializing new thread +0ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +0ms
threads:master:spawn Initializing new thread +2s
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +2s
✓ RUN - Can set up players for battle (3655ms)
Creating player pool
threads:master:spawn Initializing new thread +2s
threads:master:spawn Initializing new thread +4ms
threads:master:spawn Initializing new thread +4ms
threads:master:spawn Initializing new thread +7ms
threads:master:spawn Initializing new thread +5ms
threads:master:spawn Initializing new thread +12ms
threads:master:spawn Initializing new thread +9ms
threads:master:spawn Initializing new thread +9ms
threads:master:spawn Initializing new thread +4ms
threads:master:spawn Initializing new thread +14ms
Player pool created
Submitting worker method
threads:pool:1 Queueing task #1... +0ms
threads:pool:1 Attempt de-queueing a task in order to run it... +1ms
Worker method submitted
threads:pool:1 Running task #1 on worker #1... +4ms
threads:pool:1 Task #1 failed +10s
threads:pool:1 Error while initializing pool worker: Error: Timeout: Did not receive an init message from worker after 10000ms. Make sure the worker calls expose().
at Timeout._onTimeout (/usr/local/src/backend/node_modules/threads/dist/master/spawn.js:35:53)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) +1ms
threads:pool:1 Task #1 errored: Error: Timeout: Did not receive an init message from worker after 10000ms. Make sure the worker calls expose().
at Timeout._onTimeout (/usr/local/src/backend/node_modules/threads/dist/master/spawn.js:35:53)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) +14ms
threads:pool:1 Attempt de-queueing a task in order to run it... +25ms
threads:pool:1 Task queue is empty +0ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +12s
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +24ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +62ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +81ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +83ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +51ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +54ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +33ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +7ms
threads:master:messages Message from worker before finishing initialization: {
type: 'init',
exposed: {
type: 'module',
methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
}
} +44ms
It appears to be timing out before the init messages from the pool's worker threads arrive: the task fails after 10 seconds, while the init messages only show up about 12 seconds in.
Actually, looking through the error and where it is coming from (spawn.js), I found a nice workaround:
Set THREADS_WORKER_INIT_TIMEOUT to something higher, e.g. 20 seconds, as the default of 10 seconds is what is tripping up the initialization process.
So, this worked:
THREADS_WORKER_INIT_TIMEOUT=20000 npm test -- -g "RUN"
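The error message suggests a timer racing against the worker's init message. Below is a minimal sketch of that pattern, with the limit read from the environment variable and a 10-second fallback. The helpers readInitTimeout and waitForInit are hypothetical and written for illustration; the code is not copied from spawn.js.

```javascript
// Hypothetical helpers illustrating an init timeout; not the library's code.
function readInitTimeout(env = process.env) {
  const parsed = Number.parseInt(env.THREADS_WORKER_INIT_TIMEOUT, 10);
  return Number.isNaN(parsed) ? 10000 : parsed;
}

function waitForInit(initMessage, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timeout: no init message after ${timeoutMs}ms`)),
      timeoutMs
    );
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([initMessage, timeout]).finally(() => clearTimeout(timer));
}

// A slow "worker" that needs 50ms to report in.
const slowInit = () => new Promise(resolve => setTimeout(() => resolve("init"), 50));

// Too short a limit rejects; a more generous one lets the init message through.
waitForInit(slowInit(), 10).catch(error => console.log(error.message));
waitForInit(slowInit(), 200).then(message => console.log("received:", message));
```

Raising the limit does not make the workers start any faster; it only gives slow starts enough room to finish.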
@ashishchandr70 Great that you found a workaround! Yeah, we should document that somewhere…
Say, do you by any chance run tests concurrently? This timeout tends to happen more often when trying to spawn a lot of workers at once.
Hi @andywer I don't run them concurrently. I was just creating the worker pool and then the next command was to use one of the workers to execute an exposed method.