nestjs icon indicating copy to clipboard operation
nestjs copied to clipboard

Работа с батчами

Open Nelfimov opened this issue 1 year ago • 8 comments

С чем связан запрос на фичу?

Необходимо написать библиотеку для батчинга

Расскажите как вы это себе видите

  • [ ] нужна самостоятельная библиотека для работы с батчами в памяти
  • [ ] в качестве сценария - батчинг сообщений из rabbitmq
  • [ ] добавить возможность прокидывать коллбэки на случаи
    • [ ] переполнения памяти
    • [ ] хэндлинг ошибок
  • [ ] покрыть тестами

Материалы

https://github.com/taskforcesh/bullmq

Nelfimov avatar Oct 02 '24 12:10 Nelfimov

Как я приблизительно вижу реализацию:

type QueueName = string | symbol
type CheckName = string | symbol
type AddCond<T> = {
  queueName: QueueName,
  item: T
}
type AddManyCond<T> = {
  queueName: QueueName,
  items: T[]
}
type ProcessorFn<T> = (queueName: QueueName, item: T[]) => void | Promise<void>
type CheckOk = () => void
type CheckFail = () => void
type Checks = {
  checkOk: CheckOk,
  checkFail: CheckFail
}
type OnChangeStateToOkCallback = () => void | Promise<void>
type OnChangeStateToFailCallback = () => void | Promise<void>
type Job = {
  queueName: QueueName,
  items: string[]
}

interface BatchQueueServiceI<T> {
  add(addCond: AddCond<T>): void
  addMany(addManyCond: AddManyCond<T>): void
  processBatch(processorFn: ProcessorFn<T>): void
  createCheck(checkName: CheckName, initialState: boolean): Checks
  deleteCheckQueue(checkName: CheckName): void
  onChangeTotalStateToOk(callback: OnChangeStateToOkCallback): void
  onChangeTotalStateToFail(callback: OnChangeStateToFailCallback): void
}

class BatchQueueService<T> implements BatchQueueServiceI<T> {
  private queues: Map<QueueName, T[]> = new Map();

  private checks: Map<string | symbol, boolean> = new Map();

  private onOkCallbacks: OnChangeStateToOkCallback[] = [];

  private onFailCallbacks: OnChangeStateToFailCallback[] = [];

  private maxQueueLength: number;

  private maxTotalQueueLength: number;

  private maxQueues: number;

  private timeoutDuration: number;

  private totalQueueLength: number = 0;

  private timers: Map<QueueName, NodeJS.Timeout> = new Map();

  private processorFn!: ProcessorFn<T>;

  constructor(
    maxQueueLength: number,
    maxTotalQueueLength: number,
    maxQueues: number,
    timeoutDuration: number
  ) {
    this.maxQueueLength = maxQueueLength;
    this.maxTotalQueueLength = maxTotalQueueLength;
    this.maxQueues = maxQueues;
    this.timeoutDuration = timeoutDuration;
  }

  public async add(addCond: AddCond<T>): Promise<void> {
    await this.addMany({
      queueName: addCond.queueName,
      items: [addCond.item]
    });
  }

  public async addMany(addManyCond: AddManyCond<T>): Promise<void> {
    this.checkAllChecks();

    const { queueName, items } = addManyCond;

    if (!this.queues.has(queueName)) {
      if (this.queues.size >= this.maxQueues) {
        await this.triggerProcessing(queueName);
        throw new Error("Maximum number of queues exceeded");
      }
      this.queues.set(queueName, []);
    }

    const queue = this.queues.get(queueName);
    if (!queue) return;

    if (queue.length + items.length > this.maxQueueLength) {
      await this.triggerProcessing(queueName);
      throw new Error("Maximum queue length exceeded");
    }

    queue.push(...items);
    this.totalQueueLength += items.length;

    if (this.totalQueueLength > this.maxTotalQueueLength) {
      await this.triggerProcessing(queueName);
      throw new Error("Maximum total length of queues exceeded");
    }

    this.startTimerIfNecessary(queueName);
  }

  private checkAllChecks() {
    const failedChecks = [];
    for (const [checkName, state] of this.checks.entries()) {
      if (!state) {
        failedChecks.push(checkName);
      }
    }
    if (failedChecks.length > 0) {
      throw new Error(`One or more checks failed: ${failedChecks.join(", ")}`);
    }
  }

  private startTimerIfNecessary(queueName: QueueName) {
    if (this.timers.has(queueName)) {
      clearTimeout(this.timers.get(queueName)!);
    }

    const timer = setTimeout(() => {
      this.triggerProcessing(queueName);
    }, this.timeoutDuration);

    this.timers.set(queueName, timer);
  }

