js-challenges icon indicating copy to clipboard operation
js-challenges copied to clipboard

有并发限制的Promise.all(ts类型)

Open Sunny-117 opened this issue 3 years ago • 4 comments

Sunny-117 avatar Nov 03 '22 08:11 Sunny-117

/**
 * Runs a queue of task factories with a cap on how many are in flight at once.
 *
 * Each task is a zero-argument function; its return value is awaited before
 * the worker that started it picks up the next queued task.
 */
class Run {
  tasks = [];
  flag = 0;

  /**
   * @param {Array<() => any>} tasks - Task factories, executed in order.
   * @param {number} num - Maximum number of tasks running concurrently.
   */
  constructor(tasks, num) {
    // Work on a private copy: the queue is consumed with shift(), which would
    // otherwise empty the array owned by the caller.
    this.tasks = [...(tasks ?? [])];
    this.num = num;
    this.flag = 0;
  }

  /**
   * Starts up to `num` workers that drain the queue.
   *
   * @returns {Promise<void>} Resolves once every worker has stopped; rejects
   *   with the first error raised by a task.
   */
  run() {
    if (!this.num) return Promise.resolve();
    const workers = Array.from({ length: this.num }, () => this.runOne());
    return Promise.all(workers).then(() => undefined);
  }

  /**
   * A single worker: keeps taking the next queued task while a concurrency
   * slot is free, and stops when the queue is empty.
   *
   * @returns {Promise<void>} Rejects if the task it is running fails.
   */
  async runOne() {
    while (this.tasks.length && this.flag < this.num) {
      const task = this.tasks.shift();
      this.flag++;
      try {
        await task();
      } finally {
        // Release the slot even when the task fails; otherwise the counter
        // stays raised and the remaining workers believe the pool is full.
        this.flag--;
      }
    }
  }
}

/**
 * Returns a promise that fulfills (with no value) after `duration`
 * milliseconds, logging a completion message just before it does.
 *
 * @param {number} duration - Delay in milliseconds; also echoed in the log line.
 * @returns {Promise<void>}
 */
const sleep = (duration) => {
  return new Promise((resolve) => {
    const onElapsed = () => {
      console.log("task " + duration + " done");
      resolve();
    };
    setTimeout(onElapsed, duration);
  });
};

// Ten demo jobs; each one, when invoked, sleeps for a random 1000–4000 ms.
const tasks = Array.from(
  { length: 10 },
  () => () => sleep(1000 + Math.random() * 3000)
);

// Hand the jobs to a runner constructed with a concurrency value of 2.
const run = new Run(tasks, 2);

run.run();

fencer-yd avatar Jun 05 '23 06:06 fencer-yd

测试代码:

/**
 * Builds a task factory: each call of the returned function starts a timer
 * and yields a promise that logs `text` and then fulfills after `delay` ms.
 *
 * @param {*} text - Value passed to console.log when the timer fires.
 * @param {number} [delay=1000] - Timer length in milliseconds.
 * @returns {() => Promise<void>} Factory that starts the timer when invoked.
 */
function sleep(text, delay = 1000) {
  const startTimer = (resolve) => {
    setTimeout(() => {
      console.log(text);
      resolve();
    }, delay);
  };
  return () => new Promise(startTimer);
}

// Five task factories, one per label 1..5, each using sleep's default delay.
const tasks = [1, 2, 3, 4, 5].map((i) => sleep(i));

测试代码:asyncPool(tasks, 2)

预期结果:

  • 第一次打印 12
  • 第二次打印 34
  • 第三次打印 5

方案一: 基于 async-pool 方案

维护两个数组:

  1. allTasks
  2. poolTasks

取出 tasks[i] 作为当前的任务:

allTasks 存放 task[i];poolTasks 存放 e = task[i].then(() => { poolTasks.splice(poolTasks.indexOf(e), 1) })

通过 Promise.race + await 控制 poolTasks 始终在并发数范围内,最后通过 Promise.all 实现所有任务的并发。

/**
 * Promise.all with a concurrency cap, built on Promise.race.
 *
 * Tasks are started in order. Once `poolLimit` of them are in flight, the
 * loop waits for one to finish before starting the next.
 *
 * @param {Array<() => any>} tasks - Task factories; each return value is
 *   wrapped with Promise.resolve.
 * @param {number} poolLimit - Maximum number of tasks in flight. When it is
 *   larger than `tasks.length`, no throttling is applied.
 * @returns {Promise<any[]>} Results in the same order as `tasks`; rejects
 *   with the first task error.
 */
