workerpool
Feat: Add abort signal support for worker execution
Adds support for abort signals, via an AbortController instance per task execution context. The functionality is enabled through the useAbortSignal flag on the execOptions object provided to the exec method.
Example:

// file: myWorker.js
var workerpool = require('../../');

function abort(signal) {
  return new Promise(function (_resolve, reject) {
    signal.addEventListener("abort", () => {
      workerpool.workerEmit({
        state: signal.aborted ? 'term' : 'active',
        message: signal.reason
      });
    });
    reject("rejecting");
  });
}

workerpool.worker({
  abort: abort
});

// file: main.js
var pool = createPool(__dirname + '/workers/abort.js');

pool.exec('abort', [], {
    on: (payload) => {
      console.log("received event: ", payload);
    },
    useAbortSignal: true
  })
  .catch(function (err) {
    console.log("received error: ", err);
  });
Next Steps
To further the value of abort signaling, functionality should be added to the Promise.timeout wrapper created at the time of task execution, bridging it to the worker instance executing the task so that abort can be explicitly called on the signal created when the task began execution. This would allow cleanup, from the main thread context, of a task that times out.
If the functionality within this PR is merged, the above outline can be turned into an issue and mentioned here for tracking.
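The bridging described in the next steps could be sketched as follows. This is a hedged, standalone illustration (the helper names execWithTimeout and slowTask are hypothetical, not workerpool's Promise.js API): a timeout wrapper aborts the controller tied to the running task before the rejection propagates, so the task can clean up.

```javascript
// Sketch: a timeout wrapper that aborts the task's controller on expiry.
// Requires Node >= 17.2 for abort(reason).
function execWithTimeout(task, ms) {
  const controller = new AbortController();
  const timer = setTimeout(
    () => controller.abort(new Error('timeout')), ms);
  // clear the timer whether the task settles normally or via abort
  return task(controller.signal).finally(() => clearTimeout(timer));
}

// a task that cooperates with the signal
function slowTask(signal) {
  return new Promise((resolve, reject) => {
    const work = setTimeout(() => resolve('done'), 1000);
    signal.addEventListener('abort', () => {
      clearTimeout(work);      // cleanup triggered from the timeout side
      reject(signal.reason);
    }, { once: true });
  });
}

execWithTimeout(slowTask, 10).catch((err) => console.log(err.message)); // logs 'timeout'
```

The wrapper owns the controller, so the worker function only ever sees a signal, mirroring the division of responsibility discussed in this thread.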
Thanks for your PR Josh, it looks well taken care of 👌 .
Can you explain what the use cases are for this new abort signal API? I suppose it is, for example, to cancel running fetch actions, or close a DB connection? I'm trying to understand its capabilities and limitations, like: I suppose it can be used in both browser and node.js? And I suppose it will not work when the task is running a synchronous operation, only async?
Before this feature is ready to use it indeed has to work with both the .cancel() and .timeout() methods of the Promise implementation of workerpool.
A real world example would help me understand how this is to be used, can you give an example?
On a side note: I expect to reply only after the weekend, I'm away for a couple of days.
A real world example would help me understand how this is to be used, can you give an example?
Sure! The main use case, assuming the current implementation, is to allow for cleanup of async operations that have failure cases and need some cleanup, in a concise way that may also need top-level async/await.
const workerpool = require('workerpool');

async function queryRunner(signal, queryString) {
  const db = new DbController({}); // some database client
  await db.connect();
  const query = db.query(queryString);
  signal.addEventListener('abort', async () => {
    query.cancel(); // cancel the query operation
  });
  // Some call to start the query and block until resolution.
  // If this promise rejects, the above abort listener will handle the cleanup.
  return query.execute();
}

workerpool.worker({
  queryRunner: queryRunner
});
There is also some benefit with the current implementation when considering setTimeout operations that are still pending when a promise rejects.

const workerpool = require('workerpool');

// named delayWithAbort to avoid shadowing the global setTimeout,
// but still registered under the name 'setTimeout'
async function delayWithAbort(signal, ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });
    // some implementation which might cause a promise reject.
  });
}

workerpool.worker({
  setTimeout: delayWithAbort
});
Before this feature is ready to use it indeed has to work with both the .cancel() and .timeout() methods of the Promise implementation of workerpool.
The majority of the value comes from integration with the .cancel() and .timeout() methods. However, there is some value in the current implementation in that cleanup can now occur through the EventTarget behind the signal, allowing for async cleanup within the worker on error cases without having to explicitly catch / handle every possible error case.
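The "one cleanup path instead of many catch blocks" point can be shown with a minimal, workerpool-free sketch (the runTask function and its resources object are illustrative): a single listener on the signal's EventTarget handles cleanup no matter why the task was aborted.

```javascript
// One abort listener covers every failure path that ends in abort(),
// instead of per-error-case catch/cleanup logic.
function runTask(signal) {
  const resources = { dbOpen: true, timerPending: true };
  signal.addEventListener('abort', () => {
    resources.dbOpen = false;       // e.g. close a db connection
    resources.timerPending = false; // e.g. clearTimeout a pending timer
  }, { once: true });
  return resources;
}

const controller = new AbortController();
const resources = runTask(controller.signal);
// a pool-side .cancel() or .timeout() would map to this:
controller.abort(new Error('task cancelled'));
console.log(resources.dbOpen, resources.timerPending); // false false
```

Because abort events dispatch synchronously, the cleanup has already run by the time any rejection is observed by the caller.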
Thanks for your explanation, I'll have a look at it asap.
This usecase definitely makes sense, thanks for the explanation.
I made a few inline comments, but most importantly I would like to think through the API: how to pass the signal to the function? It feels a bit weird to prepend the arguments with an extra argument signal. How about attaching the signal to the method, so you can use it inside the function like this.signal.addEventListener(...)? Or introduce a global worker.onAbort, or so?

I would like to see if we can get rid of the need for useAbortSignal. Suppose that we go for using this.signal or a global worker.onAbort, then there is no need for the useAbortSignal. The workerpool can automatically create an AbortController and attach it to every registered method. Only thing is that we need to make sure the added listeners are removed again at the end of the method call. If that's not possible we can probably create a new AbortController right before every call, and ideally create it lazily. We should look into any performance impact then though.

We could also think through whether we want to use AbortController or instead create a custom callback like this.onAbort or a global worker.onAbort. In that case we can optimize it ourselves, like: create it only once, keep a list with callbacks in an array, and simply clear this list right before every method invocation.

What do you think?
I think this makes sense. I was originally going to implement this by injecting the signal on the function which is being invoked in the worker, which would allow for usage like the below example:
const workerpool = require('workerpool');

// named delayWithAbort to avoid shadowing the global setTimeout,
// but still registered under the name 'setTimeout'
async function delayWithAbort(ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    this.signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });
    // some implementation which might cause a promise reject.
  });
}

workerpool.worker({
  setTimeout: delayWithAbort
});
The reason I ended up adding it as an option was to make it more obvious that the signals are now in scope. However, I now agree that yours is the better pattern, as modifying the function signature from what the end user has defined is not ideal, and it would be better to simply inject the signal into the function object itself. This would then allow the flag to be removed, as the feature will no longer break backwards compatibility. I am choosing to inject the signal rather than use a global signal because unique signals per worker function allow for more granular error contexts than a single global, which could end up getting rather bloated. A global would also be harder to keep track of, since the current system keeps a global map of function name -> function but does not seem to know which functions come from which file context, and is thus currently ambiguous.
I will refactor to this pattern over passing as a function parameter.
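The injection pattern being described could look roughly like the sketch below. This is a hypothetical illustration (invokeWithSignal is not workerpool API): the controller's signal is assigned onto the function object, and the function is invoked with itself as `this`, so the body reaches the signal via this.signal without a changed parameter list.

```javascript
// Sketch of "inject the signal onto the function object" (hypothetical names).
function invokeWithSignal(fn, args) {
  const controller = new AbortController();
  fn.signal = controller.signal;     // visible inside fn as this.signal
  const result = fn.apply(fn, args); // `this` is the function object itself
  return { result, controller };
}

function task(ms) {
  // the user-defined signature is untouched; the signal rides on `this`
  return new Promise((resolve) => {
    const timer = setTimeout(resolve, ms);
    this.signal.addEventListener('abort', () => {
      clearTimeout(timer);
      resolve('aborted');
    }, { once: true });
  });
}

const { result, controller } = invokeWithSignal(task, [1000]);
controller.abort();
result.then((v) => console.log(v)); // logs 'aborted'
```

Note this relies on the function being invoked with itself as the `this` context, which is exactly the limitation that comes up later in the thread for offloaded functions.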
@josdejong
I've updated the PR to no longer pass the signal as a parameter to the worker function and instead assign it to the worker function, which allows for access within the worker execution scope. I've also added a new event onAbort which will be invoked if a task has the flag aborted set to true.
I realized I was not using the on event properly, as the task context will be cleaned up once the task queue has seen that the task has resulted in an error. The PR description and title have been updated to reflect the change in direction. Let me know what you think; with the changes I've made I am rather close to having cancel and timeout triggering the task abort.
The downside of the current implementation is that if functions are not defined in a script context which is loaded into the worker at time of creation, we are not currently able to inject a signal into the worker scope, since they execute with global as the this context. It currently seems rather awkward to force the signal into the global of the worker, as cleanup would have to occur there, since signals are meant to be used for only a single abort trigger.
see commit: 14656f3ad734aba893b0d352972a3438b478afaf
I think we can weigh whether having signaling support when workers are executed through the run context is worth possibly going back to the original implementation, which passed the signal as a parameter, since a parameter would work with the global execution context.
Thanks a lot, I like where this is heading!
The downside of the current implementation is that if functions are not defined in a script context which is loaded into the worker on time of creation
I'm not entirely sure if I understand what you mean. Do you mean that the signals do not work for "offloaded functions"?
I think so, yes; below is the implementation of worker.methods.run:
/**
* Execute a function with provided arguments
* @param {String} fn Stringified function
* @param {Array} [args] Function arguments
* @returns {*}
*/
worker.methods.run = function run(fn, args) {
var f = new Function('return (' + fn + ').apply(null, arguments);');
return f.apply(f, args);
};
The above implementation encapsulates the execution of an offloaded function. Since the first apply runs in scope of the Function constructor, it is rather difficult to change the this context to anything but globalThis. The original implementation using parameters works with the current implementation, however.
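Why parameter passing survives the offloaded path can be shown with a small sketch (runOffloaded is illustrative, not the actual worker.methods.run): the rebuilt function has no useful `this`, but prepending the signal to the argument list survives the stringify/rebuild round trip.

```javascript
// Sketch: threading a signal through a stringified ("offloaded") function.
function runOffloaded(fnSource, args, signal) {
  const f = new Function('return (' + fnSource + ').apply(null, arguments);');
  // `this` inside the rebuilt function is null/globalThis, so the signal
  // has to travel as a plain argument instead of as this.signal.
  return f.apply(f, [signal, ...args]);
}

const offloaded = function (signal, a, b) {
  return signal.aborted ? 'aborted' : a + b;
};

const controller = new AbortController();
console.log(runOffloaded(String(offloaded), [2, 3], controller.signal)); // 5
controller.abort();
console.log(runOffloaded(String(offloaded), [2, 3], controller.signal)); // 'aborted'
```

This is the trade-off weighed above: signal-as-parameter works in every execution context, signal-as-property only where the this context can be controlled.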
Thanks for the updates!
Some thoughts: the old behavior was that when a running task is cancelled or times out, the worker running that task is terminated. The new behavior is that before termination, an abort event is delivered to the worker so it can neatly close database connections or open requests for example.
With this in mind:
- I would expect this new event listener for an abort signal to be something that is purely on the worker side and not on the pool side (the main thread). So I would not expect a new option onAbort at the Pool side? I think the Pool side should not know at all whether the worker task has abort logic or not?
- I would not expect this new behavior to have any impact on Promise. I would like to keep Promise.js unchanged, it feels like we're attaching logic that should not be there.
Some thoughts about the implementation:
1. In the WorkerHandler I would expect extra logic before the return me.terminateAndNotify(true) (or as part of it), that sends a new type of message "ABORT" to the worker (you called it "TIMEOUT" right now). And then, in worker.js, I would expect some code to listen for this "ABORT" and trigger the abort callback (if any).
2. To prevent having to do bookkeeping with a worker.controllers[request.id], it may be an idea to utilize the fact that we only execute a single task at a time in a worker, so we do not need to keep track of multiple abort controllers.
3. With (2) in mind, attaching the signal to the functions can give issues with not using the right context, breaking usage of this.signal. It may work out very well to create a global callback listener on worker instead? It could even work as primitive as the following:

function myWorkerFunction() {
  return new Promise(function (_resolve, reject) {
    worker.onAbort = () => {
      console.log("abort event triggered");
    };
    reject("rejecting");
  });
}

workerpool.worker({ myWorkerFunction });

Then, in the worker.js code, all we have to do is (a) when a task is cancelled or timed out, check if worker.onAbort is defined and if so, invoke it, and (b) after the task is finished, clear worker.onAbort (if defined).
4. To neatly wait until the client's abort code (like shutting down a db connection) is finished, I think we should let it return a promise and await until it resolves. Only after that, we should terminate the worker. Even better: if we know the task has neatly aborted, there is no need to terminate the worker? I guess we should do that only when the abort promise returns some specific flag denoting that there is no need for termination of the worker.
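The "abort promise returns a flag denoting no termination is needed" idea could be sketched like this (handleTimeout and the 'recovered' sentinel are illustrative names, not workerpool internals):

```javascript
// Sketch: await the task's abort handler; skip termination only when it
// explicitly reports that it recovered cleanly.
async function handleTimeout(task) {
  if (typeof task.onAbort !== 'function') {
    return { terminate: true }; // no abort logic: fall back to termination
  }
  const outcome = await task.onAbort(); // e.g. close a db connection
  // only skip termination when the handler explicitly says it recovered
  return { terminate: outcome !== 'recovered' };
}

async function demo() {
  const noHandler = await handleTimeout({});
  const recovered = await handleTimeout({ onAbort: async () => 'recovered' });
  console.log(noHandler.terminate, recovered.terminate); // true false
}
demo();
```

Defaulting to termination keeps the old behavior for workers that register no abort logic, which matches the backwards-compatibility concern raised earlier in the thread.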
@joshLong145 I see you commented yesterday but removed your comment again. Any thoughts on this?
@joshLong145 I see you commented yesterday but removed your comment again. Any thoughts on this?
I took your advice and removed all my logic for managing controllers, and went with a single controller being invoked from a message sent from the handler's processing of the timeout or cancel exception. The problem I am seeing is being able to modify this worker object you use in your example of invoking our own callback instead of using a controller instance managed by the worker. In my testing I have been unable to modify the global state of any variable and have it reflected on the worker in scope of the message handler once we receive the message from the timeout in the worker context. It seems that the only global variables from within the worker are what is standard on globalThis for a worker instance, so I do not fully understand how a global function defined from within a worker's scope would work. From my testing, the only pattern that works for all library features is something that is passed to the worker explicitly as a parameter. I think we can still keep only one instance of the AbortController instead of managing one for each request, but I don't know how to get around the parameter passing at this time due to the above findings.
I was looking into this in more depth, and noticed that the onTerminate option is really close to what we're looking for here with this onAbort. So much so, in fact, that I think there may be no need for additional features.
Here a working example demonstrating how to close a db connection on termination (after a cancel or timeout):
// main.js
const workerpool = require("workerpool");
const pool = workerpool.pool(__dirname + "/workers/cancelWorker.js", {
workerType: "thread"
})
const main = async () => {
await pool
.exec("runDbQuery", [2, 3])
// .timeout(2000) // the timeout will trigger a termination, which neatly closes the connection beforehand
.then((result) => console.log('runDbQuery result', result))
.catch((err) => console.error(err))
await pool.terminate()
};
main();
With the following worker:
// cancelWorker.js
const workerpool = require("workerpool");
// note: we could also keep an array with multiple connections,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let dbConnection = null
async function openConnection() {
// we mimic an open db connection by a simple number
const newConnection = Math.round(Math.random() * 1000)
console.log(`Opening db connection ${newConnection}`)
return newConnection
}
async function closeConnection() {
  console.log(`Closing db connection ${dbConnection}...`)
  await sleep(500)
  console.log(`db connection ${dbConnection} closed`)
  dbConnection = null // reset the global so onTerminate won't close it twice
}
async function runDbQuery(a, b) {
// register the connection in a global variable
dbConnection = await openConnection()
// pretend to run a query
console.log(`Executing query via db connection ${dbConnection}...`)
await sleep(5000)
const result = a + b
console.log(`Query results are in: ${result}`)
await closeConnection(dbConnection)
return result
}
function sleep(delay) {
return new Promise(resolve => setTimeout(resolve, delay))
}
// create a worker and register public functions
workerpool.worker(
{
runDbQuery
},
{
onTerminate: function (code) {
if (dbConnection) {
return closeConnection(dbConnection)
}
},
}
)
Now, we could think through adding some "eye candy" on top of onTerminate that makes this scenario easier and enables using it in an offloaded worker. I think this API could look like:
async function runDbQuery(a, b) {
// register the connection in a global variable
dbConnection = await openConnection()
// register the closeConnection function via worker.onAbort,
// so it will be invoked when the worker terminates after a cancel or timeout
worker.onAbort = () => closeConnection(dbConnection)
// pretend to run a query
console.log(`Executing query via db connection ${dbConnection}...`)
await sleep(5000)
const result = a + b
console.log(`Query results are in: ${result}`)
await closeConnection(dbConnection)
return result
}
To implement this, I think all we have to do is add a section to worker.cleanupAndExit which executes an async worker.onAbort if defined, and after that clears the worker.onAbort again.
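That cleanupAndExit change could be sketched as below. This is a hedged illustration, not workerpool source: `exit` is injected so the sketch is testable without killing the process, where the real code would perform the actual worker termination.

```javascript
// Sketch: cleanupAndExit awaits an async onAbort (if set) before exiting,
// then clears it so the next task starts fresh.
function makeWorker(exit) {
  const worker = { onAbort: null };
  worker.cleanupAndExit = async function (code) {
    if (typeof worker.onAbort === 'function') {
      try {
        await worker.onAbort(); // let the client finish e.g. closing a db
      } finally {
        worker.onAbort = null;  // never carry a stale handler forward
      }
    }
    exit(code);
  };
  return worker;
}

const calls = [];
const w = makeWorker((code) => calls.push('exit:' + code));
w.onAbort = async () => calls.push('onAbort');
w.cleanupAndExit(0).then(() => console.log(calls.join(','))); // onAbort,exit:0
```

The try/finally guarantees the handler slot is cleared even if the client's cleanup throws, so a failed abort handler cannot leak into the next task.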
Thanks for the reply. I see your perspective that onTerminate largely implements what is being attempted with this PR through abort signals. However, your example forgoes async implementations that support programmatic abort through an AbortSignal. For example, with fetch (in this example in the MDN docs) a signal is provided which forces the promise to reject if abort is called on the controller, and which still allows the promise to reject if other failures occur. Another example of an API with signal support is the Azure Blob Storage SDK. So though you are correct that there is an easier implementation in only managing a single AbortController, I'm unsure I agree that onTerminate() solves the problem, as it cannot cancel inner promises which may need to be canceled as well to fully stop a worker method. So with that said, I think we still need an AbortController instance for the executing worker method.
Also, with the example you added for how onTerminate may be defined in scope of a worker through onAbort, I'm unsure how this implementation would work.
async function runDbQuery(a, b) {
// register the connection in a global variable
dbConnection = await openConnection()
// register the closeConnection function via worker.onAbort,
// so it will be invoked when the worker terminates after a cancel or timeout
worker.onAbort = () => closeConnection(dbConnection)
// pretend to run a query
console.log(`Executing query via db connection ${dbConnection}...`)
await sleep(5000)
const result = a + b
console.log(`Query results are in: ${result}`)
await closeConnection(dbConnection)
return result
}
In the above, onAbort is being defined on worker in scope of runDbQuery. However, I'm unsure how the worker has a global object worker, since when I inspect global in the worker context I only observe the following:
<ref *1> Object [global] {
global: [Circular *1],
queueMicrotask: [Function: queueMicrotask],
clearImmediate: [Function: clearImmediate],
setImmediate: [Function: setImmediate] {
[Symbol(nodejs.util.promisify.custom)]: [Getter]
},
structuredClone: [Getter/Setter],
clearInterval: [Function: clearInterval],
clearTimeout: [Function: clearTimeout],
setInterval: [Function: setInterval],
setTimeout: [Function: setTimeout] {
[Symbol(nodejs.util.promisify.custom)]: [Getter]
},
atob: [Getter/Setter],
btoa: [Getter/Setter],
performance: [Getter/Setter],
fetch: [AsyncFunction: fetch]
}
So I'm unsure how a worker object in the global scope may be manipulated, as it is not defined, at least in the ways that I have tried up to this point.
Sorry about the confusion about worker. What I meant was workerpool, the imported library. So it would be an overwritable function of the library, workerpool.onAbort = ..., similar to how we can use the global window.onclick = .... Now, it's ugly to override a global method, so maybe we should introduce a method for that, like workerpool.addAbortListener(...), similar to window.addEventListener(...).
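An addAbortListener(...) API along these lines could be sketched as follows. This is a hypothetical illustration, not the shipped workerpool surface; it also folds in the earlier optimization idea of keeping a plain callback array that is cleared before every method invocation.

```javascript
// Sketch of an addAbortListener API: listeners collect in an array,
// are awaited in registration order on abort, and the list is cleared
// before each task.
const abortListeners = [];

function addAbortListener(listener) {
  abortListeners.push(listener);
}

function clearAbortListeners() {
  abortListeners.length = 0; // reset before each method invocation
}

async function fireAbortListeners() {
  // splice empties the list while iterating a snapshot of it
  for (const listener of abortListeners.splice(0)) {
    await listener(); // await async cleanup such as closing a db connection
  }
}

async function demo() {
  const log = [];
  addAbortListener(async () => log.push('close-db'));
  addAbortListener(() => log.push('clear-timer'));
  await fireAbortListeners();
  console.log(log.join(','), abortListeners.length); // close-db,clear-timer 0
}
demo();
```

Compared to overwriting a single workerpool.onAbort slot, the array form lets several independent resources register cleanup without clobbering each other.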
I think an AbortController can be used on top of onTerminate too, something like:
// cancelWorker.js
const workerpool = require("workerpool");
// note: we could also keep an array with multiple controllers,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let myAbortController = null
async function runFetch(url) {
myAbortController = new AbortController()
// You can use myAbortController.signal.addEventListener if there is
// more stuff to be taken care of when `fetch` triggers the abort action
return fetch(url, { signal: myAbortController.signal })
}
// create a worker and register public functions
workerpool.worker(
{
runFetch
},
{
onTerminate: function (code) {
if (myAbortController) {
myAbortController.abort()
}
}
}
)
I may be overlooking something, if that is the case, can you share an example of it to explain?
Ah, thank you for the clarification on worker, I was going a bit crazy 😆. This now makes much more sense to me. My thought process around providing an AbortController was mainly to prevent the instance from leaving scope before the worker runtime could run cleanup in terminateAndNotify, but I now understand the execution model better: there is only one executing method at a time per worker, so I no longer think this is an issue. As long as users call abort within the new wrapper onAbort, there is no loss of functionality with them managing it vs the worker managing it.
I think we have a good path forward with:
- Wrapping onTerminate with onAbort, which will return a Promise
- Blocking on the returned promise until settled in terminateAndExit
- Resetting onAbort for the next method execution
Sounds good!
I think the onAbort wrapper will be a useful addition.
Do you have thoughts on how to best expose the onAbort function? Like workerpool.onAbort = ... or workerpool.addAbortListener(...)? I have the feeling that the latter is the neatest and most future proof.
Do you have thoughts on how to best expose the onAbort function? Like workerpool.onAbort = ... or workerpool.addAbortListener(...)? I have the feeling that the latter is the neatest and most future proof.
workerpool.addAbortListener(...) makes the most sense to me. Having a function which takes the listener as an argument is the most flexible.
@josdejong
This has been implemented in PR #448.