  private async triggerProcessing(queueName: QueueName) {
    const timer = this.timers.get(queueName);
    if (timer) {
      clearTimeout(timer);
      this.timers.delete(queueName);
    }

    const items = this.queues.get(queueName);
    if (items && items.length > 0) {
      this.queues.set(queueName, []);
      this.totalQueueLength -= items.length;
      await this.processorFn(queueName, items);
    }
  }

  public processBatch(processorFn: ProcessorFn<T>): void {
    this.processorFn = processorFn;
  }

  public createCheck(
    checkName: CheckName,
    initialState: boolean
  ): Checks {
    this.checks.set(checkName, initialState);

    const checkOk: CheckOk = () => {
      const beforeTotalCheck = this.totalCheck();
      this.checks.set(checkName, true);
      if (beforeTotalCheck) return;
      const afterTotalCheck = this.totalCheck();
      if (afterTotalCheck) {
        this.triggerOkCallbacks();
      }
    };

    const checkFail: CheckFail = () => {
      const beforeTotalCheck = this.totalCheck();
      this.checks.set(checkName, false);
      if (!beforeTotalCheck) return;
      const afterTotalCheck = this.totalCheck();
      if (!afterTotalCheck) {
        this.triggerFailCallbacks();
      }
    };

    return { checkOk, checkFail };
  }

  private totalCheck(): boolean {
    for (const state of this.checks.values()) {
      if (!state) {
        return false;
      }
    }
    return true;
  }

  public deleteCheckQueue(checkName: CheckName): void {
    // TODO: full implement
    // If a check that was in bad condition was deleted and
    // other checks were in good condition, then you need to run callbacks
    this.checks.delete(checkName);
  }

  public onChangeTotalStateToOk(callback: OnChangeStateToOkCallback): void {
    this.onOkCallbacks.push(callback);
  }

  public onChangeTotalStateToFail(callback: OnChangeStateToFailCallback): void {
    this.onFailCallbacks.push(callback);
  }

  private triggerOkCallbacks(): void {
    for (const callback of this.onOkCallbacks) {
      await callback();
    }
  }

  private triggerFailCallbacks(): void {
    for (const callback of this.onFailCallbacks) {
      await callback();
    }
  }
}

Небольшой пример использования:

// maxQueueLength: number,
// maxTotalQueueLength: number,
// maxQueues: number,
// timeoutDuration: number
const batchQueueService = new BatchQueueService<string>(
  100_000,
  1_000_000,
  5,
  5000
);

batchQueueService.processBatch(async (queueName, items) => {
  console.log(`Processing queue: ${queueName.toString()} with items:`, items.length);
});

const memoryCheck = batchQueueService.createCheck('memoryCheck', true);
const cpuCheck = batchQueueService.createCheck('cpuCheck', true);

batchQueueService.onChangeTotalStateToOk(() => {
  console.log('All checks passed, state is OK');
});

batchQueueService.onChangeTotalStateToFail(() => {
  console.log('One or more checks failed, state is NOT OK');
});

function checkCpu() {
  const cpuOk = Math.random() >= 0.5;
  if (cpuOk) {
    cpuCheck.checkOk();
  } else {
    cpuCheck.checkFail();
  }
}

function checkMemory() {
  const memoryOk = Math.random() >= 0.5;
  if (memoryOk) {
    memoryCheck.checkOk();
  } else {
    memoryCheck.checkFail();
  }
}

setInterval(() => {
  checkCpu();
}, 5000);

setInterval(() => {
  checkMemory();
}, 5000);

let itemCounter = 0;
const queueName = Symbol('testQueue');
setInterval(async () => {
  try {
    const randomBigItem = Array.from({ length: 1000 }, () => 'a').join('');
    await batchQueueService.add({
      queueName: queueName,
      item: `Item radomBigItem ${itemCounter++}`
    });
  } catch (err) {
    // @ts-ignore
    console.log('Error adding item:', err.message);
  }
}, 0);