const asyncPool = async (tasks, poolLimit) => {
  /* Every started task, in input order; feeds the final Promise.all. */
  const allTasks = [];
  /* Bookkeeping promises for the tasks that are still in flight. */
  const poolTasks = new Set();

  for (let i = 0; i < tasks.length; i++) {
    const curTask = Promise.resolve(tasks[i]());
    allTasks.push(curTask);

    /* Throttle only when the limit can actually be reached. */
    if (poolLimit <= tasks.length) {
      /* On success, drop this entry so a slot becomes free. */
      const e = curTask.then(() => {
        poolTasks.delete(e);
      });
      /*
       * `e` rejects whenever its task rejects. If several tasks finish in the
       * same tick the pool can fall below the limit, so `e` may never be part
       * of a Promise.race. Mark it handled so the failure is reported only
       * through the returned promise, not as an unhandledRejection.
       */
      e.catch(() => {});
      poolTasks.add(e);

      /* At capacity: wait until one in-flight task settles. */
      if (poolTasks.size >= poolLimit) {
        await Promise.race(poolTasks);
      }
    }
  }

  /* Everything has been started; wait for the tasks still pending. */
  return Promise.all(allTasks);
};

// Demo: pass `tasks` to asyncPool with a limit of 2 (the returned promise is not awaited here).
asyncPool(tasks, 2)

参考实现:asyncPool 的使用

方案二: 使用 koa 内的 compose 函数思想,构建 dispatch 函数

对于同步任务的递归为:

// Illustrative sketch of synchronous recursion: looptask calls itself directly.
// There is no termination condition, so invoking it as written never returns normally.
const looptask = () => {
  looptask()
}

对于异步任务的递归:

// Illustrative sketch of asynchronous recursion: the next dispatch() is issued
// from the .then callback, i.e. only after the promise returned by asyncFn() fulfills.
// NOTE(review): asyncFn is not defined in this snippet — presumably a placeholder
// for any promise-returning function; confirm before running.
const dispatch = () => {
  asyncFn().then(() => {
    dispatch();
  })
}

基于这个思想,结合 Promise.all 的源码实现。

/* 使用 dispatch 实现 */
/**
 * Promise.all with a concurrency cap, built on a self-rescheduling dispatch.
 *
 * Up to `poolLimit` tasks are started immediately. Each time one fulfills,
 * the next unstarted task is dispatched, until all have been started.
 *
 * @param {Array<() => any>} tasks - Task factories; a thrown error or a
 *   rejected promise fails the whole pool.
 * @param {number} poolLimit - Maximum number of tasks in flight; must be > 0.
 * @returns {Promise<any[]>} Results in the same order as `tasks`. Rejects with
 *   the first task error, or with RangeError when `poolLimit` is not positive.
 */
const asyncPool = (tasks, poolLimit) => {
  return new Promise((resolve, reject) => {
    const result = [];
    let resolveCount = 0;
    let currentIndex = 0;
    let failed = false;

    /* Nothing to run: settle immediately instead of waiting forever. */
    if (tasks.length === 0) {
      resolve(result);
      return;
    }
    /* With no worker started the promise could never settle. */
    if (!(poolLimit > 0)) {
      reject(new RangeError("poolLimit must be a positive number"));
      return;
    }

    const fail = (err) => {
      /* Stop scheduling further tasks once the pool has failed. */
      failed = true;
      reject(err);
    };

    const dispatch = () => {
      const index = currentIndex;
      currentIndex++;
      /* Run the factory inside a promise executor so a synchronous throw
         becomes a rejection instead of escaping from a .then callback. */
      new Promise((settle) => settle(tasks[index]())).then((res) => {
        result[index] = res;
        resolveCount++;
        if (resolveCount === tasks.length) {
          /* Resolve with the ordered results, not just the last value. */
          resolve(result);
          return;
        }

        /* Async recursion: this slot is free, so start the next unstarted task. */
        if (!failed && currentIndex < tasks.length) {
          dispatch();
        }
      }, fail);
    };

    for (let i = 0; i < poolLimit && i < tasks.length; i++) {
      dispatch();
    }
  });
};

