
Ordered consuming of messages

aristotelos opened this issue 3 years ago • 4 comments

I have a question on how to achieve ordered consumption of messages.

I read that Rebus guarantees at-least-once delivery by default. For RabbitMQ, I found out that publisher confirms are enabled by default, queues are durable by default, and messages are persistent (not transient) by default. None of this is documented on the wiki, but it is true.

What I am still struggling with is how to make sure that messages are processed in order. I think that Rebus guarantees ordered delivery. Is that true? I found out that when message delivery fails, the same message is retried 4 times before other messages are consumed, so the order is not broken when consumption fails a couple of times. On the 5th failure, however, the message is moved to the error queue. So far so good.

But how does this work when there are multiple instances? I want to design my application for high availability, so I have 2 instances of e.g. a "patient service". When both instances subscribe to a message using the same "subscriber queue", they act as competing consumers, which breaks ordered processing: both instances process messages that can conflict with each other. For example, "update patient 1 name to Foo" and "update patient 1 name to Bar" are conflicting messages, so they must be processed in strict order to get the right result. How can I have only one instance do the processing, with the other taking over only when the first goes down? Or at least have only one instance processing a message at a time, not two at the same time?

aristotelos avatar Feb 08 '22 11:02 aristotelos

I think that Rebus guarantees ordered delivery. Is that true?

I always advise strongly against using message queues for things that require ordered delivery.

This is not because of Rebus, this is an inherent property of message queues.

Even when a message queue is FIFO, as you correctly point out, there are lots of situations that can lead to (slightly, or greatly) reordered messages – concurrent receives/parallel processing being one, and retried/redelivered messages being another.

I know that e.g. Azure Service Bus supports "sessions", which is a way to circumvent these challenges, but my own experience is that one is better off building queue-based consumers to be tolerant to the expected degree of reordering, or simply reaching for a technology (like Kafka) where ordering is built in.

Oh, and if you have many threads/processes competing for updates, and you expect contention around specific object instances, I suggest you take a look at e.g. the nifty DistributedLock library, which can help you dodge those situations.
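To make the per-object locking idea concrete, here is a minimal in-process sketch (the `PerKeyLock` name and shape are made up for illustration; the DistributedLock library mentioned above is what you would reach for when the lock has to span processes or machines). It serializes work per entity key while letting different keys proceed in parallel:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// In-process illustration of per-resource locking: one semaphore per entity
// key, so updates to the same patient are serialized while updates to
// different patients run in parallel. A multi-node deployment needs a
// distributed lock instead; this only shows the shape of the idea.
static class PerKeyLock
{
    static readonly ConcurrentDictionary<string, SemaphoreSlim> Locks = new();

    public static IDisposable Acquire(string key)
    {
        var sem = Locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        sem.Wait();
        return new Releaser(sem);
    }

    sealed class Releaser : IDisposable
    {
        readonly SemaphoreSlim _sem;
        public Releaser(SemaphoreSlim sem) => _sem = sem;
        public void Dispose() => _sem.Release();
    }
}
```

Usage would then look like `using (PerKeyLock.Acquire("patient/1")) { /* apply the update */ }`, with the lock released when the `using` block ends.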

How does that sound to you?

mookid8000 avatar Feb 08 '22 12:02 mookid8000

Thanks for that guidance. I had a look at Kafka already, and I agree that it would be a good fit for my purposes; I just wanted to consider other options as well. It's good to know that message queues are not well suited to ordered delivery.

I am wondering how to achieve "building queue-based consumers to be tolerant to the expected degree of reordering". My use case is processing HL7 v2 messages, and I am wondering how consumers could be made reordering-tolerant in that case. To come back to the example I already mentioned: the two messages "update patient 1 name to Foo" and "update patient 1 name to Bar" conflict, so they must be processed in strict order to get the right result. What could help is a concept of entity "versions": for "update patient 1 (with version 1234) name to Foo" and "update patient 1 (with version 1235) name to Bar", the consumer can check whether the expected version of the entity is present in the database before processing. If there are other patterns that can achieve ordered delivery with message queues, I would be interested to hear them.
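The version check described above could be sketched like this (all names are hypothetical, and the dictionary stands in for a real database row; the sketch assumes versions per entity are contiguous). A message is applied only when it is the direct successor of the stored version, so a stale duplicate is dropped and a too-new message can be deferred and retried once its predecessor has arrived:

```csharp
using System.Collections.Generic;

record UpdatePatientName(int PatientId, long Version, string Name);

enum ApplyResult { Applied, Duplicate, OutOfOrder }

class PatientStore
{
    // Stand-in for a database row: current name and version per patient.
    readonly Dictionary<int, (string Name, long Version)> _rows = new();

    public ApplyResult TryApply(UpdatePatientName msg)
    {
        var current = _rows.TryGetValue(msg.PatientId, out var row) ? row.Version : 0;

        if (msg.Version <= current)
            return ApplyResult.Duplicate;   // already applied: safe to drop
        if (msg.Version > current + 1)
            return ApplyResult.OutOfOrder;  // a predecessor is missing: defer and retry later

        _rows[msg.PatientId] = (msg.Name, msg.Version);
        return ApplyResult.Applied;
    }

    public string? NameOf(int id) => _rows.TryGetValue(id, out var row) ? row.Name : null;
}
```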

aristotelos avatar Feb 08 '22 14:02 aristotelos

(...) I am wondering how consumers could be made reordering-tolerant in that case

Often, providing a revision number of some kind on the event makes filtering easy on the subscriber side: simply discard events with a revision number <= the last recorded revision. In some cases, you can even get away with adding the current time to the events; just note that DateTimeOffset.Now is not guaranteed to be monotonically increasing, because the clock on the server is sometimes adjusted backwards.
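The discard rule above could be sketched like this, with an in-memory dictionary standing in for wherever the subscriber keeps its last-seen revisions (the `RevisionFilter` name is made up for illustration):

```csharp
using System.Collections.Generic;

// Discard events whose revision is <= the last one recorded for that entity.
// Older and duplicate events are skipped; newer ones advance the watermark.
class RevisionFilter
{
    readonly Dictionary<string, long> _lastSeen = new();

    public bool ShouldProcess(string entityId, long revision)
    {
        if (_lastSeen.TryGetValue(entityId, out var last) && revision <= last)
            return false; // older or duplicate event: drop it

        _lastSeen[entityId] = revision;
        return true;
    }
}
```

Note that this gives last-writer-wins semantics: a late-arriving older event is simply thrown away, which is fine when every event carries the full new value (like a name update), but not when events are deltas.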

In cases where you want to do something after having received some specific, potentially reordered events, e.g.

"receive A, receive B then do C"

or

"receive B, receive A then do C"

you might be able to use Rebus' sagas to coordinate your actions. You can read more on the wiki page about coordinating things that happen over time - or check out this sample if you want to see some code. 🙂
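The state such a saga would track can be shown in plain C# (class and member names are made up for illustration; a real Rebus saga would also get correlation and persistence from the framework): correlate A and B by an id, and fire C only once both have arrived, whatever the order.

```csharp
using System.Collections.Generic;

// "Receive A and B in either order, then do C": track per-correlation-id
// flags for which of the two events has arrived, and report completion
// when both are present. The completed entry is removed, like a saga
// marking itself complete.
class AThenBOrBThenA
{
    readonly Dictionary<string, (bool GotA, bool GotB)> _state = new();

    // Returns true when both events have arrived and it is time to do C.
    public bool Receive(string correlationId, char message) // 'A' or 'B'
    {
        (bool GotA, bool GotB) s =
            _state.TryGetValue(correlationId, out var v) ? v : (false, false);

        if (message == 'A') s.GotA = true;
        if (message == 'B') s.GotB = true;
        _state[correlationId] = s;

        if (s.GotA && s.GotB)
        {
            _state.Remove(correlationId); // "saga" completed
            return true;
        }
        return false;
    }
}
```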


mookid8000 avatar Feb 08 '22 15:02 mookid8000

One comment about distributed locks: I could not get them to work affordably using Azure SQL Server.

sabbadino avatar May 15 '22 08:05 sabbadino

I am struggling with this at the moment. I actually wrote support for ordered consumption of messages in the MySQL transport, and it is natively supported in other messaging systems as well (like Google Pub Sub, which I started building a transport for but never finished).

Making messages idempotent can be tough, but if you can do it, it is probably the best solution. In our case we have messages that need to be processed in order, and I am thinking of ways to make them processable out of order. Normally, when everything is running smoothly, we have no issues: the MySQL transport guarantees that messages are not processed out of order if you provide an ordering key in the message header, even with competing consumers on different nodes (we have 10 threads on three separate nodes, so 30 threads competing for messages).

The problem we have at the moment, as mentioned in https://github.com/rebus-org/Rebus/issues/1067, is that things go haywire if you either defer a message or punt it to the dead letter queue. If you defer a message as outlined in https://github.com/rebus-org/Rebus/wiki/Automatic-retries-and-error-handling, the act of deferring gives it a new visible timestamp, so it will show up on the bus at a later time to be retried. But that means any other messages on the bus with the same ordering key will now be processed out of order (whoops).

So even if the underlying transport can guarantee ordered message processing, like the MySQL transport or something based on Google Pub Sub, once you either defer a message or have to send it to the dead letter queue to replay later manually, things are out of order. I have been trying to figure out a way to change the MySQL transport to keep the ordering alive for deferred or poisoned messages, but there is no easy solution that would not kill the performance of the message bus.

So I think the solution I am going to take is to stop relying on message ordering completely, and work on making the messages we process fully idempotent so they can be processed out of order, most likely using some kind of state machine or collision detection to handle situations where they are delivered out of order. Keeping the transport-level ordering keys is nice, because it makes the edge cases where things can go wrong way less likely, but you still have to handle those edge cases because they WILL happen, right?
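For the idempotency half of that plan, one common shape is to record processed message ids and skip redeliveries (sketched below with an in-memory set standing in for what, in real code, would be a table updated in the same database transaction as the business change; the `IdempotentHandler` name is made up):

```csharp
using System;
using System.Collections.Generic;

// Record the id of every message that has been handled, and turn a
// redelivery of the same message into a no-op. In production the set
// would be a database table written in the same transaction as the
// business change, so "handled" and "changed" commit atomically.
class IdempotentHandler
{
    readonly HashSet<string> _processed = new();

    // Returns true if the work ran, false if this id was already handled.
    public bool Handle(string messageId, Action work)
    {
        if (!_processed.Add(messageId))
            return false; // duplicate delivery: skip

        work();
        return true;
    }
}
```

This deals with exact redeliveries of the same message; handling genuinely reordered different messages still needs something like the revision checks discussed earlier in the thread.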

kendallb avatar Dec 08 '22 15:12 kendallb

So I think the solution I am going to take is to stop relying on message ordering completely, and work on making the messages we process fully idempotent so they can be processed out of order.

Strictly speaking, order tolerance ("commutativity") and idempotency are two different things – but yes, ideally consumers should be tolerant in both respects, which can often be achieved (maybe not 100%, but to a high enough degree to make all but the most theoretical problems disappear) by keeping it in mind when designing message interactions.

Keeping the transport level ordering keys is nice because it does mean the edge cases where things can go wrong is way less likely, but you still have to handle the edge cases because they WILL happen, right?

When you say "ordering keys" here, do you really mean the message priority header defined here? (which may be used to assign a priority to messages, thus enabling that some messages "overtake" other messages in the queue)

And you are right: While you can rely on the transport's built-in ordering when you disable all kinds of parallel message processing, the dead-letter queue and whatever await bus.Defer-based custom retry logic may be in use will still come along and mess things up from time to time.

I usually recommend people to

EITHER

  • use a queue-based broker (Azure Service Bus, RabbitMQ, Amazon SQS, etc) and make their consumers order-tolerant and "sufficiently idempotent"

OR

  • use a log-based broker (like Kafka) coupled with infinite retries and no parallelism beyond the built-in key-based partitioning, and EITHER make their consumers idempotent OR commit message offsets transactionally along with whatever (database) work they're doing.

All of this is hard. 😐 And just like everything else in software development, most of the hard work will be in the design/implementation of features, and not in the choice of technology or in the initial configuration of things.

mookid8000 avatar Dec 14 '22 13:12 mookid8000

When you say "ordering keys" here, do you really mean the message priority header defined here? (which may be used to assign a priority to messages, thus enabling that some messages "overtake" other messages in the queue)

No, I mean the ordering key headers that I implemented as part of the MySQL transport. It works across multiple threads on multiple nodes without ruining performance, so it works well for our purposes. We are still working on the solution that allows messages to be processed out of order (i.e. if they fail and end up in the dead letter queue to be replayed later manually, or if they fail and are retried later via deferred processing). But in the general case, when message processing is not crashing, they all get processed in order :)

At some point I plan to finish the Google Pub Sub support and potentially move our message processing off MySQL onto that. I modeled the ordering key support on how they do it, so I would add the same support there when I finally get it done. MySQL works nicely for our purposes now, as the message volume is not that high, but for fault tolerance it would be nice if our message queue were not using the same database tier as our normal site (although I suppose in the short term I could just spin up a separate MySQL cluster for Rebus messages; I suspect Google Pub Sub would be cheaper than three new MySQL nodes :)

kendallb avatar Dec 16 '22 17:12 kendallb