ToDo:

  • необходимо сделать чеки, которые будут вызываться при каждом вызове метода add, т.к. есть чеки, которые возможно не накладно вызывать при каждом add, тогда было бы хорошо иметь возможнсть вызываеть их сразу.
  • Надо посмотреть насколько накладно получать информацио о памяти и использовании процессора, если не накладно, то можно вообще при каждом add чек делать.
  • Нужно реализовать memoryChecker и возможно cpuChecker (memoryChecker самый важный) и чтобы можно было добавить их удобно, например передав в конструктор опции для чекера (например раз в какое время происходит шедулинг и лимит)
  • Возможно есть смысл хранить состояние чека не как boolean, а скорее как некий enum (hardBad, littleBad, ok), чтобы например если есть некоторое критическое количество littleBad считать чеки в плохом состоянии. Это точно не очень важно, пока ключевое это только memoryChecker
  • Нужно увеличить читаемость класса
  • Написать критические useCase-ы использования. Например много чекеров + нужен интеграционный тест с каким-то брокером

PS: Класс достаточно удобен для расширения, т.к. если понадобиться добавить новый checker (например проверка диска), то нужно лишь использовать .createCheck, который вернёт функции, для изменения состояния определённого чека.

OsirisAnubiz avatar Oct 02 '24 17:10 OsirisAnubiz

upd:

  • важно обернуть колбеки в try/catch и логировать это

OsirisAnubiz avatar Oct 02 '24 17:10 OsirisAnubiz

@OsirisAnubiz хорошо что подходишь к таскам так основательно.

Если хочешь вложить пример кода и он большой - то лучше либо gist либо уже в отдельной ветке.

Nelfimov avatar Oct 03 '24 05:10 Nelfimov

Проверил на сколько накладные process.hrtime и os.cpuUsage

  • process.hrtime
    • Время в среднем 250мс за 20_000 вызовов - приемлемо
    • Процессор нагружается 99% за 20_000 вызовов - плохо
  • os.memoryUsage
    • Время в среднем 260мс за 20_000 вызовов - приемлемо
    • Процессор нагружается 99% за 20_000 вызовов - плохо

Нужно делать проверку только раз в n вызовов метода .add

OsirisAnubiz avatar Oct 03 '24 08:10 OsirisAnubiz

@Nelfimov Поддерживаю про основательный подход к задачам. А код, да, лучше публиковать и обрабатывать через ветки

TorinAsakura avatar Oct 03 '24 09:10 TorinAsakura

В итоге пользователь пакета не должен иметь прямой доступ к BatchStore

Взаимодействие с модулем должно происходить таким образом:

BatchQueueProducer:

@Injectable()
class BatchQueueProducer {
  constructor(
    @BatchQueueProducer() private readonly batchQueueProducer: BatchQueueProducer<SomeEntity>,
  ) {}

  async someMethod() {
    await this.batchQueueProducer.add('queue-name', someEntity)
  }
}

BatchQueueStateHandler(думаю что часто Producer и StateHandler будут в одном классе например SomeRabbitConsumer):

@Injectable()
class BatchQueueStateHandler {
  constructor(
    @BatchQueueChangeStateHandler() private readonly batchQueueChangeStateHandler: BatchQueueChangeStateHandler,
  ) {}

  onModuleInit() {
    this.batchQueueChangeStateHandler.onChangeTotalStateToOk(startRabbitConsuming)
    this.batchQueueChangeStateHandler.onChangeTotalStateToFail(stopRabbitConsuming)
  }

  async stopRabbitConsuming() {
    // some logic
  }

  async startRabbitConsuming() {
    // some logic
  }
}

BatchQueueConsumer:

@Injectable()
class BatchQueueConsumer {
  constructor(
    @BatchQueueConsumer() private readonly batchQueueConsumer: BatchQueueConsumer<SomeEntity>,
  ) {}

  async someMethod() {
    this.batchQueueConsumer.consume(async (queueName: string | symbol, someEntitys: SomeEntity[ ]) => {
      // some logic
    })
  }
}

BatchQueueMemoryChecker:

@Injectable()
class BatchQueueMemoryChecker {
  constructor(
    @BatchQueueCheckConfigurator() private readonly batchQueueCheckConfigurator: BatchQueueConfigurator,
  ) {}

  async onModuleInit() {
    const initalState = false;
    const { checkOk, checkFail } = this.batchQueueCheckConfigurator.createCheck('memory-usage', initalState)
    const isMemoryOk = this.checkMemory()
    setInterval(() => {
      if (isMemoryOk) {
        await checkOk()
      } else {
        await checkFail()
      }
    }, 1000)
  }

  checkMemory(): boolean {
    // some logic
  }
}

SomeModule:

@Module({
  imports: [
    BatchQueueModule.forRoot({
      maxQueueLength: 10_000,
      maxTotalQueueLength: 100_000,
      maxQueues: 100,
      timeoutDuration: 2_000,
    }),
  ],
  providers: [
    BatchQueueProducer,
    BatchQueueStateHandler,
    BatchQueueConsumer,
    BatchQueueMemoryChecker,
  ],
})
class SomeModule {}