// Demo: pass `tasks` to the dispatch-based asyncPool with a limit of 2 (the returned promise is not awaited here).
asyncPool(tasks, 2)

wangjs-jacky avatar Oct 18 '23 15:10 wangjs-jacky

/**
 * Promise.all with a concurrency cap.
 *
 * Entries may be task factories (functions) or already-created
 * promises/values. Factories are invoked lazily, so at most `limit` of them
 * run at once. Promises that already exist cannot be throttled and are simply
 * awaited in order.
 *
 * @param {Array<(() => any) | Promise<any> | any>} arr - Work items.
 * @param {number} limit - Maximum number of items awaited concurrently; must be >= 1.
 * @returns {Promise<any[]>} Results in the same order as `arr`. Rejects with
 *   the first error, or with RangeError when `limit` is below 1 or not a number.
 */
function allwithlimit(arr, limit) {
  const len = arr.length;
  const rs = new Array(len);

  // Empty input resolves immediately instead of never settling.
  if (len === 0) {
    return Promise.resolve(rs);
  }
  if (!(limit >= 1)) {
    return Promise.reject(new RangeError("limit must be a number >= 1"));
  }

  // Promises passed in are already running. Mark them handled up front so
  // one that rejects after we have failed fast is not reported as unhandled.
  const ignore = () => {};
  for (const entry of arr) {
    if (typeof entry !== "function") {
      Promise.resolve(entry).catch(ignore);
    }
  }

  let next = 0;
  let failed = false;

  // Each worker claims the next index before awaiting, so the result slot is
  // fixed and cannot be shifted by another worker advancing the cursor.
  const worker = async () => {
    while (!failed && next < len) {
      const index = next++;
      const entry = arr[index];
      try {
        rs[index] = await (typeof entry === "function" ? entry() : entry);
      } catch (err) {
        // Stop the other workers from picking up more work.
        failed = true;
        throw err;
      }
    }
  };

  const width = Math.min(len, Math.floor(limit));
  return Promise.all(Array.from({ length: width }, worker)).then(() => rs);
}

Windseek avatar Nov 26 '24 08:11 Windseek

/**
 * Creates a deferred timer task. Nothing is scheduled until the returned
 * function is called; that call returns a promise which logs `text` and
 * fulfills once `delay` milliseconds have passed.
 *
 * @param {*} text - Value handed to console.log when the timer fires.
 * @param {number} [delay=1000] - Milliseconds to wait.
 * @returns {() => Promise<void>} Function that starts the timer.
 */
function sleep(text, delay = 1000) {
    const launch = () => {
        return new Promise((done) => {
            const fire = () => {
                console.log(text);
                done();
            };
            setTimeout(fire, delay);
        });
    };
    return launch;
}
  
// Five task factories labelled 1..5, each using sleep's default delay.
const tasks = Array.from({ length: 5 }, (_, position) => sleep(position + 1));

// 第一次打印 12
// 第二次打印 34
// 第三次打印 5

/**
 * Promise.all with a concurrency cap, built on Promise.race.
 *
 * Tasks are started in order. Whenever `limit` of them are in flight, the
 * loop waits for one to finish before starting another.
 *
 * @param {Iterable<() => any>} tasks - Task factories; each return value is
 *   wrapped with Promise.resolve.
 * @param {number} limit - Maximum number of tasks in flight.
 * @returns {Promise<any[]>} Results in start order; rejects with the first
 *   task error.
 */
const myPromiseAll = async (tasks, limit) => {
    const pool = [];
    const running = new Set();
    for (const task of tasks) {
        const cur = Promise.resolve(task());
        pool.push(cur);

        // On success, remove the bookkeeping entry so a slot becomes free.
        const curCallback = cur.then(() => {
            running.delete(curCallback);
        });
        // `curCallback` rejects whenever its task does, but it only gets a
        // handler if it takes part in a Promise.race below. Mark it handled so
        // the error surfaces only via the returned promise, not as an
        // unhandledRejection.
        curCallback.catch(() => {});
        running.add(curCallback);

        // At capacity: wait until one in-flight task settles.
        if (running.size >= limit) {
            await Promise.race(running);
        }
    }
    return Promise.all(pool);
};

// Demo: pass `tasks` to myPromiseAll with a limit of 2 (the returned promise is not awaited here).
myPromiseAll(tasks, 2);

SiriusZHT avatar Jan 11 '25 12:01 SiriusZHT