p-map
p-map copied to clipboard
FR: introduce `p` function for concurrent promise execution with condition
I'm not sure if this is the right place to submit an idea, but it would be interesting to have a function p
that allows executing a specified number of promises concurrently while a condition is true
.
In my specific case, I have a list with 200 items and need to check if at least 5 of them have a certain "quality". Since it is a time-consuming process and the outcome is quite random, I activate 10 concurrent executions and keep performing an items.shift()
operation to test each item.
Once I reach 5 successful elements, I pause the processing. However, when the number of elements drops below 5 again, I need to resume searching for new elements. In my particular case, it's not a problem if I find more than 5 elements, as some of the concurrent executions may still be pending even after pausing.
Here's a rough outline of the idea:
const p = pFunctionHere({
concurrency: 10,
afterPausedInterval: 100, // When the condition is reached the first time (returning `false`), then every 100ms it will be executed again to see if it is necessary to restart the concurrencies.
});
const tester = async () => {
// Get an item from the list.
const item = items.shift();
if (await isValid(item)) { // Time-consuming process.
const testItem = addItem(item); // addedItems.push(...)
testItem.test(); // Test the item every minute.
}
}
const condition = () => {
return items.length > 0 && addedItems.length < 5;
}
p.run(tester, condition);
This feature would allow executing tester function concurrently up to the specified concurrency limit (10 in this case) while the condition condition
is true
. Once the condition is no longer met, the execution would pause until the condition becomes true
again.
It would be great to have this functionality as it provides a convenient way to handle concurrent promise execution with a condition.
I did a draft:
interface PConditionalOptions {
concurrency?: number; // Maximum number of concurrent executions (default: 1).
restartInterval?: number; // Interval at which the condition is checked when maximum concurrency is reached (default: 100ms).
}
type Runner = () => Promise<void>; // Function type for the task runner.
type Condition = () => boolean; // Function type for the condition checker.
export class PConditional {
#concurrency: number; // Maximum number of concurrent executions.
#currentConcurrences = 0; // Current number of concurrent executions.
#restartInterval: number; // Interval at which the condition is checked when maximum concurrency is reached.
#runner?: Runner; // Function representing the task to be executed.
#condition!: Condition; // Function representing the condition to be checked.
#restartTimer?: ReturnType<typeof setInterval>; // Timer used for periodic condition checking when maximum concurrency is reached.
public constructor({ concurrency, restartInterval }: PConditionalOptions) {
this.#concurrency = concurrency ?? 1;
this.#restartInterval = restartInterval ?? 100;
}
// A getter that returns whether the execution is currently paused.
public get isPaused() {
return this.#restartTimer !== undefined;
}
// A method to start running the promises with the specified runner and condition.
public run(runner: Runner, condition: Condition) {
if (this.#runner !== undefined) {
throw new Error("run() has already been executed");
}
this.#runner = runner;
this.#condition = condition;
this.#runConcurrences();
}
// Runs the tasks concurrently until the maximum concurrency is reached or the condition is false.
#runConcurrences() {
for (
;
this.#currentConcurrences < this.#concurrency;
this.#currentConcurrences++
) {
if (this.#condition()) {
void this.#runner!().finally(() => {
this.#currentConcurrences--;
this.#runConcurrences();
});
continue;
}
this.#startConditionalTimer();
return;
}
}
// Starts the timer to periodically check the condition when maximum concurrency is reached.
#startConditionalTimer() {
if (this.#restartTimer === undefined) {
this.#restartTimer = setInterval(() => {
if (this.#condition()) {
clearInterval(this.#restartTimer);
this.#restartTimer = undefined;
this.#runConcurrences();
}
}, this.#restartInterval);
}
}
}
https://github.com/sindresorhus/p-whilst? Alternatively, do any of these fit your use case?
@tommy-mitchell from my analysis, p-whilst
is just a while()
loop that processes one promise at a time, and I need concurrency. Of the other p-functions, none seemed to solve my problem.
In summary, the rules are:
- Run indefinitely (like
p-while
could do); - Run in concurrency (but
p-while
cannot); - Run conditionally (like
p-while
could do, but a condition that returnsfalse
must not terminate, but enter a stand-by state); - The value returned by the promise is irrelevant;
I wonder if this could be solved by p-whilst
by adding a concurrency
option to it.
Something like this:
const condition = () => {
if (addedItems.length >= 5) {
await delay(100);
}
return items.length > 0 && addedItems.length < 5;
}