js-challenges icon indicating copy to clipboard operation
js-challenges copied to clipboard

实现有并行限制的 Promise 调度器

Open Sunny-117 opened this issue 2 years ago • 12 comments

Sunny-117 avatar Nov 03 '22 08:11 Sunny-117

Scheduler调度器:

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 阻塞的任务队列
    this.queue = [];
  }

  async add(fn) {
    if (this.count >= this.max) {
      // 若当前正在执行的任务,达到最大容量max
      // 阻塞在此处,等待前面的任务执行完毕后将resolve弹出并执行
      await new Promise(resolve => this.queue.push(resolve));
    }
    // 当前并发任务数++
    this.count++;
    // 使用await执行此函数
    const res = await fn();
    // 执行完毕,当前并发任务数--
    this.count--;
    // 若队列中有值,将其resolve弹出,并执行
    // 以便阻塞的任务,可以正常执行
    this.queue.length && this.queue.shift()();
    // 返回函数执行的结果
    return res;
  }
}

使用:

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

lzxjack avatar Jan 11 '23 07:01 lzxjack

class Schedular {
    constructor(limit) {
        this.limit = limit;
        this.queue = [];
        this.runCounts = 0;
    }
    add(time, order) {
        const mypromise = () => {
            return new Promise((resolve, reject)=>{
                setTimeout(()=>{
                    console.log(order); //执行order
                    resolve();
                }, time);
            })
        }
        this.queue.push(mypromise);
    }
    taskStart() {
        for(let i = 0; i < this.limit; i++){
            this.request();
        }
    }
    request() {
        if(!this.queue || !this.queue.length || this.runCounts >= this.limit) return;
        this.runCounts++;
        this.queue.shift()().then((res)=>{
            this.runCounts--;
            this.request();
        })
    }
}
const scheduler = new Schedular(2)
const addTask = (time, order) => {
  scheduler.add(time, order)
}
addTask(1000, '1')
addTask(500, '2')
addTask(300, '3')
addTask(400, '4')
scheduler.taskStart()

bearki99 avatar Feb 13 '23 02:02 bearki99

题一

题意

并发控制Promise,要求:实现Scheduler

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

实现

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 任务队列
    this.queue = [];
  }

  add(fn) {
    this.queue.push(fn)
    this.run()
  }
  run() {
    if (this.count >= this.max || this.queue.length === 0) return
    this.count++
    Promise.resolve(this.queue.shift()()).finally(() => {
      this.count--
      this.run()
    })
  }
}
// ------------test-------------------
// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题二

题意

并发控制Promise,要求:实现Scheduler

注意 Scheduler.add()返回一个Promise(与题一的区别)

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time)
  }).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题解

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 任务队列
    this.queue = [];
  }

  add(fn) {
    return new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject })
      this.run()
    })
  }
  run() {
    if (this.count >= this.max || this.queue.length === 0) return
    this.count++
    const { fn, resolve, reject } = this.queue.shift()
    return fn().then(() => {
      resolve()
      this.count--
      this.run()
    })
  }
}
// ---------------------test-------------------
// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {
    return sleep(time)
  }).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

题三

题意

并发控制Promise,要求:实现Scheduler

与题二题一的区别:scheduler.add()传入的不是一个返回Promise的函数

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {sleep(time)}).then(() => console.log(val));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 500ms 打印2
// 800ms 打印3
// 1000ms 打印1
// 1200ms 打印4

题解

题没写出来,不知道是不是因为题有问题

2239351258 avatar Mar 29 '23 12:03 2239351258