OsirisAnubiz avatar Oct 03 '24 17:10 OsirisAnubiz

TODO:

  • [x] Нужно дописать метод addCheckOnAdd, который будет делать определённые проверки через некоторое количество вызовов add
  • [x] Дописать юнит тесты для addCheckOnAdd
  • [x] Нужно напсать Producer, это просто класс проксирующий в BatchStore
  • [x] Нужно написать Consumer, это просто класс проксирующий в BatchStore
  • [x] Нужно написать StateHandler, это просто класс проксирующий в BatchStore
  • [x] Нужно написать CheckConfigurator, это просто класс проксирующий в BatchStore
  • [x] Нужно написать написать декораторы, которые инджектят классы для работы с очередями
    • [x] @BatchProducer()
    • [x] @BatchConsumer()
    • [x] @QueueStateHandler()
    • [x] @QueueCheckConfigurator()
  • [x] Нужно дописать BatchQueueModule. Он создаёт Producer, Consumer, StateHandler, CheckConfigurator и провайдит из инджекшен токены связанные с декораторами
  • [x] Нужно, чтобы MemoryChecker, которые предоставляеся этим пекеджом работал через @BatchQueueCheckConfigurator(), на данный момен он знает полный интерфейс BatchQueue
  • [x] Нужны интеграционные тесты по работе в связке с RabbitMQ

OsirisAnubiz avatar Oct 03 '24 17:10 OsirisAnubiz

Статус на конец дня:

  • Написана большая часть BatchQueue, остался метод addCheckOnAdd
  • Частично сделан модуль BatchQueueModule
  • Частично сделан BatchQueueMemoryChecker
  • Описано как должно работать взаимодействие с модулем
  • Написаны unit тесты для BatchQueue

Осталось сделать:

  • то, что описано в TODO, думаю за завтрашний день успею закончить

OsirisAnubiz avatar Oct 03 '24 17:10 OsirisAnubiz

Статус на конец дня:

  • Исправил ошибку из-за которой размер батча принимаемый консьюмером не соответствует максимальной длинне. (добавил ассинхонный мьютекс)
  • Написал интеграционные тесты, но последний не проходит. Почему-то консьюмер принимает сообщения, когда есть неудавшийся чек

Осталось сделать:

  • Понять из-за чего происходит ошибка в тестах и исправить её.
  • Изменить названия коммитов.

OsirisAnubiz avatar Oct 07 '24 18:10 OsirisAnubiz

upd:

  • нужно предотвратить утечку памяти в BatchQueue, забыл почистить использующиеся мапы

OsirisAnubiz avatar Oct 07 '24 18:10 OsirisAnubiz

Статус на конец дня

  • Исправил все замечания в PR
  • Сделал так, чтобы проверки с одинаковым именем, которые срабатывают при добавлении элемента в очередь и те которые вызывает пользователь пакета были как одна проверка. Это важно, т.к. если не пройдёт чек на добавление, то его нельзя будет восстановить

Осталось сделать

  • Исправить yarn check. Падает typecheck с очень странными ошибками на строчках: app = .... Это можно решить @ts-expect-error, но это замазывает проблему просто, надо понять в чём дело
  • Пройти PR

OsirisAnubiz avatar Oct 08 '24 17:10 OsirisAnubiz

Статус на конец дня

  • Исправил новые замечания на PR

Осталось сделать

  • Ничего, ожидаю замечания от последнего ревьюера

OsirisAnubiz avatar Oct 09 '24 16:10 OsirisAnubiz

Пытаюсь вынести какое-то время CheckManager, но не получается пока. Если нужно предусмотреть сложное сотояние, то состояние всего CheckManager думаю нормально хранить как мап где ключ это название чека, а значение это некий Symbol характеризирующий состояние

OsirisAnubiz avatar Oct 10 '24 14:10 OsirisAnubiz

Статус на конец дня

  • Разделил BatchQueue на несколько классов
  • Исправил остальные замечания на PR

Осталось сделать

  • Ничего. Ожидание замечаний на PR

OsirisAnubiz avatar Oct 10 '24 20:10 OsirisAnubiz

upd: из-за сложных состояний чеков будут слишком зацепляться классы, поэтому сейчас состояние ok или fail по сути. можно просто создать несколько чеков которые будут характеризовать более сложное состояние думаю это нормально убрал метод которые проверяет что-то на определённое количество добавлений, т.к. понял, что это бесполезная вещь

OsirisAnubiz avatar Oct 10 '24 20:10 OsirisAnubiz