
Priorities are NOT STRICT for queues with priority in STRICT polling mode

Open igorjava2025 opened this issue 1 month ago • 13 comments

What's not working? - Strict polling mode is not strict.

When the high-priority queue contains a lot of messages, a truly strict priority mode must not process messages from lower-priority queues at all. In practice, however, a small portion of lower-priority messages is processed anyway before the high-priority messages have been processed.

What are the application dependencies?

  • Rqueue Version: 3.4.0
  • Spring Boot Version: 3.5.7
  • Spring Messaging Version: spring boot starter amqp 3.5.7
  • Spring Data Redis Version: spring boot starter data redis 3.5.7
  • Any other Spring library dependencies and their versions: spring-boot-* 3.5.7

How to Reproduce (optional)? Create a load test with a lot of medium-priority messages and a few high- and low-priority messages (like in the sms example); a rough sketch is shown after the list below.

  • Steps to reproduce the behaviour
  • A sample reproducible code if possible.
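
A rough sketch of what such a load test could look like, modelled on the sms example from the rqueue documentation (the queue name, priority labels, weights, and message counts here are assumptions for illustration, not a definitive setup):

import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import org.springframework.stereotype.Component;

// Reproduction sketch only: enqueue a large number of medium-priority messages
// first, then a small number of high- and low-priority ones, and watch which
// sub-queue the listener drains. Names and counts are assumptions.
@Component
public class PriorityLoadTest {

  private final RqueueMessageEnqueuer enqueuer;

  public PriorityLoadTest(RqueueMessageEnqueuer enqueuer) {
    this.enqueuer = enqueuer;
  }

  public void fillQueues() {
    // a lot of medium-priority messages ...
    for (int i = 0; i < 100_000; i++) {
      enqueuer.enqueueWithPriority("sms", "medium", "medium-" + i);
    }
    // ... and a comparatively small number of high- and low-priority messages
    for (int i = 0; i < 1_000; i++) {
      enqueuer.enqueueWithPriority("sms", "high", "high-" + i);
      enqueuer.enqueueWithPriority("sms", "low", "low-" + i);
    }
  }

  // listener with per-priority weights; with STRICT polling mode the
  // expectation is that "low" is not touched while "high" still has messages
  @RqueueListener(value = "sms", priority = "high=80,medium=40,low=10")
  public void onMessage(String message) {
    // business logic would go here; a custom counter metric can be
    // incremented at the end of processing, as in the graphs below
  }
}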

Additional Details (optional)

Add any other context about the problem that would be helpful, like OS, Redis, Docker, etc.

Redis 7.2; Docker image for the Spring Boot application: any image with JDK 21.

I am copying my messages from the previous issue ( https://github.com/sonus21/rqueue/issues/17 ):

I tried to check how queues with priority work (I used the latest version, 3.4.0, with Spring Boot 3.x), and priorities do not work correctly in my test. I use the following settings: standalone Redis (v7.2), readFrom=MASTER_PREFERRED (verified in the debugger), four priority levels (as in the documentation's sms example), and STRICT polling mode (also verified in the debugger). I started a load test with a lot of medium-priority messages. Once Redis was filled with medium-priority messages, I added a small load of low-priority and high-priority messages on top of the existing medium-priority load. I expected the high-priority messages to eventually show up in the metrics (they should be processed before the accumulated medium-priority messages), and the low-priority messages to be processed only after the high-priority and medium-priority messages were done. The actual result was that low-priority messages began processing at the same time as high-priority messages; the only difference was in processing speed (high-priority messages were processed several times faster).

I will continue researching this problem. I would appreciate any help.

I examined the logs and the current project code. Unfortunately, the queues really are not STRICT with respect to priority (even in STRICT polling mode). I think the main reason is the logic of the method "StrictPriorityPoller.getQueueToPoll()", specifically this check: "now - lastFetchedTime.get(queue) > Constants.MILLIS_IN_A_MINUTE". After the highest-priority queue has been polled, this check will not return it again for one minute, so free threads end up processing queues with lower priority.
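
To make the effect easier to see, here is a minimal, self-contained model of the selection logic described above (illustration only, not the real StrictPriorityPoller class; the deactivation-time handling is reduced to "otherwise take the highest-priority queue", and all names are illustrative):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of getQueueToPoll(): the starvation branch runs first, so a
// lower-priority queue that has not been fetched for over a minute is selected
// even though the high-priority queue still has plenty of messages.
public class StrictPollerModel {
  static final long MILLIS_IN_A_MINUTE = 60_000L;

  final List<String> queues = List.of("high", "medium", "low"); // priority order
  final Map<String, Long> lastFetchedTime = new LinkedHashMap<>();

  StrictPollerModel(long startTime) {
    queues.forEach(q -> lastFetchedTime.put(q, startTime));
  }

  String getQueueToPoll(long now) {
    // starvation branch: any queue untouched for more than a minute wins
    for (String queue : queues) {
      if (now - lastFetchedTime.get(queue) > MILLIS_IN_A_MINUTE) {
        return queue;
      }
    }
    // otherwise fall back to the highest-priority queue
    // (the deactivation-time handling of the real poller is omitted here)
    return queues.get(0);
  }

  public static void main(String[] args) {
    StrictPollerModel model = new StrictPollerModel(0L);
    // the high-priority queue never runs dry, yet after one minute the
    // starvation branch hands "medium" and "low" to free poller threads
    for (long t = 0; t <= 65_000; t += 1_000) {
      String polled = model.getQueueToPoll(t);
      model.lastFetchedTime.put(polled, t);
      if (!"high".equals(polled)) {
        System.out.println((t / 1_000) + "s: polled " + polled);
      }
    }
  }
}

Running this prints that "medium" is selected at 61s and "low" at 62s even though the high-priority queue is never empty, which matches the behavior observed in the load test.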

igorjava2025 avatar Nov 28 '25 23:11 igorjava2025

For clarity, I am attaching the load-test graphs.

Image

  • Left graph: the processing_queue_size metric from the Spring Boot Prometheus metrics
  • Middle graph: the queue_size metric from the Spring Boot Prometheus metrics
  • Right graph: a custom metric (message counter) that is incremented at the end of message processing (at the end of the business logic)

  • low: the queue with low priority (10)
  • medium: the queue with medium priority (40)
  • high: the queue with high priority (80)

The graphs show that the queue sizes are growing. The number of threads is deliberately kept small so that the behavior is visible. The graphs also show that even while high- and medium-priority messages are pending, low-priority messages are processed simultaneously with the high- and medium-priority ones.

igorjava2025 avatar Nov 29 '25 00:11 igorjava2025

@igorjava2025 Thanks for bringing this up. The reason it processes messages from the other queue is to prevent starvation. If your requirement is to avoid processing them entirely, we can add specific properties in the code to enforce that.

sonus21 avatar Nov 29 '25 03:11 sonus21

@sonus21 Thank you for your quick reply

If your requirement is to avoid processing them entirely, we can add specific properties in the code to enforce that.

  1. Yes, it is necessary in my case.

  2. But in fact, I also need to not wait one minute for the next opportunity to receive messages from the high-priority queue. I assume this means a delay of up to one minute when processing high-priority messages. I'm not sure, but maybe making the value "Constants.MILLIS_IN_A_MINUTE" configurable (in the method "StrictPriorityPoller.getQueueToPoll()") would help solve this problem; a minimal sketch of that idea follows below.
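
A minimal sketch of the idea in point 2, assuming the hard-coded constant were replaced by a configurable window ("starvationWindowMillis" is a hypothetical name, not an existing rqueue setting):

// Hypothetical: the same comparison that getQueueToPoll() performs, but with
// the one-minute constant replaced by a configurable starvation window.
public class ConfigurableStarvationCheck {

  private final long starvationWindowMillis; // hypothetical knob instead of Constants.MILLIS_IN_A_MINUTE

  ConfigurableStarvationCheck(long starvationWindowMillis) {
    this.starvationWindowMillis = starvationWindowMillis;
  }

  boolean isStarved(long now, long lastFetchedTime) {
    return now - lastFetchedTime > starvationWindowMillis;
  }

  public static void main(String[] args) {
    ConfigurableStarvationCheck check = new ConfigurableStarvationCheck(5_000);
    long now = System.currentTimeMillis();
    System.out.println(check.isStarved(now, now - 10_000)); // true with a 5-second window
    System.out.println(check.isStarved(now, now - 1_000));  // false
  }
}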

igorjava2025 avatar Nov 29 '25 11:11 igorjava2025

The main issue is that the system will always poll queues sequentially. For example, it checks queue1, then queue2, then queue3, which can significantly increase the number of Redis calls.

If I understand your requirement correctly, you want to poll the queues in priority order and stop as soon as no new messages are found in any of them, then repeat the cycle.

sonus21 avatar Nov 29 '25 11:11 sonus21

I just read the code again. This is how it works: it will process one message per minute for any queue. Is it processing more than one per minute? Because once we pull a message, we set the last fetched message time to the current time, which should keep the low-priority queue from being processed again.

https://github.com/sonus21/rqueue/blob/652ec46c52dc6f06604d500c15ac11a8633eb602/rqueue/src/main/java/com/github/sonus21/rqueue/listener/StrictPriorityPoller.java#L60

Please share the queue configuration. I can’t update the library right now, but if you create a PR, I can review and merge it.

sonus21 avatar Nov 29 '25 12:11 sonus21

If I understand your requirement correctly, you want to poll the queues in priority order and stop as soon as no new messages are found in any of them, then repeat the cycle.

When I think about it, a correct algorithm would look something like this:

while (true) {
  if (isNotEmpty(highPriorityQueue)) {
    // fetch messages from the queue, process them, and remove them from the
    // queue once they have been processed without errors
    messages = poll(highPriorityQueue, batchSize);
    process(messages);
    if (messages.size() >= batchSize) {
      // a full batch means the higher-priority queue is probably not empty yet,
      // so do not move on to a lower-priority queue
      continue;
    }
  }
  if (isNotEmpty(mediumPriorityQueue)) {
    messages = poll(mediumPriorityQueue, batchSize);
    process(messages);
    if (messages.size() >= batchSize) {
      continue;
    }
  }
  if (isNotEmpty(lowPriorityQueue)) {
    messages = poll(lowPriorityQueue, batchSize);
    process(messages);
    if (messages.size() >= batchSize) {
      continue;
    }
  }
}

igorjava2025 avatar Nov 29 '25 12:11 igorjava2025

Your assumption is correct. In practice, the library can’t control how users define priority; they might call it high/medium/low or a/b/c. We only care about knowing the priority, which is why everything is grouped. The only reliable approach is to poll all queues in priority order until one becomes inactive for any reason. The simplest fix is to disable the starvation check and observe if the behavior aligns with your requirement.

sonus21 avatar Nov 29 '25 12:11 sonus21

The simplest fix is to disable the starvation check and observe if the behavior aligns with your requirement.

Yes, maybe this will work

igorjava2025 avatar Nov 29 '25 12:11 igorjava2025

I just read the code again. This is how it works: it will process one message per minute for any queue. Is it processing more than one per minute?

Yes, it can process more than one message per minute. But it can also process only one message per minute; the behavior is not stable.

Please share the queue configuration

I created a simple public project that reproduces the problem on a local computer. The project has the same queue configuration as my previous load test, so you can see all the configuration there ( https://github.com/igorjava2025/rqueue-priority-load-test/tree/master ).

igorjava2025 avatar Nov 30 '25 04:11 igorjava2025

The simplest fix is to disable the starvation check and observe if the behavior aligns with your requirement.

Unfortunately, this didn't solve my problem. Now I think it's not so easy to solve. Perhaps a different implementation of the StrictPriorityPoller class will be needed.

Below is the code I used to test the modified version of StrictPriorityPoller (with the starvation code commented out):

private String getQueueToPoll() {
    long now = System.currentTimeMillis();
    // starvation
//    for (String queue : queues) {
//      if (eligibleForPolling(queue)) {
//        if (now - lastFetchedTime.get(queue) > Constants.MILLIS_IN_A_MINUTE) {
//          return queue;
//        }
//      }
//    }
    for (String queue : queues) {
      if (eligibleForPolling(queue)) {
        Long deactivationTime = queueDeactivationTime.get(queue);
        if (deactivationTime == null) {
          return queue;
        }
        if (now - deactivationTime > pollingInterval) {
          return queue;
        }
      }
    }
    return ALL_QUEUES_ARE_INELIGIBLE;
  }

Graph with the load-test result (for the rqueue build with the starvation code commented out):

Image

Branch for load testing the fixed rqueue libraries: https://github.com/igorjava2025/rqueue-priority-load-test/tree/test-custom-rqueue-local-build

I think that, for a correct implementation in my case, the next queue should not be processed until the previous queue has been fully processed, and the queue size should be taken into account relative to the batch size, as I described earlier in the algorithm.

In the current implementation, the algorithm is based on the deactivation time and moves on to the next queue if not enough time has passed since the previous deactivation. This does not take into account that the higher-priority queue may still contain many messages. In the example above, that is the medium-priority queue: processing of the low-priority messages finished before processing of the medium-priority messages did.
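
As an illustration only, here is a self-contained sketch of the selection rule I mean (not rqueue code; the queue-size lookup stands in for whatever Redis call the poller would actually make): always take the highest-priority non-empty queue, so a lower-priority queue is only reached once everything above it is drained.

import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

// Sketch of a "hard strict" selection rule (illustration only, not rqueue
// code): always return the highest-priority non-empty queue, so a
// lower-priority queue is polled only when every queue above it is drained.
public class HardStrictSelectionModel {

  private final List<String> queues;             // priority order, highest first
  private final ToIntFunction<String> queueSize; // stands in for a Redis size call

  HardStrictSelectionModel(List<String> queues, ToIntFunction<String> queueSize) {
    this.queues = queues;
    this.queueSize = queueSize;
  }

  String getQueueToPoll() {
    for (String queue : queues) {
      if (queueSize.applyAsInt(queue) > 0) {
        return queue; // first non-empty queue in priority order wins
      }
    }
    return null; // nothing to do; the caller should back off for pollingInterval
  }

  public static void main(String[] args) {
    Map<String, Integer> sizes = Map.of("high", 0, "medium", 5, "low", 100);
    HardStrictSelectionModel model =
        new HardStrictSelectionModel(List.of("high", "medium", "low"), sizes::get);
    // "medium" is selected even though "low" holds far more messages, and
    // "low" will not be selected until "medium" reports size 0
    System.out.println(model.getQueueToPoll()); // -> medium
  }
}

This does add extra queue-size checks against Redis on every selection, which is exactly the performance cost discussed below.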

igorjava2025 avatar Dec 07 '25 12:12 igorjava2025

But won’t this hurt performance? If we always poll high-priority first and then low-priority, won’t we end up repeatedly scanning from the top of the high-priority queue all the way down to the low-priority queue, over and over again? What’s the cost of enforcing such strict ordering every time?

It's possible to do that: you can run it in a loop where we remove all the queues and get the queue, i.e. we should override the poll method.

sonus21 avatar Dec 08 '25 13:12 sonus21

But won’t this hurt performance? If we always poll high-priority first and then low-priority, won’t we end up repeatedly scanning from the top of the high-priority queue all the way down to the low-priority queue, over and over again? What’s the cost of enforcing such strict ordering every time?

I'll take performance measurements before and after the fix. It's hard to say yet how significant the impact on performance will be.

It's possible to do that: you can run it in a loop where we remove all the queues and get the queue, i.e. we should override the poll method.

When I described the algorithm, I didn't take the rqueue library's internal logic into account. The fix should not introduce other bugs. Library users should also have a choice between the option they are currently using and the option for my case. I will also try to either minimize the performance decrease or give users the ability to adjust the settings that affect performance. I will try to prepare a pull request with these considerations in mind.

igorjava2025 avatar Dec 09 '25 05:12 igorjava2025

Yeah, that makes sense. We should put this control behind a feature flag, something like HARD_STRICT, which we can enable only for special cases like yours. As mentioned earlier, such strict rules usually aren’t necessary in most scenarios.

I’d also suggest creating a separate poller that follows this stricter algorithm instead of modifying the existing one.
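
Purely as a sketch of that wiring, assuming a hypothetical HARD_STRICT value and a separate selector implementation (neither exists in rqueue today):

// Hypothetical wiring only: HARD_STRICT is just a name proposed in this
// thread, and both selectors here are placeholders, not rqueue classes.
public class PollerSelectionSketch {

  enum PriorityMode { WEIGHTED, STRICT, HARD_STRICT }

  interface QueueSelector {
    String getQueueToPoll();
  }

  // keep the existing poller untouched; only HARD_STRICT picks the new one
  static QueueSelector selectorFor(PriorityMode mode,
                                   QueueSelector existingStrictSelector,
                                   QueueSelector hardStrictSelector) {
    return mode == PriorityMode.HARD_STRICT ? hardStrictSelector : existingStrictSelector;
  }

  public static void main(String[] args) {
    QueueSelector existing = () -> "queue chosen by the current StrictPriorityPoller logic";
    QueueSelector hardStrict = () -> "queue chosen by a drain-first poller";
    System.out.println(
        selectorFor(PriorityMode.HARD_STRICT, existing, hardStrict).getQueueToPoll());
  }
}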

sonus21 avatar Dec 10 '25 18:12 sonus21