
Feature request (async): limit() to limit a function concurrency

Open guy-borderless opened this issue 1 year ago • 12 comments

Is your feature request related to a problem? Please describe. I find p-limit helpful (APIs often limit user concurrency). The async module already has retry, debounce, and pooledMap, which are similar in spirit. An API like p-limit in deno_std would be great.

import pLimit from 'p-limit';

const limit = pLimit(10);

// only 10 concurrent requests
export const geocodeAddress = limit(async (address: string) => {...})
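
For reference, the requested behavior can be sketched in a few lines of TypeScript. This is an illustrative sketch, not p-limit's actual implementation; the `limitConcurrency` name is made up:

```typescript
// Illustrative p-limit-style limiter: at most `concurrency` wrapped
// calls run at once; extra callers wait in a FIFO of resolvers.
function limitConcurrency(concurrency: number) {
  let active = 0;
  const queue: (() => void)[] = [];

  return function <Args extends unknown[], R>(
    fn: (...args: Args) => Promise<R>,
  ): (...args: Args) => Promise<R> {
    return async (...args: Args) => {
      // Re-check after every wake-up, in case another caller took the slot.
      while (active >= concurrency) {
        await new Promise<void>((resolve) => queue.push(resolve));
      }
      active++;
      try {
        return await fn(...args);
      } finally {
        active--;
        // Wake exactly one waiting caller, if any.
        queue.shift()?.();
      }
    };
  };
}
```

The key property is that the wrapped function can be called from anywhere in the codebase; the shared `active`/`queue` state enforces the limit globally, with no iterable required up front.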

guy-borderless avatar May 17 '24 18:05 guy-borderless

Doesn't pooledMap() achieve essentially the same thing with its poolLimit parameter?

iuioiua avatar May 19 '24 09:05 iuioiua

Doesn't pooledMap() achieve essentially the same thing with its poolLimit parameter?

Not easily, I think; this scenario is for when we don't know the iterable up front. So we'd still have to create and manage an iterable here (and probably prefer to use some lib for it). How would you write the above code with pooledMap?

guy-borderless avatar May 19 '24 09:05 guy-borderless

With the above code, how do you imagine the iterable would be provided to it?

BlackAsLight avatar May 21 '24 03:05 BlackAsLight

I'm not sure pLimit is a common abstraction for that situation/requirement.

Could Semaphore class in this package ( https://jsr.io/@lambdalisue/async ) achieve that same thing?

This issue ( https://github.com/denoland/deno_std/issues/4536 ) might be related to this topic.
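
For context, a counting semaphore of this kind can be sketched as follows. This is an illustrative sketch, not the actual code of @lambdalisue/async; the `acquire`/`release`/`lock` names are assumptions:

```typescript
// Illustrative counting semaphore: `acquire()` resolves once a permit
// is free; `lock()` runs a function while holding a permit.
class Semaphore {
  #permits: number;
  #waiters: (() => void)[] = [];

  constructor(permits: number) {
    this.#permits = permits;
  }

  async acquire(): Promise<void> {
    if (this.#permits > 0) {
      this.#permits--;
      return;
    }
    await new Promise<void>((resolve) => this.#waiters.push(resolve));
  }

  release(): void {
    const waiter = this.#waiters.shift();
    if (waiter) {
      // Hand the permit directly to the next waiter (no increment).
      waiter();
    } else {
      this.#permits++;
    }
  }

  async lock<R>(fn: () => Promise<R>): Promise<R> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
```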

kt3k avatar May 23 '24 11:05 kt3k

Would that be:

import { Semaphore } from "@std/async";

const limiter = new Semaphore(10);

// only 10 concurrent requests
export const geocodeAddress = limiter.lock(async (address: string) => {...})

If it's about that, np. Btw, how would you rate limit invocations? (max calls per duration)
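
On the rate-limiting question (max calls per duration): that can be sketched separately from concurrency limiting, e.g. with a sliding window of start timestamps. A minimal sketch, with the `rateLimit` name made up for illustration:

```typescript
// Illustrative sliding-window rate limiter: at most `maxCalls` starts
// per `windowMs` milliseconds. Recent start times are kept and pruned
// as the window slides.
function rateLimit(maxCalls: number, windowMs: number) {
  const starts: number[] = [];

  const msUntilSlot = (): number => {
    const now = Date.now();
    // Drop timestamps that have left the window.
    while (starts.length > 0 && now - starts[0] >= windowMs) starts.shift();
    if (starts.length < maxCalls) return 0;
    return starts[0] + windowMs - now;
  };

  return function <Args extends unknown[], R>(
    fn: (...args: Args) => Promise<R>,
  ): (...args: Args) => Promise<R> {
    return async (...args: Args) => {
      let wait = msUntilSlot();
      while (wait > 0) {
        await new Promise((r) => setTimeout(r, wait));
        wait = msUntilSlot();
      }
      starts.push(Date.now());
      return await fn(...args);
    };
  };
}
```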

guy-borderless avatar May 23 '24 11:05 guy-borderless

I imagine you could do something like this to accomplish the same thing you're after.

import { pooledMap } from '@std/async'

function pLimit(poolLimit: number) {
  return function <T, R>(iteratorFn: (data: T) => Promise<R>) {
    return function (array: Iterable<T> | AsyncIterable<T>) {
      return pooledMap(poolLimit, array, iteratorFn)
    }
  }
}

const limit = pLimit(10)

const geocodeAddress = limit(async (address: string) => {...})

geocodeAddress(['an array of addresses'])

BlackAsLight avatar May 23 '24 12:05 BlackAsLight

I imagine you could do something like this to accomplish the same thing you're after.

Your code is not equivalent: it requires all addresses to be batched in the same lexical place. In practice, to be equivalent you need to introduce state.

guy-borderless avatar May 23 '24 12:05 guy-borderless

Your code is not equivalent: it requires all addresses to be batched in the same lexical place. In practice, to be equivalent you need to introduce state.

const { readable, writable } = new TransformStream<string, string>()
geocodeAddress(readable)

const writer = writable.getWriter()

// write an address in different places
writer.write('address')

// call at the very end when you know there are no more addresses
writer.close()

BlackAsLight avatar May 23 '24 20:05 BlackAsLight

import { pooledMap } from '@std/async'

const geocodeAddress = function() {
  const { readable, writable } = new TransformStream<string, string>()
  pooledMap(10, readable, async (address: string) => {
    ...
  })
  return writable.getWriter()
}()

geocodeAddress.write('address')

geocodeAddress.close()

BlackAsLight avatar May 23 '24 22:05 BlackAsLight

Judging by the "leakiness" and general friction seen here, it does seem a primitive is missing. I think it would be useful to have something like a semaphore in the standard library.

guy-borderless avatar May 26 '24 20:05 guy-borderless

+1 for a semaphore

I just got a case where I have nested arrays of promises, all ending up in a remote API call, overloading the remote (or risking getting blocked/banned for too many open connections), so I need a way to limit the number of "active" API calls right where they happen.

ghost avatar Jun 13 '24 13:06 ghost

+1 for a semaphore

I just got a case where I have nested arrays of promises, all ending up in a remote API call, overloading the remote (or risking getting blocked/banned for too many open connections), so I need a way to limit the number of "active" API calls right where they happen.

Looking at the most common libraries (details listed at the bottom), the most common abstraction for limiting invocations seems to be a specialized queue.

  1. async library - provides a queue
  2. p-limit - built on a minimal internal queue (yocto-queue)
  3. fastq - it's a queue

I think it would be useful to provide in the standard library a specialized queue class for limiting async invocations since it is a common scenario.

type AsyncQueueArgs = {
  // max number of concurrent promises
  concurrency?: number
  // min number of milliseconds since the last call
  debounceMs?: number
  // max number of invocations per throttle window
  throttle?: { invocations: number; ms: number }
}

class AsyncQueue {
  constructor(args: AsyncQueueArgs) {
  }

  // returns a wrapped function so that each invocation passes through the queue
  enqueueFn<TFn extends () => Promise<any>>(fn: TFn): TFn

  // enqueues a promise
  enqueue<TPromise extends Promise<any>>(promise: TPromise): TPromise

  [Symbol.asyncIterator]() {
    // resolved promises...
  }
}

Recreating p-limit would then be:

function plimit(concurrency: number, fn: () => Promise<any>) {
  const queue = new AsyncQueue({ concurrency })
  return queue.enqueueFn(fn)
}

Since the queue can be iterated, this also covers the p-map utility (41,390,697 weekly downloads).
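
To make the proposal concrete, the concurrency part could look roughly like this. This is a sketch of the proposed API above, not an existing std API; only the concurrency option is implemented, and enqueueFn is generalized to take arguments:

```typescript
// Sketch of the proposed AsyncQueue, implementing only the
// `concurrency` option; debounce/throttle are left out for brevity.
class AsyncQueue {
  #concurrency: number;
  #active = 0;
  #pending: (() => void)[] = [];

  constructor(args: { concurrency: number }) {
    this.#concurrency = args.concurrency;
  }

  async #run<R>(fn: () => Promise<R>): Promise<R> {
    // Re-check after every wake-up, in case another caller took the slot.
    while (this.#active >= this.#concurrency) {
      await new Promise<void>((resolve) => this.#pending.push(resolve));
    }
    this.#active++;
    try {
      return await fn();
    } finally {
      this.#active--;
      // Wake exactly one waiting caller, if any.
      this.#pending.shift()?.();
    }
  }

  // Wrap `fn` so every invocation passes through the queue.
  enqueueFn<Args extends unknown[], R>(
    fn: (...args: Args) => Promise<R>,
  ): (...args: Args) => Promise<R> {
    return (...args) => this.#run(() => fn(...args));
  }
}
```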

The most popular npm libraries:

async

  • Weekly Downloads: 52,103,656 .
  • Description: Async is a utility module that provides straightforward, powerful functions for working with asynchronous JavaScript, suitable for both Node.js and browser environments.
  • GitHub: async GitHub Repository
  • npm: async npm Package

p-limit

  • Weekly Downloads: 105,571,463
  • Description: p-limit allows you to run multiple promise-returning and async functions with limited concurrency, helping to manage the number of simultaneous operations.
  • GitHub: p-limit GitHub Repository
  • npm: p-limit npm Package

fastq

  • Weekly Downloads: 32,427,175
  • Description: fastq is a fast, in-memory work queue that provides a simple API for processing tasks concurrently with a specified concurrency limit.
  • GitHub: fastq GitHub Repository
  • npm: fastq npm Package

I would gladly implement this, of course.

guy-borderless avatar Sep 15 '24 10:09 guy-borderless