piscina
Determining if worker is currently busy?
Is it possible to determine if a worker is currently processing a task or not? I found that this information might be present in workerInfos
but it doesn't seem the API exposes this currently.
It's not currently exposed, no. Generally, it's not something we ever anticipated people needing. Do you have a use case you can describe?
I want to expose metrics with https://github.com/siimon/prom-client to determine if my service is appropriately scaled. I tried using the utilization
attribute, which is more of an approximation (though it does sort of work). Ideally, I would be able to expose the status of each worker and do the utilization computation myself elsewhere in the stack.
The analog I'm seeking to build is https://uwsgi-docs.readthedocs.io/en/latest/StatsServer.html (which has a worker status attribute)
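As a minimal sketch of the "utilization computation elsewhere in the stack" idea: assuming each worker's status could be sampled as an in-flight task count, the pool-level figure is a simple ratio. The `WorkerSample` type and the `poolUtilization` function are hypothetical names for illustration; Piscina does not expose per-worker status today.

```typescript
// Hypothetical shape of one per-worker status sample.
// This is NOT part of Piscina's public API.
interface WorkerSample {
  threadId: number;
  inflightTasks: number; // tasks the worker is processing right now
}

// Fraction of task slots currently in use across the pool.
// `concurrentTasksPerWorker` mirrors the Piscina option of the same name.
function poolUtilization(
  samples: WorkerSample[],
  concurrentTasksPerWorker = 1
): number {
  const capacity = samples.length * concurrentTasksPerWorker;
  if (capacity === 0) return 0; // an empty pool counts as idle
  const busySlots = samples.reduce(
    // Clamp so a stale or oversized sample cannot push utilization above 1.
    (sum, s) => sum + Math.min(s.inflightTasks, concurrentTasksPerWorker),
    0
  );
  return busySlots / capacity;
}
```

Computing this from raw per-worker samples, rather than reading an aggregated value, lets the metrics backend choose its own sampling window.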
@jasnell Same here. My use case is to determine how many inflight tasks are running in each worker (i.e. in each thread) right now, that is, how saturated each worker is at the moment. This information is available in piscina.#pool.workers.readyItems[*].currentUsage().
I would've been able to grab these statistics easily by querying Piscina internals, if only Piscina didn't declare Piscina#pool as a #-private property. Declaring it with # blocks any attempt to access piscina.pool from the outside, so there is literally no way out.
So far, this is how I exposed it (using the patch-package tool). With the patch applied, the internals become available as piscina.threads[*].workerInfo:

import Piscina from "piscina";
import type { Worker } from "worker_threads";

/**
 * Returns some real-time statistics about Piscina.
 */
export default function piscinaStats(piscina: Piscina): PiscinaStats {
  const threads: Threads = piscina.threads;
  return {
    config: {
      maxQueue: piscina.options.maxQueue,
      maxThreads: piscina.options.maxThreads,
      concurrentTasksPerWorker: piscina.options.concurrentTasksPerWorker,
    },
    currentQueueSize: piscina.queueSize,
    // Number of in-flight tasks per worker, or null if internals are missing.
    currentInflightTaskCountsPerWorker: threads.map(
      (thread) => thread.workerInfo?.currentUsage?.() ?? null
    ),
    // Age in ms of each in-flight task per worker; a "~" prefix marks a task
    // that has been created but not started yet.
    currentInflightTaskDurationsPerWorker: threads.map((thread) =>
      [...(thread.workerInfo?.taskInfos?.values?.() ?? [])]
        .map((taskInfo) =>
          typeof taskInfo.created === "number"
            ? (taskInfo.started ? "" : "~") +
              Math.round(performance.now() - taskInfo.created)
            : null
        )
        .join(" ")
    ),
  };
}

/**
 * We monkey-patch Piscina to expose the workerInfo object on each worker
 * thread. We access its undocumented properties defensively, expecting that
 * they may go missing at any time.
 *
 * See also https://github.com/piscinajs/piscina/issues/157
 */
type Threads = Array<
  Worker & {
    workerInfo?: {
      currentUsage?: () => number;
      taskInfos?: {
        values?: () => Iterable<{
          started?: number;
          created?: number;
        }>;
      };
    };
  }
>;
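To feed per-worker counts like these into a metrics endpoint, they can be rendered as labeled gauges. The following is only a sketch: it writes the Prometheus text exposition format by hand instead of going through prom-client, and the function name `renderInflightGauge` and the metric name `piscina_worker_inflight_tasks` are hypothetical.

```typescript
// Renders per-worker in-flight counts as one Prometheus gauge with a
// `worker` label. A `null` entry means the count was unavailable (for
// example, the patched internals were missing) and is skipped.
function renderInflightGauge(
  counts: Array<number | null>,
  metricName = "piscina_worker_inflight_tasks" // hypothetical metric name
): string {
  const lines = [
    `# HELP ${metricName} In-flight tasks per worker thread.`,
    `# TYPE ${metricName} gauge`,
  ];
  counts.forEach((count, index) => {
    if (count === null) return;
    lines.push(`${metricName}{worker="${index}"} ${count}`);
  });
  // The exposition format expects a trailing newline.
  return lines.join("\n") + "\n";
}
```

The `counts` argument matches the shape of `currentInflightTaskCountsPerWorker` above, so the output of `piscinaStats` could be passed in directly.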
Hi @kedarv @dko-slapdash, I'm curious here.
If you are looking for the current inflight tasks per worker, workerInfo.currentUsage should fulfil that need.
Maybe adding diagnostics_channel events for specific operations like task-completed, etc. could expose a bit more detail.
More or less, how do you imagine the API for this?
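One way such an API might look, sketched with Node's built-in `diagnostics_channel` module (the top-level `subscribe` function needs Node 16.17+ or 18.7+). The channel names `piscina:task-started` and `piscina:task-completed` and the `TaskEvent` payload are assumptions made for illustration; Piscina does not publish these events today.

```typescript
import { channel, subscribe } from "node:diagnostics_channel";

// Hypothetical channel names and payload; not an existing Piscina API.
const TASK_STARTED = "piscina:task-started";
const TASK_COMPLETED = "piscina:task-completed";

interface TaskEvent {
  threadId: number;
}

// Consumer side: keep a live in-flight count per worker thread.
const inflightByThread = new Map<number, number>();

function adjust(threadId: number, delta: number): void {
  const next = (inflightByThread.get(threadId) ?? 0) + delta;
  inflightByThread.set(threadId, next);
}

subscribe(TASK_STARTED, (message) => adjust((message as TaskEvent).threadId, 1));
subscribe(TASK_COMPLETED, (message) => adjust((message as TaskEvent).threadId, -1));

// Producer side: what the pool could do internally around each task.
const started = channel(TASK_STARTED);
const completed = channel(TASK_COMPLETED);

started.publish({ threadId: 1 });
started.publish({ threadId: 1 });
started.publish({ threadId: 2 });
completed.publish({ threadId: 1 });
```

Because `publish` delivers messages synchronously, the map reflects each event as soon as it is published, and a metrics exporter can read it at scrape time without touching pool internals.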