
Determining if worker is currently busy?

Open kedarv opened this issue 3 years ago • 6 comments

Is it possible to determine if a worker is currently processing a task or not? I found that this information might be present in workerInfos but it doesn't seem the API exposes this currently.

kedarv avatar Nov 12 '21 21:11 kedarv

It's not currently exposed, no. Generally it's not something we ever anticipated people needing. Do you have a use case you can describe?

jasnell avatar Nov 13 '21 00:11 jasnell

I want to expose metrics with https://github.com/siimon/prom-client to determine if my service is appropriately scaled. I tried using the utilization attribute, which is more of an approximation (though it does sort of work). Ideally, I would be able to expose the status of each worker and do the utilization computation myself elsewhere in the stack.

The analog I'm seeking to build is https://uwsgi-docs.readthedocs.io/en/latest/StatsServer.html (which has a worker status attribute)
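A minimal sketch of the pool-level approach described above, assuming only the public `utilization`, `queueSize`, and `threads` properties of a Piscina instance. The helper `renderPoolMetrics` and the metric names are hypothetical; the output uses the Prometheus text exposition format so the sketch does not depend on prom-client itself.

```typescript
// Structural subset of a Piscina pool. `utilization`, `queueSize` and
// `threads` are public Piscina properties; nothing else is assumed.
interface PoolLike {
  utilization: number;
  queueSize: number;
  threads: readonly unknown[];
}

// Hypothetical helper: renders pool-level gauges in the Prometheus text
// exposition format. The metric names are illustrative only.
function renderPoolMetrics(pool: PoolLike, prefix = "piscina"): string {
  const gauges: Array<[string, string, number]> = [
    ["utilization_ratio", "Approximate pool utilization ratio", pool.utilization],
    ["queue_size", "Tasks waiting for a free worker", pool.queueSize],
    ["threads", "Worker threads currently alive", pool.threads.length],
  ];
  const blocks = gauges.map(([name, help, value]) =>
    [
      `# HELP ${prefix}_${name} ${help}`,
      `# TYPE ${prefix}_${name} gauge`,
      `${prefix}_${name} ${value}`,
    ].join("\n")
  );
  return blocks.join("\n") + "\n";
}
```

This only reports pool-wide numbers. It cannot say which individual worker is busy, which is the gap this issue is about.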

kedarv avatar Nov 13 '21 00:11 kedarv

@jasnell Same here. My use case is to be able to determine how many inflight tasks are running in each worker (i.e. in each thread) right now, or in other words how saturated each worker is at the moment. This information is available in piscina.#pool.workers.readyItems[*].currentUsage().

I would've been able to grab these statistics easily by querying Piscina internals, if only Piscina didn't declare Piscina#pool as a #-private property. Declaring it with # disallows any attempt to access the pool from the outside, so there is literally no way out.
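To illustrate why a `#`-private field leaves no back door, here is a small self-contained sketch. The `Pool` class is a hypothetical stand-in, not Piscina's actual implementation; it only shows that reflection and string-keyed lookups cannot reach such a field.

```typescript
class Pool {
  // A `#`-private field, analogous to the `#pool` field described above.
  #workers: string[] = ["w1", "w2"];

  // Only members the class chooses to expose can read the field.
  get threadCount(): number {
    return this.#workers.length;
  }
}

const pool = new Pool();

// Reflection does not list the private field.
const visibleKeys = Reflect.ownKeys(pool);

// String-keyed lookups miss it too, whatever name is tried.
const loose = pool as unknown as Record<string, unknown>;
const viaHashName = loose["#workers"];
const viaPlainName = loose["workers"];
```

Unlike TypeScript's `private` keyword, which is erased at compile time, `#` fields are enforced at runtime, which is why patching the library was the only option.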

dko-slapdash avatar Dec 18 '22 08:12 dko-slapdash

So far, this is how I exposed it (using the patch-package tool). With the patch applied, the internals are available as piscina.threads[*].workerInfo

[Screenshot: CleanShot 2022-12-18 at 02 05 28@2x]
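import { performance } from "perf_hooks";
import type { Worker } from "worker_threads";
import type Piscina from "piscina";

// Note: the PiscinaStats return type is not shown in this comment.

/**
 * Returns some real-time statistics about Piscina.
 */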
export default function piscinaStats(piscina: Piscina): PiscinaStats {
  const threads: Threads = piscina.threads;
  return {
    config: {
      maxQueue: piscina.options.maxQueue,
      maxThreads: piscina.options.maxThreads,
      concurrentTasksPerWorker: piscina.options.concurrentTasksPerWorker,
    },
    currentQueueSize: piscina.queueSize,
    currentInflightTaskCountsPerWorker: threads.map(
      (thread) => thread.workerInfo?.currentUsage?.() ?? null
    ),
    currentInflightTaskDurationsPerWorker: threads.map((thread) =>
      [...(thread.workerInfo?.taskInfos?.values?.() ?? [])]
        .map((taskInfo) =>
          typeof taskInfo.created === "number"
            ? (taskInfo.started ? "" : "~") +
              Math.round(performance.now() - taskInfo.created)
            : null
        )
        .join(" ")
    ),
  };
}

/**
 * We monkey-patch Piscina to expose the workerInfo object on each worker
 * thread. Its properties are undocumented, so we access them defensively,
 * expecting that they may go missing at any time.
 *
 * See also https://github.com/piscinajs/piscina/issues/157
 */
type Threads = Array<
  Worker & {
    workerInfo?: {
      currentUsage?: () => number;
      taskInfos?: {
        values?: () => Iterable<{
          started?: number;
          created?: number;
        }>;
      };
    };
  }
>;
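As a rough sketch of the "do the utilization computation myself" step, the per-worker inflight counts returned above could be reduced to a busy/idle summary. The function `summarizeSaturation` and its result shape are hypothetical, and `null` entries (workers whose `workerInfo` was unavailable) are counted separately rather than treated as idle.

```typescript
interface SaturationSummary {
  busyWorkers: number;
  idleWorkers: number;
  unknownWorkers: number;
  inflightTasks: number;
  // Inflight tasks divided by capacity of the known workers; null if no capacity.
  saturation: number | null;
}

// Hypothetical helper: `counts` has the shape of
// currentInflightTaskCountsPerWorker from the snippet above.
function summarizeSaturation(
  counts: ReadonlyArray<number | null>,
  concurrentTasksPerWorker: number
): SaturationSummary {
  let busyWorkers = 0;
  let idleWorkers = 0;
  let unknownWorkers = 0;
  let inflightTasks = 0;

  for (const count of counts) {
    if (count === null) {
      unknownWorkers += 1;
    } else if (count > 0) {
      busyWorkers += 1;
      inflightTasks += count;
    } else {
      idleWorkers += 1;
    }
  }

  const capacity = (busyWorkers + idleWorkers) * concurrentTasksPerWorker;
  const saturation = capacity > 0 ? inflightTasks / capacity : null;
  return { busyWorkers, idleWorkers, unknownWorkers, inflightTasks, saturation };
}
```

A worker is treated as busy here if it has at least one inflight task, which matches the original question of whether a worker is currently processing anything.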

dko-slapdash avatar Dec 18 '22 10:12 dko-slapdash

Hi @kedarv @dko-slapdash, I'm curious here. If you're looking for the current inflight tasks per worker, workerInfo.currentUsage should already cover that need.

Maybe adding diagnostics_channel events for specific operations, such as task-completed, could expose a bit more detail.
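A minimal sketch of what a diagnostics_channel based approach could look like, using Node's built-in `node:diagnostics_channel` module. The channel name `piscina:task-completed` and the `TaskCompletedEvent` payload are assumptions made for illustration; Piscina does not define them.

```typescript
import { channel, subscribe } from "node:diagnostics_channel";

// Hypothetical channel name and payload; not part of Piscina's API.
const TASK_COMPLETED = "piscina:task-completed";

interface TaskCompletedEvent {
  threadId: number;
  durationMs: number;
}

// Consumer side: keep a running count of completed tasks per thread.
const completedPerThread = new Map<number, number>();
subscribe(TASK_COMPLETED, (message) => {
  const event = message as TaskCompletedEvent;
  const previous = completedPerThread.get(event.threadId) ?? 0;
  completedPerThread.set(event.threadId, previous + 1);
});

// Producer side (what a pool might do): publish only if someone is listening.
const taskCompleted = channel(TASK_COMPLETED);
function reportTaskCompleted(event: TaskCompletedEvent): void {
  if (taskCompleted.hasSubscribers) {
    taskCompleted.publish(event);
  }
}

reportTaskCompleted({ threadId: 1, durationMs: 12 });
reportTaskCompleted({ threadId: 1, durationMs: 7 });
reportTaskCompleted({ threadId: 2, durationMs: 30 });
```

Subscribers run synchronously when a message is published, so a metrics collector could update its counters without polling the pool.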

Roughly, what would you imagine the API for this looking like?

metcoder95 avatar Sep 15 '23 08:09 metcoder95