class scheduler { constructor(max) { this.max = max; // 当前并发数 this.count = 0; // 阻塞队列 this.queue = []; } async add(fn) { // 判断并发数是否大于最大数 if (this.count >= this.max) { // 是就暂停,通过resolve开启下次调用 await new Promise((resolve) => this.queue.push(resolve)); // console.log(this.queue.length); } // 并发数+1 this.count++; fn().then(() => { //执行完毕 // 并发数-1 this.count--; // 看看是否有阻塞队列是否存在,是就停止阻塞 if (this.queue.length) this.queue.shift()(); }); } }

cscty avatar Jun 19 '23 15:06 cscty

let tasks = []; for (let i = 0; i < 8; i++) { tasks.push(() => { return new Promise((resolve, reject) => { setTimeout(() => { resolve(i); console.log(i); }, 1000); }); }); } function tdq(tasks, max) { // 当前并发数 let count = 0; // 阻塞队列 let queue = []; async function add(task) { if (count >= max) { // 阻塞 await new Promise((resolve) => { queue.push(resolve); }); } count++; task().then((data) => { count--; if (queue.length) { queue.shift()(); } }); } for (let i = 0; i < tasks.length; i++) { add(tasks[i]); } } tdq(tasks, 3);

cscty avatar Jul 03 '23 13:07 cscty

let tasks = []; for (let i = 0; i < 8; i++) { tasks.push(() => { return new Promise((resolve, reject) => { setTimeout(() => { resolve(i); console.log(i); }, 1000); }); }); } function tdq(tasks, max) { // 当前并发数 let count = 0; // 阻塞队列 let queue = []; async function add(task) { if (count >= max) { // 阻塞 await new Promise((resolve) => { queue.push(resolve); }); } count++; task().then((data) => { count--; if (queue.length) { queue.shift()(); } }); } for (let i = 0; i < tasks.length; i++) { add(tasks[i]); } } tdq(tasks, 3);

cscty avatar Jul 03 '23 13:07 cscty

function AsyncPool(limit) {
  this.limit = limit
  this.executingQueue = []
  this.queue = []
  this.finished = []
}

AsyncPool.prototype.add = function (fn) {
  this.queue.push(fn)
}

AsyncPool.prototype.run = async function () {
  let {queue, executingQueue, limit, finished} = this
  for (let i = 0; i < queue.length; i++) {
    let task = queue[i]
    let p = task()
    finished.push(p)
    if (queue.length >= limit) {
      p = p.then(() => {
        executingQueue.splice(executingQueue.indexOf(p), 1)
      })
      executingQueue.push(p)
      if (executingQueue.length >= limit) {
        await Promise.race(executingQueue)
      }
    }
  }
  return Promise.all(finished)
}

let pool = new AsyncPool(2)
for (let i = 1; i <= 5; i++) {
  pool.add(
    () => new Promise(resolve => setTimeout(() => {
      console.log(i * 1000)
      resolve(i * 1000)
    },i * 1000))
  )
}

pool.run().then(res => {
  console.log(res)
})

ZhangYedi-cmd avatar Jul 23 '23 14:07 ZhangYedi-cmd

/**
 * @desc 实现有并行限制的 Promise 调度
 * JS 实现一个带并发限制的异步调度器 Scheduler,保证同时运行的任务最多有两个。
例如目前有 4 个任务,完成时间分别为,1000ms、500ms、300ms、400ms
那么在该调度器中的执行完成顺序应该为 2、3、1、4
分析:因为1、2先进入队列中,2完成则输出2,3进入,3完成输出3,此时为800ms,4进入后的200ms,1完成输出1,而后4完成输出 4
 */
class Scheduler {
  constructor(limit) {
    this.limit = limit;
    this.queue = [];
    this.running = 0;
  }
  createTask(duration, fn) {
    return () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(fn());
      }, duration);
    });
  }
  addTask(callback, duration) {
    const task = this.createTask(duration, callback);
    this.queue.push(task);
  }
  start() {
    for (let i = 0; i < this.limit; i++) {
      this.scheduler();
    }
  }
  scheduler() {
    if (this.queue.length === 0 || this.running > this.limit) {
      return;
    }
    this.running++;
    const task = this.queue.shift();
    task().then((() => {
      this.running--;
      this.scheduler();
    }));
  }
}

// 实例化一个调度器
const scheduler = new Scheduler(2);

// 添加任务
scheduler.addTask(() => {
  console.log("任务1");
}, 1000);
scheduler.addTask(() => {
  console.log("任务2");
}, 500);
scheduler.addTask(() => {
  console.log("任务3");
}, 300);
scheduler.addTask(() => {
  console.log("任务4");
}, 400);
// 任务执行
scheduler.start();

bobocomeon avatar Oct 15 '23 09:10 bobocomeon

class Scheduler {
  constructor (max) {
    // 最大并发量
    this.max = max
    // 当前并发数
    this.count = 0
    // 执行队列
    this.queue = []
    // 是否可以运行
    this.isRunning = true
  }
  
  // 添加任务
  addTask (time, callback) {
    this.queue.push({ time, callback })
    if (this.isRunning) {
      this.runTask()
    }
  }
  
  // 执行任务
  runTask () {
    if (this.queue.length === 0) {
      this.isRunning = false;
      return;
    }
    // 达到最大并发
    // this.queue.length === this.max(因为queue中的任务会出去执行,因此判断会有问题)
    if (this.count >= this.max) {
      this.isRunning = false
      return
    }
    // 未到达最大并发
    const { time, callback } = this.queue.shift()
    // 释放出空间,
    this.count++
    setTimeout(() => {
      callback()
      this.count--
      // 继续执行下一个任务
      this.runTask()
    }, time)
  }
}



// 示例用法
const scheduler = new Scheduler(2);

scheduler.addTask(4000, () => {
  console.log('Task 1 executed after 1000ms');
});

scheduler.addTask(4000, () => {
  console.log('Task 2 executed after 2000ms');
});

scheduler.addTask(4000, () => {
  console.log('Task 3 executed after 1000ms');
});

topulikeweb avatar Dec 20 '23 05:12 topulikeweb

Scheduler调度器:

class Scheduler {
  constructor(max) {
    // 最大可并发任务数
    this.max = max;
    // 当前并发任务数
    this.count = 0;
    // 阻塞的任务队列
    this.queue = [];
  }

