embedded-queue
embedded-queue copied to clipboard
【bug】consumer order problem
- I inserted 1, 2, 3, 4, 5, 6 messages
- The maximum number of jobs for consumers is 1
- Out-of-order consumption results 2, 3, 1, 4, 5, 6
async brachMsg(conversationId: string, messages: any | Array<Message>, min: number = 0, max: number = 0, sendTopic: boolean = false,msgType?: 'task') {
const msgCount = messages.length;
let sendCount = 0;
const isRoom = conversationId.indexOf('R:') >= -1 ? 1 : 0;
let noticeMessage = null;
let noticeIndex = 0;
// IS ROOM
if (isRoom === 1) {
let isText = 0;
const msgNotice = messages.filter((msg: Message, index: number) => {
if (isText >= 1) {
return;
}
if (msg.type == 'text' && msg.content && msg?.isNotice == 1) {
noticeMessage = msg;
noticeIndex = index;
isText++;
}
});
if (noticeMessage) {
await this.messageQueue.createJob({
type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
conversationId,
message: noticeMessage,
min,
max,
sendTopic,
msgCount
}
});
}
}
for (let index = 0; index < messages.length; index++) {
const message: Message = messages[index];
if (index !== noticeIndex && !noticeMessage) {
await this.messageQueue.createJob({
type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
conversationId,
message,
min,
max,
sendTopic,
msgCount
}
});
sendCount++;
}
}
return sendCount;
}
How to solve the sorting of a large number of concurrent insert jobs?
init Queue
this.messageQueue = await Queue.createQueue({
inMemoryOnly: true,
filename: PathHelper.getQueuePath(this.userId + '_message_queue'),
autoload: true
});
start
this.messageQueue.process(
QueueEunm.SEND_MESSAGE,
async job => {
// wait 10 seconds.
await new Promise(resolve => {
let waitTime = 0;
if (job.data && job.data['sendTopic'] && job.data['message']['isNotice'] == 0) {
waitTime = _.random(job.data['min'], job.data['max']) * 1000;
}
setTimeout(resolve, waitTime);
});
},
1
);
@goldencolor Thanks for the bug report.
What is the value of priority
?
The order of priority is executed in the order of priority value.
If priority values are the same, the tasks are executed in the order of their creation time.
(The order of task creation time is stored down to the millisecond, so unless the task was created in a very short period of time, I assume that duplicate values will not occur.)
I am guessing that this is the result of the following code.
'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL
I probably created 7-8 at the same time, the priority is the lowest
I think there is a risk of concurrency for a large amount of data? Is there any possibility to improve this? My current practice is to add a 100ms delay before creating the task, so that there will be no out-of-order situation, but this is actually very expensive for performance
await new Promise(resolve => setTimeout(resolve, 100));
await this.messageQueue.createJob({
type: QueueEunm.SEND_MESSAGE, 'priority': msgType =='task' ? Priority.LOW : Priority.CRITICAL, data: {
conversationId,
message,
min,
max,
sendTopic,
msgCount
}
});
sendCount++;
I see, so priority is the same value. If that is the case, I suspect that the creation time may be the same value as a result of adding jobs in a short period of time.
For the time being, it will be difficult to find time to modify the implementation to process jobs strictly in the order they are created. I believe that an interval of 1 millisecond or more would result in the expected behavior, so I would ask that you check the accuracy of the timer in your execution environment and set as short a time as possible to deal with the problem.
If the concurrency problem cannot be solved, is it possible to set the order of consumption conditions or the priority of certain values through conditions? For example, sort by a parameter of job.data
Ummm, in the current implementation, there is no way to specify from the outside....
What should I do? Any good ?😶🌫️
I would like you to first try to see if adding a few milliseconds of weighting will help.
If that does not seem to solve the problem, please look for another package or fork this repository and try to modify it.
If you can get Queue.createJob
to store the sequence number and JobRepository.findInactiveJobByType
to sort by that value, I think you will get the behavior you are looking for.
I am not sure if this approach is correct, and I can't guarantee that my repository will support it, as I don't think I will have time to study it for a while. Sorry.
👌 Thank you very much for coming to fix and optimize him on this foundation