embedded-queue icon indicating copy to clipboard operation
embedded-queue copied to clipboard

【bug】consumer order problem

Open goldencolor opened this issue 2 years ago • 9 comments

  1. I inserted 1, 2, 3, 4, 5, 6 messages
  2. The maximum number of jobs for consumers is 1
  3. Out-of-order consumption results 2, 3, 1, 4, 5, 6
async brachMsg(conversationId: string, messages: any | Array<Message>, min: number = 0, max: number = 0, sendTopic: boolean = false,msgType?: 'task') {
    const msgCount = messages.length;
    let sendCount = 0;
    const isRoom = conversationId.indexOf('R:') >= -1 ? 1 : 0;
    let noticeMessage = null;
    let noticeIndex = 0;
    // IS ROOM
    if (isRoom === 1) {
      let isText = 0;
      const msgNotice = messages.filter((msg: Message, index: number) => {
        if (isText >= 1) {
          return;
        }
        if (msg.type == 'text' && msg.content && msg?.isNotice == 1) {
          noticeMessage = msg;
          noticeIndex = index;
          isText++;
        }
      });
      if (noticeMessage) {
        await this.messageQueue.createJob({
          type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
            conversationId,
            message: noticeMessage,
            min,
            max,
            sendTopic,
            msgCount
          }
        });
      }
    }

    for (let index = 0; index < messages.length; index++) {
      const message: Message = messages[index];
      if (index !== noticeIndex && !noticeMessage) {
        await this.messageQueue.createJob({
            type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
              conversationId,
              message,
              min,
              max,
              sendTopic,
              msgCount
            }
          });
          sendCount++;
      }
    }

    return sendCount;
  }

How to solve the sorting of a large number of concurrent insert jobs?

goldencolor avatar Jun 22 '22 05:06 goldencolor

init Queue

    this.messageQueue = await Queue.createQueue({
      inMemoryOnly: true,
      filename: PathHelper.getQueuePath(this.userId + '_message_queue'),
      autoload: true
    });

start


 this.messageQueue.process(
      QueueEunm.SEND_MESSAGE,
      async job => {
        // wait 10 seconds.
        await new Promise(resolve => {
          let waitTime = 0;
          if (job.data && job.data['sendTopic'] && job.data['message']['isNotice'] == 0) {
            waitTime = _.random(job.data['min'], job.data['max']) * 1000;
          }
          setTimeout(resolve, waitTime);
        });
      },
      1
    );

goldencolor avatar Jun 22 '22 05:06 goldencolor

@goldencolor Thanks for the bug report.

What is the value of priority? The order of priority is executed in the order of priority value. If priority values are the same, the tasks are executed in the order of their creation time. (The order of task creation time is stored down to the millisecond, so unless the task was created in a very short period of time, I assume that duplicate values will not occur.)

I am guessing that this is the result of the following code.

'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL

hajipy avatar Jun 22 '22 11:06 hajipy

I probably created 7-8 at the same time, the priority is the lowest

I think there is a risk of concurrency for a large amount of data? Is there any possibility to improve this? My current practice is to add a 100ms delay before creating the task, so that there will be no out-of-order situation, but this is actually very expensive for performance

 await new Promise(resolve => setTimeout(resolve, 100));
        await this.messageQueue.createJob({
          type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
            conversationId,
            message,
            min,
            max,
            sendTopic,
            msgCount
          }
        });
        sendCount++;

goldencolor avatar Jun 22 '22 13:06 goldencolor

I see, so priority is the same value. If that is the case, I suspect that the creation time may be the same value as a result of adding jobs in a short period of time.

For the time being, it will be difficult to find time to modify the implementation to process jobs strictly in the order they are created. I believe that an interval of 1 millisecond or more would result in the expected behavior, so I would ask that you check the accuracy of the timer in your execution environment and set as short a time as possible to deal with the problem.

hajipy avatar Jun 22 '22 14:06 hajipy

If the concurrency problem cannot be solved, is it possible to set the order of consumption conditions or the priority of certain values through conditions? For example, sort by a parameter of job.data

goldencolor avatar Jun 22 '22 14:06 goldencolor

Ummm, in the current implementation, there is no way to specify from the outside....

hajipy avatar Jun 22 '22 14:06 hajipy

What should I do? Any good ?😶‍🌫️

goldencolor avatar Jun 22 '22 15:06 goldencolor

I would like you to first try to see if adding a few milliseconds of weighting will help.

If that does not seem to solve the problem, please look for another package or fork this repository and try to modify it. If you can get Queue.createJob to store the sequence number and JobRepository.findInactiveJobByType to sort by that value, I think you will get the behavior you are looking for.

I am not sure if this approach is correct, and I can't guarantee that my repository will support it, as I don't think I will have time to study it for a while. Sorry.

hajipy avatar Jun 22 '22 15:06 hajipy

👌 Thank you very much for coming to fix and optimize him on this foundation

goldencolor avatar Jun 23 '22 08:06 goldencolor