
Grouping messages in the same queue

Open khashish opened this issue 3 years ago • 6 comments

Is your feature request related to a problem? Please describe.
Messages in the queue don't have an option to carry a group id that marks them for FIFO execution.

Describe the solution you'd like.
When enqueuing a message, I should be able to provide a message group id so that all messages in the queue with the same group id execute in sequence, while messages with different group ids can still run concurrently.

Describe alternatives you've considered
Using SQS FIFO, since it allows concurrent execution across different group ids while preserving sequential execution for messages with the same group id.

khashish avatar Oct 14 '20 21:10 khashish

How many group ids do you expect?

There's an easy way to solve this if you have a handful of group ids: use the group id as the priority.

Use equal priority and concurrency=1

@RqueueListener(value = "event_queue", priority="critical=1,high=1,medium=1,low=1", concurrency=1)
  public void onMessage(Event event) {
    log.info("Event : {}", event);
  }

You can also define the group ids in the application configuration:

# g1,g2,g3,g4,g5,g6 are group ids
rqueue.event.queue.priority=g1=1,g2=1,g3=1,g4=1,g5=1,g6=1


@RqueueListener(value = "event_queue", priority="${rqueue.event.queue.priority}",concurrency=1)
  public void onMessage(Event event) {
    log.info("Event : {}", event);
  }

sonus21 avatar Oct 15 '20 02:10 sonus21

@sonus21 Thanks for your reply.

The group ids are dynamic, not a predefined static list. Imagine you are building an online ordering service and want to group messages by order id, so that messages for the same order id are consumed in FIFO order while messages for different order ids can be processed concurrently.

So if message 1 = {order_id: 1, action="create_order"} and message 2 = {order_id: 1, action="reserve_credit"}, then message 1 must always be consumed before message 2.

If, however, message 1 = {order_id: 1, action="create_order"} and message 2 = {order_id: 2, action="reserve_credit"}, then they don't need to execute in any particular order, so there is no problem running them concurrently.

That way you provide ordering guarantees when they are needed while still maintaining good throughput by allowing concurrent consumption wherever possible.

khashish avatar Nov 01 '20 10:11 khashish

@khashish Thanks for the detail.

This feature is quite complex to support; you can use a sharding/partitioning approach to deal with it. Just as Kafka creates partitions, we can create partitions in a similar fashion, but the application code has to handle the partitioning.

You can use any hashing strategy for this; for example, a simple one could be

group id = int(order id) % (number of partitions)

Once you have the group id, you can enqueue the item in the corresponding group of the queue. I would suggest setting the concurrency to a value less than or equal to the number of partitions, and using weight=1 for every group, as in the sketch below.
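
A minimal producer-side sketch of this idea, assuming 127 partitions named g0..g126 and Rqueue's RqueueMessageEnqueuer with an enqueueWithPriority(queue, priority, message) method (verify the exact API against your Rqueue version):

import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {
  // Assumption for illustration: 127 fixed partitions named g0..g126,
  // matching the group ids configured on the listener.
  private static final int PARTITIONS = 127;

  private final RqueueMessageEnqueuer enqueuer;

  public OrderEventProducer(RqueueMessageEnqueuer enqueuer) {
    this.enqueuer = enqueuer;
  }

  // group id = order id % number of partitions
  private String groupId(long orderId) {
    return "g" + (orderId % PARTITIONS);
  }

  public void publish(long orderId, Event event) {
    // Events of the same order land in the same group, so with weight=1 per
    // group they are consumed in enqueue order, while different orders can
    // still be processed concurrently.
    enqueuer.enqueueWithPriority("event_queue", groupId(orderId), event);
  }
}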

What about increasing/decreasing the number of partitions?

There's no rebalancing mechanism like Kafka's, so we need to deal with that ourselves. One simple strategy: set the number of partitions to a large prime like 127/257/509, but keep the concurrency smaller, say 10, so you have 10 workers; if you later need more workers, you can increase the concurrency. Changing the concurrency is much easier than changing the number of partitions.
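
With 127 groups the priority string is too long to write by hand; here is a small sketch (group names g0..g126 follow the same convention as above) that generates the value to put in application.properties:

import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PriorityConfigGenerator {
  public static void main(String[] args) {
    // Produces "g0=1,g1=1,...,g126=1" with equal weight for every group.
    String priority = IntStream.range(0, 127)
        .mapToObj(i -> "g" + i + "=1")
        .collect(Collectors.joining(","));
    // Paste the output into application.properties as
    // rqueue.event.queue.priority=<generated value>
    System.out.println(priority);
  }
}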

What about the retry mechanism?

You should disable all retries on this queue, as they might cause problems: retrying a message from this queue could lead to out-of-order execution. You can, however, retry the same message multiple times once it's removed from the queue; this could be made per-queue if required, but currently it's a global value. The rqueue.retry.per.poll property is used for this retry mechanism; its default value is 1, and you can change it to 3 if you find that applicable.
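
A hedged sketch of that setup, assuming numRetries is the @RqueueListener attribute that controls retries for a queue (check the annotation attributes for your Rqueue version):

// Disable retries on the ordered queue so a failed message is never
// re-enqueued and executed after later messages of the same group.
@RqueueListener(value = "event_queue",
    priority = "${rqueue.event.queue.priority}",
    concurrency = "10",
    numRetries = "0")
public void onMessage(Event event) {
  log.info("Event : {}", event);
}

# global setting: retries of the same message within a single poll,
# default 1, can be raised (e.g. to 3) if applicable
rqueue.retry.per.poll=1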

sonus21 avatar Nov 02 '20 02:11 sonus21

There's one more issue left to solve here: with competing listeners/consumers, out-of-order execution can still happen. To avoid this, each listener should be assigned only its own set of group ids (i.e., priorities), and listeners should not compete for the same groups. For example, say I have 10 workers and 127 group ids, and I want to run 5 consumers; since we need only 10 workers in total, each consumer should have a concurrency of 2 (number of workers / number of consumers), and the 127 group ids should be distributed across the consumers:

[0,25)    => Listener on machine 1
[25,50)   => Listener on machine 2
[50,75)   => Listener on machine 3
[75,101)  => Listener on machine 4
[101,127) => Listener on machine 5
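
A sketch of one way to give each machine its own slice, assuming the slice boundaries come from hypothetical GROUP_START/GROUP_END environment variables (these names are made up for illustration):

import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ListenerPriorityConfig {

  // Machine 1 would run with GROUP_START=0 GROUP_END=25, machine 2 with
  // 25/50, and so on, so no two machines listen on the same groups.
  public static String prioritySlice() {
    int start = Integer.parseInt(System.getenv().getOrDefault("GROUP_START", "0"));
    int end = Integer.parseInt(System.getenv().getOrDefault("GROUP_END", "25"));
    return IntStream.range(start, end)
        .mapToObj(i -> "g" + i + "=1")
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    // Feed the result into the listener's priority attribute, with
    // concurrency "2" (10 workers / 5 consumers) as described above.
    System.out.println(prioritySlice());
  }
}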

sonus21 avatar Nov 04 '20 07:11 sonus21

Is there any plan to support this?

febinct avatar Aug 07 '21 03:08 febinct

@febinct would the workaround above solve your use case?

sonus21 avatar Aug 08 '21 17:08 sonus21