  async add(fn) {
    if (this.count >= this.max) {
      // 若当前正在执行的任务,达到最大容量max
      // 阻塞在此处,等待前面的任务执行完毕后将resolve弹出并执行
      await new Promise(resolve => this.queue.push(resolve));
    }
    // 当前并发任务数++
    this.count++;
    // 使用await执行此函数
    const res = await fn();
    // 执行完毕,当前并发任务数--
    this.count--;
    // 若队列中有值,将其resolve弹出,并执行
    // 以便阻塞的任务,可以正常执行
    this.queue.length && this.queue.shift()();
    // 返回函数执行的结果
    return res;
  }
}

使用:

// 延迟函数
const sleep = time => new Promise(resolve => setTimeout(resolve, time));

// 同时进行的任务最多2个
const scheduler = new Scheduler(2);

// 添加异步任务
// time: 任务执行的时间
// val: 参数
const addTask = (time, val) => {
  scheduler.add(() => {>     return sleep(time).then(() => console.log(val));
  });
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 2
// 3
// 1
// 4

兄弟,你这代码gpt写的吗,还是哪个仓库看见的,这种写法应该是我第一次写。注释都是我的注释🙉。我只给字节面试官写过一次。链接

wangyang-o avatar Mar 19 '24 07:03 wangyang-o

function PromiseWithResolvers() {
    let resolve = (value) => {},
      reject = (reason) => {};
    const promise = new Promise((res, rej) => {
      resolve = res;
      reject = rej;
    });
    return { promise, resolve, reject };
  }

class PromiseScheduler {
  #tasks = [];
  #isRunning = false;
  #runningCount = 0;
  #maxRunning = 1;
  #RungingChange = (value, type) => {};
  constructor(maxRunning = 1) {
    this.#maxRunning = maxRunning;
  }
  addTask(task) {
    // Add a task to the queue
    //判断是否为异步函数
    if (typeof task !== "function") {
      throw new Error("task must be a function");
    }
    if (
      task[Symbol.toStringTag] !== "AsyncFunction" ||
      task["constructor"].name !== "AsyncFunction"
    ) {
      throw new Error("task must be a async function");
    }
    this.#tasks.push(task);
  }
  addTasks(tasks) {
    // Add a list of tasks to the queue
    let addTask = (task) => {
      this.addTask(task);
    };
    tasks.forEach(addTask);
  }
  start() {
    this.#isRunning = true;
    this.#run();
  }
  #run() {
    // Start the execution of tasks
    if (
      !this.#isRunning ||
      this.#runningCount >= this.#maxRunning ||
      !this.#tasks.length > 0
    ) {
      return;
    }
    const { promise, resolve, reject } = PromiseWithResolvers();
    const task = this.#tasks.shift();
    if (typeof task === "function") {
      this.#runningCount++;
      task().then(resolve).catch(reject);
      this.#RungingChange(promise, "running");
      promise
        .then((value) => {
          this.#RungingChange(value, "success");
        })
        .catch((reason) => {
          this.#RungingChange(reason, "error");
        })
        .finally(() => {
          this.#runningCount--;
          this.#run();
        });
    }
    this.#run();
  }
  pause() {
    // Pause the execution of tasks
    this.#isRunning = false;
  }
  stop() {
    // Stop the execution of tasks
    this.pause();
    this.clearAll();
  }
  clear(task) {
    // Clear the queue
    this.#tasks = this.#tasks.filter((t) => t !== task);
  }
  clearAll() {
    // Clear the queue
    this.#tasks = [];
  }
  setMaxRunning(maxRunning) {
    // Set the maximum number of tasks that can be executed at the same time
    if (typeof maxRunning !== "number") {
      throw new Error("maxRunning must be a number");
    }
    if (maxRunning < 1) {
      throw new Error("maxRunning must be greater than 0");
    }
    if (Object.is(maxRunning, NaN)) {
      throw new Error("maxRunning not is NAN");
    }

    this.#maxRunning = maxRunning;
  }
  onRunningChange(cb) {
    if (typeof cb !== "function") {
      throw new Error("cb must be a function");
    }
    this.#RungingChange = cb;
  }
}

JiangXinYu113 avatar Aug 22 '24 09:08 JiangXinYu113

class Scheduler {
    // 当前任务执行数量
    curCount = 0;
    // 阻塞任务
    queue = [];
    constructor(max){
        this.max = max
    }
    async add(cb){
        if(this.curCount >= this.max){
            await new Promise(resolve => this.queue.push(resolve))
        }
        this.curCount++;
        const result = await cb()
        this.curCount--;
        if(this.queue.length){
            this.queue.shift()()
        }
        return result;
    }
}
const seelp = (time) => new Promise(resolve => setTimeout(resolve,time))
const addTask = (time,message) => {
    return scheduler.add(() => {
        return seelp(time).then(() => {
            console.log(message)
        })
    })
}
const scheduler = new Scheduler(2)
addTask(1000,'1')
addTask(500,'2')
addTask(300,'3')
addTask(400,'4')

刚开始没理解 看了各位大佬的解答才懂 其实就是 把 promise 任务交给调度器 然后调度器控制当前最多能执行多少任务,超过数量的任务就只能等待前面任务完成后才能执行

jianxingyao avatar Sep 15 '24 11:09 jianxingyao