js-challenges
js-challenges copied to clipboard
有并发限制的Promise.all(ts类型)
class Run {
tasks = [];
flag = 0;
constructor(tasks, num) {
this.tasks = tasks;
this.num = num;
this.flag = 0;
}
run() {
if (!this.num) return;
new Array(this.num).fill(0).forEach(this.runOne.bind(this));
}
async runOne() {
if (!this.tasks.length) return;
if (this.flag >= this.num) return;
const task = this.tasks.shift();
this.flag++;
await task();
if (this.flag > 0) this.flag--;
await this.runOne();
}
}
const sleep = (duration) =>
new Promise((resolve) =>
setTimeout(() => {
console.log("task " + duration + " done");
resolve();
}, duration)
);
const tasks = new Array(10).fill(() => sleep(1000 + Math.random() * 3000));
const run = new Run(tasks, 2);
run.run();
测试代码:
function sleep(text, delay = 1000) {
return () => new Promise(resolve => {
setTimeout(() => {
console.log(text)
resolve();
}, delay);
});
}
const tasks = [1, 2, 3, 4, 5].map((i) => {
return sleep(i)
})
测试代码:asyncPool(tasks, 2)
预期结果:
- 第一次打印
12 - 第二次打印
34 - 第三次打印
5
方案一: 基于 async-pool 方案
维护两个数组:
allTaskspoolTasks
取出 tasks[i] 作为当前的任务:
allTasks ← task[i]
poolTasks ← e = task[i].then(()=>{poolTasks.splice(poolTasks.indexOf(e),1)})
通过 Promise.race + await 控制 poolTasks 始终在并发数范围内,最后通过 Promise.all 实现所有任务的并发。
const asyncPool = async (tasks, poolLimit) => {
/* 所有异步任务执行状态 */
const allTasks = [];
/* 正在执行的任务数组 */
const poolTasks = [];
for (let i = 0; i < tasks.length; i++) {
const curTask = Promise.resolve(tasks[i]());
allTasks.push(curTask);
/* 当 poolLimit <= tasks.length 时,实现并发控制 */
if (poolLimit <= tasks.length) {
/* 在原有异步包裹处理操作 */
const e = curTask.then(() => {
/* 成功后,从正在执行的任务数组中删除 */
poolTasks.splice(poolTasks.indexOf(e), 1)
})
poolTasks.push(e);
/* poolTasks 持续增加会超出限制数量 */
if (poolTasks.length >= poolLimit) {
/* 始终控制 poolTasks 的数量 */
await Promise.race(poolTasks);
}
}
}
/* 此时 allTasks 中剩余 pending < poolTasks */
return Promise.all(allTasks);
}
asyncPool(tasks, 2)
参考实现:asyncPool 的使用
方案二: 使用 koa 内的 compose 函数思想,构建 dispatch 函数
对于同步任务的递归为:
const looptask = () => {
looptask()
}
对于异步任务的递归:
const dispatch = () => {
asyncFn().then(() => {
dispatch();
})
}
基于这个思想,结合 Promise.all 的源码实现。
/* 使用 dispatch 实现 */
const asyncPool = (tasks, poolLimit) => {
return new Promise((resolve, reject) => {
const result = [];
let resolveCount = 0;
let currentIndex = 0;
const dispatch = () => {
const curTask = Promise.resolve(tasks[currentIndex]());
const index = currentIndex;
currentIndex++;
/* 异步任务的递归,通过 .then 实现 */
curTask.then(res => {
result[index] = res;
resolveCount++;
if (resolveCount === tasks.length) {
resolve(res);
}
/* 递归的触发(currentIndex指针还未触发) */
if (currentIndex < tasks.length) {
dispatch();
}
});
}
for (let i = 0; i < poolLimit && i < tasks.length; i++) {
dispatch();
}
})
}
asyncPool(tasks, 2)
function allwithlimit(arr, limit) {
let rs = [];
let len = arr.length;
let count = 0;
let pool = arr.slice(0, limit);
let _pool = arr.slice(limit);
let _index = 0;
let check = async function() {
if(_index === _pool.length) {
return;
}
let task = _pool[_index++];
let _rs = await task;
count++
rs[pool.length -1 + _index] = _rs;
check();
}
return new Promise((resolve, reject) => {
try {
pool.forEach(async (task, index) => {
const _rs = await task;
count++
rs[index] = _rs;
await check();
if(count === len) {
resolve(rs);
}
});
} catch(e) {
reject(e);
}
})
}
function sleep(text, delay = 1000) {
return () => new Promise(resolve => {
setTimeout(() => {
console.log(text)
resolve();
}, delay);
});
}
const tasks = [1, 2, 3, 4, 5].map((i) => {
return sleep(i);
})
// 第一次打印 12
// 第二次打印 34
// 第三次打印 5
const myPromiseAll = async (tasks, limit) => {
const pool = [];
const running = [];
for(const task of tasks) {
const cur = Promise.resolve(task());
const curCallback = cur.then(() => {
running.splice(running.indexOf(curCallback), 1);
})
pool.push(cur);
running.push(curCallback);
if(running.length >= limit) {
await Promise.race(running);
}
}
return Promise.all(pool);
}
myPromiseAll(tasks, 2);