Poison Message Handling in Classic Queues
Proposed Changes
The following changes are an implementation of message delivery limits in Classic Queues, mainly for the purpose of handling and processing poison messages, as is already the case with Quorum Queues.
These changes allow message delivery limits in Classic Queues to be configured as follows:
1. Standard Policy
rabbitmqctl set_policy my-custom-dl-policy ".*" '{ "delivery-limit": 10 }' --apply-to queues
2. Operator Policy
rabbitmqctl set_operator_policy my-custom-dl-policy ".*" '{ "delivery-limit": 10 }' --apply-to queues
3. Queue Declare Argument
Map<String, Object> args = new HashMap<>();
args.put("x-delivery-limit", 10);
channel.queueDeclare("my-new-queue", false, false, false, args);
Notes
- Message delivery limits in Classic Queues are for `default` mode queues only.
- Queues of mode `lazy` are not (yet) supported.
- A new `classic_delivery_limits` feature flag is introduced.
- On enabling the `classic_delivery_limits` feature flag, the `message_properties` of each message are expanded to include a message delivery counter, i.e. `message_properties_v2` (see the sketch after these notes).
- Expanding the `message_properties` is carried out both in the transient state and in the persistence layer of each queue.
- In the persistence layer, the queue index (on-disk) journal and segment files are scanned and each publish record binary payload header is extended to include a 4-byte delivery counter field.
- In the transient layer, the internal backing queue working queues for alpha, beta and gamma message contents are updated/upgraded to include the new delivery counter field.
- Delta message records are updated when the queue index publish records are extended.
- A `rabbit_transform` interface is introduced for managing and keeping track of changes and transition versions of the `message_properties` message record. It is also used to verify whether the `classic_delivery_limits` feature flag is enabled/active.
- The `rabbit_transform` interface creates a `rabbit_transform` Mnesia table for retaining the transformation identities and versions that have been applied to the broker.
- The `rabbit_transform` interface can be used for any other internal entity that undergoes a change/transformation whose name and version need to be retained in the broker. The actual transform function remains private/hard-coded in the system.
- During upgrade, permission to carry out certain "steps" is queried through a `rabbit_transform:is_permitted/{1,2}` operation, which also checks for any active alarms.
- For this purpose, the `rabbit_alarm` manager is extended to retain active alarm activity in a local ETS table, configured with `{read_concurrency, true}`. When multiple queues are upgraded to use delivery limits, they concurrently verify permission to do so through the `rabbit_alarm` manager, hence the need for concurrent-read optimization.
- Messages exceeding the configured delivery limit are dropped/dead-lettered with reason `expired`. Delivery-limit expiry is combined with message expiry to maximize performance, retaining O(N) time complexity on the expiry run (where N is the queue depth).
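Purely for illustration (these are not the actual record definitions or binary layouts from the patch), the expanded message properties and the extended publish record header might look roughly like this:

```erlang
%% Sketch only: approximate shape of the expanded message properties record.
%% Field names besides the new delivery counter mirror the existing classic
%% queue message_properties record; defaults and widths are illustrative.
-record(message_properties_v2, {expiry,                %% per-message TTL, if set
                                needs_confirming = false,
                                size,                  %% payload size in bytes
                                delivery_count = 0}).  %% new: deliveries so far

%% Sketch: extending a queue index publish record header with the additional
%% 4-byte (32-bit) delivery counter field.
extend_pub_header(Header, DeliveryCount) when is_binary(Header) ->
    <<Header/binary, DeliveryCount:32/unsigned>>.
```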
Sponsors
This work has been carried out by Erlang Solutions and sponsored by Bloomberg
Types of Changes
What types of changes does your code introduce to this project?
Put an x in the boxes that apply
- [ ] Bug fix (non-breaking change which fixes issue #NNNN)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause an observable behavior change in existing systems)
- [ ] Documentation improvements (corrections, new content, etc)
- [ ] Cosmetic change (whitespace, formatting, etc)
Checklist
Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.
- [x] I have read the CONTRIBUTING.md document
- [x] I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
- [x] All tests pass locally with my changes
- [x] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added necessary documentation (if appropriate)
- [ ] Any dependent changes have been merged and published in related repositories
Related changes: https://github.com/rabbitmq/rabbitmq-ct-helpers/pull/46
This may address #2013
This PR will take a long time to test in various mixed-version cluster and upgrade scenarios. How long would it take to upgrade a node with a backlog of 10 GiB of data, for example?
Yes, those will take time. On upgrade, it's only message metadata that is upgraded, i.e. the total number of messages is the deciding factor. One mixed-version upgrade test we ran took ~14 seconds to upgrade 6 million messages (i.e. 1000 queues, each with ~6000 messages) on a 3-node cluster. We added some configurable thresholds which, when reached, prevent an upgrade from being permitted, as a way of placing limits on loaded nodes.
Hi @Ayanda-D!
Could you please explain the intent behind the transformation? I understand the need to convert the messages on disk to remember their delivery count. But does it mean records in queue processes are converted from one record version to a new one?
Hi @dumbbell 👋 yes, the primary intent of transformations is to convert the message property records, both on disk and in memory, to include a delivery count field when the feature flag is enabled. On the queue process, the backing queue state's internal queue (q1, q2, q3, q4) contents are also upgraded by applying the transform function, message_properties:upgrade_to/2, to each entry. Basically, fold through each queue and apply the transform function; this is done through the new BQ:transform/5 callback (see the sketch below). For mirrored queues, a transformation command is also issued via gm:broadcast/2 to the known queue mirrors across the cluster. A transformation, or feature flag enable, is only successful if all participants complete their local BQ transformation operations.
Likewise, for on-disk messages, the delivery count field is included by scanning through the queue index and extending the message properties binary payload representation with an additional 4-byte field.
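As a rough illustration of the in-memory part (simplified names, not the actual BQ:transform/5 implementation), the transform folds over each internal queue and rebuilds it with upgraded entries:

```erlang
-module(bq_transform_sketch).
-export([transform_internal_queues/2]).

%% Sketch only: apply an upgrade function (e.g. the patch's
%% message_properties:upgrade_to/2) to every entry of the backing queue's
%% in-memory queues (q1..q4 in #vqstate{}). For simplicity, the internal
%% queues are passed in here as a list of stdlib queue() values.
transform_internal_queues(InternalQueues, UpgradeFun)
  when is_list(InternalQueues), is_function(UpgradeFun, 1) ->
    [transform_queue(Q, UpgradeFun) || Q <- InternalQueues].

%% Rebuild one internal queue with upgraded entries, preserving order.
transform_queue(Q, UpgradeFun) ->
    queue:from_list([UpgradeFun(Entry) || Entry <- queue:to_list(Q)]).
```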
I don't understand the requirement to convert all in-memory records. Considering the record as an opaque value and always using a module to access and modify it should make the use of any version of the record transparent to the code handling it. When the #amqqueue{} record was extended to allow the implementation of quorum queues, no record was modified in memory: both versions existed side by side inside processes and transferred between processes and nodes.
Can't the module wrapping all accesses to the record take care of the bits missing from an old version of the record?
Can a record be converted to a newer version only when needed?
I agree with @dumbbell that the code should handle both "v1" and "v2" versions of the records, eliminating the need for a transformation phase.
There may be a need to transform the messages stored on disk because the current file format may not have room for the new delivery count. I have some comments on that as well, but I don't know the on-disk file format well enough yet. I would like to sort out the in-memory records first.
It would be nice to come up with a way to version the file format to avoid transformations, IMHO.
The problem is with maintaining the delivery counter per message. It has to be remembered "somewhere", which, in memory, is the backing queue state's internal queues (q1 - q4). The counter is incremented on each message consume operation (pending ACK), and retained in state. The transformation in memory adds the new counter field and initializes it to 0. So yes, the record can be converted to a newer version only when needed, but we would still need to store and track the updated counter for each message. This could mean designating another area in memory for keeping track of the counter, which can get impractical with, e.g., millions of messages. So this dynamically updates the entries held in the backing queue state (q1 - q4).
@dumbbell I should've also mentioned that all poison message handling is processed in memory. Persisted entries will only initialize the message properties record (set delivery counter = 0) on node restarts, or when paging kicks in. This is also why lazy queues aren't supported (yet): maintaining the counter on disk would be too expensive in terms of throughput and latency.
Also, expiry of messages by delivery count is timer based. So in the case of existing messages, if we apply the delivery limit policy on a set of queues, a fold operation over the existing BQ state entries is carried out, expiring those whose delivery count exceeds the limit specified in the policy (with BQ:dropwhile/2; see the sketch below). Hope I understood and am answering your questions right?
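For example, a sketch (not the patch's exact code) of the predicate used to drop messages over the limit, assuming an illustrative record for the message properties:

```erlang
%% Illustrative record standing in for the patch's expanded message
%% properties; only the delivery counter matters for this predicate.
-record(msg_props, {expiry, size, delivery_count = 0}).

%% Sketch: predicate handed to the backing queue drop/fold when a
%% delivery-limit policy is applied to a queue with existing messages.
exceeds_delivery_limit(#msg_props{delivery_count = DC}, Limit)
  when is_integer(Limit) ->
    DC > Limit;
exceeds_delivery_limit(_Props, undefined) ->
    %% No delivery-limit policy configured: never drop on this basis.
    false.
```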
Yes, I understand you need to keep this counter somewhere and message properties might be a good place, I don't judge that (I don't know that specific record usage). In other words, I'm fine with the fact you used and expanded this record for the purpose of this feature.
I'm more concerned by the patch going through every copy of the record to update it. I believe it would be enough to take care of record updates only when we need to.
Though I'm not suggesting you should do that, the record conversion could be further delayed until the second delivery is attempted: we implicitly know that a message pending ack was delivered at least once.
@dumbbell OK, understood. So we would not only handle/transform the bits of record changes in the code, but also "BQ:publish" the transformed entries back into the backing queue state. I didn't evaluate having messages coexist on different record versions in the same queue; all entries in memory and on disk move to the same version from the onset (with only the counter value expected to change and deviate during runtime).
With the approach of gradually transforming the message props, a couple of questions come to mind:
- The queue process will maintain different message property versions. I understand this is OK for the different `#amqqueue{}` records for quorum and classic queues; however, that case seems safer since there is no need to convert/upgrade classic queues to quorum queues dynamically: both queue types coexist and operate in isolation. In this case everything is within the same classic queue process (state). I'm not saying it wouldn't work; it probably would (with the abstraction of `v1` and `v2` modules), but we need to evaluate the full implications on queue process operations when multiple versions coexist. The backing queue intertwines quite a lot of operations, even the lifecycle of a message (transitioning between the alpha, beta, gamma and delta types).
- If the persisted records are upgraded to `v2` when the feature flag is enabled, we can have a case where, for the same message, we have a `v1` message property record in memory and a corresponding `v2` record on the persisted queue index. This should be OK if message property records in memory eventually transform and catch up on demand, but we need to evaluate the full implications. On node restarts, the in-memory record will be initialized from the queue index, to `v2`.
- Messages published to mirrored queues could have a different message property version to that on the master queue, since only new arrivals are replicated to mirrors by the channel (and gm broadcast). Thereafter, synchronization will only kick in if there are shifts in membership (both auto-sync and manual). So unless there are membership changes to trigger auto-sync, or synchronization is manually triggered, only the queue master process's in-memory message props will be dynamically/gradually transformed on demand. I'm not sure how mirrors would also keep up and gradually transform without triggering synchronization. Also, message props that have already been synchronized will not be replicated to mirrors (if transformation is gradual, the new record will only exist later on, after the `v1` message prop is in the mirror).
So the current approach is to transform everything on disk and in memory, from the onset, on initialization. Mirrored queues are also instructed to transform from the onset. (I needed something like mnesia:transform_table/4, as we use for Mnesia record entries that need transforming from one version to another on upgrades/feature flag enable, but in this case for queues, hence the added rabbit_amqqueue:transform/3 interface.) The transform table then keeps track of applied transformations and serves as a reference point for checking whether a particular feature flag was enabled or not, similar to how we validate whether other feature flags are enabled.
Also note that during transformation, a copy of the backing queue state (#vqstate{}) is transformed, and only after successful completion is it applied to the queue process state (#q{}). If there is any interruption, the original backing queue state is reapplied to #q{} as a fallback (see the sketch below).
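In other words (a sketch with illustrative names, not the actual queue process code), the transformed copy of the backing queue state is only installed on success:

```erlang
%% Sketch: transform a copy of the backing queue state and only install it
%% into the queue process state once the transformation has completed; on any
%% failure, keep the original backing queue state. The "queue state" here is
%% just a map for illustration.
maybe_transform(QueueState, BQState, TransformFun)
  when is_function(TransformFun, 1) ->
    try TransformFun(BQState) of
        NewBQState ->
            %% Success: swap in the transformed backing queue state.
            QueueState#{backing_queue_state := NewBQState}
    catch
        _Class:_Reason ->
            %% Interrupted/failed: keep the original backing queue state.
            QueueState
    end.
```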
But I'm happy to take up the suggested approach if we can safely handle the cases I've mentioned above, particularly with a mirrored setup. There could be more points of consideration beyond the above; we'll continue to think it through.
In the case of the #amqqueue{} record versions and classic queues, the queue processes don't know/don't care about the version of the record they use. We have to convert the two involved Mnesia tables because Mnesia requires all entries to have the same form, but that's it. When RabbitMQ reads from/queries the two Mnesia tables, they don't know what version of the record they will get. When they have to write to Mnesia tables, they use a small retry in the transaction: if the write fails (because they have an old record and Mnesia tables were converted meanwhile), they upgrade their record and write again.
In the context of a mirrored classic queue, master and slave processes don't care either what they have. If a slave queue process uses a different version of the record than the corresponding master, that's ok. The Mnesia tables are converted in a single transaction for the entire cluster because they are replicated. The feature flag is also enabled everywhere. But queue processes, master or slave, don't look at the feature flag itself, they just work with whatever is returned from Mnesia.
So for classic queues, they work equally no matter the version of the record. For quorum queues, the declare is denied unless the feature flag is enabled. If it is enabled, a new version of the record is initialized for that queue.
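A minimal sketch of that retry pattern (illustrative only: the real code goes through the amqqueue wrapper module and RabbitMQ's own Mnesia helpers, and the abort reason shown here is just an example):

```erlang
%% Sketch: write a queue record and, if the write aborts because the table
%% was converted to the newer record layout in the meantime, upgrade our
%% local copy of the record and write again. `upgrade/1` is a placeholder.
store_queue(Table, Q) ->
    case mnesia:transaction(fun() -> mnesia:write(Table, Q, write) end) of
        {atomic, ok} ->
            ok;
        {aborted, {bad_type, _}} ->
            %% Our record has the old layout; convert it and retry once.
            {atomic, ok} =
                mnesia:transaction(
                  fun() -> mnesia:write(Table, upgrade(Q), write) end),
            ok
    end.

%% Placeholder for the record upgrade, e.g. adding new fields with defaults.
upgrade(Q) -> Q.
```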
For the delivery count, those are good questions. I need to better understand how this works first. Let me throw more questions back at you! :-)
- To keep slave processes in sync, how does it work currently when you need to communicate increased delivery_count?
- Apart from keeping slaves in sync, only the master queue process is interested in the new record version if I understand correctly, so no slave queue processes would have to perform any conversion in the end. Is that correct?
- Is the `#message_properties{}` record stored in the `#amqqueue{}` record? I don't think so, but I'm not 100% sure.
- The `#message_properties{}` record is written to the disk in the queue index only, is that correct? In other words, it only makes sense to a message in a particular queue, no matter if it was queued somewhere else as well?
About what we write to disk:
- We need to convert the file at some point, I agree. It could be when the feature flag is enabled, or when the queue has to write a new record to a file that is not yet converted. I feel the latter puts less load on the system, and some files may not be converted at all in the end. I don't know what the cost of writing that new file is, or how it would impact the queue's regular processing.
- The file unfortunately lacks versioning AFAIK. We should take this opportunity to add a magic number and a version at the beginning of the queue index file. I think we should write the version of the `#message_properties{}` record as well, as part of the record binary. This has many benefits:
  - we know what to expect from the file
  - we continue to support the old and new format
  - the file is self-sufficient: other tools can read those files without any access to the state of the feature flags
  - thanks to the magic number, we can recognize the file in tools such as file(1), and, if we pay attention to the magic number we select, we can even determine the endianness in case the stored fields are sensitive to that.
I think this should help with the difference of versions between the in-memory record and the on-disk one.
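A sketch of what such a self-describing header could look like (the magic value, sizes and version here are made up for illustration):

```erlang
%% Sketch: prepend a magic number and a format version to the queue index
%% file so readers (and tools like file(1)) can identify it without knowing
%% the broker's feature flag state.
-define(QI_MAGIC, "RMQI").  %% arbitrary 4-byte magic, illustration only
-define(QI_VERSION, 2).     %% version of the persisted message properties

header() ->
    <<?QI_MAGIC, ?QI_VERSION:8/unsigned>>.

parse_header(<<?QI_MAGIC, Version:8/unsigned, Rest/binary>>) ->
    {Version, Rest};
parse_header(Bin) when is_binary(Bin) ->
    %% No magic number present: legacy, unversioned (v1) file format.
    {1, Bin}.
```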
Thanks for elaborating on #amqqueue{} - I guess the same ideas, assuming master-agnostic queue mirrors, can also be adopted here regarding #message_properties{}. On your questions:
To keep slave processes in sync, how does it work currently when you need to communicate increased delivery_count?
Real-time sync of delivery_count to mirror processes is currently not supported; that's one limitation we have. The most we support is on mirror promotion. I added a consume_redelivery_limit_with_mirror_promotion test case in the classic_delivery_limits_SUITE to validate how this would work. We can't keep the mirror processes' delivery counts up to date (there was the possibility of having a standalone active sync process, but it seemed a stretch for "only" such a purpose).
Apart from keeping slaves in sync, only the master queue process is interested in the new record version if I understand correctly, so no slave queue processes would have to perform any conversion in the end. Is that correct?
Yes, that’s correct. Currently, it is to handle the mirror promotion case, to retain the delivery count which the queue master had at the time of promotion. Otherwise, there is no other case/reason why the mirror processes should do the conversion.
Is the #message_properties{} record stored in the #amqqueue{} record? I don't think so, but I'm not 100% sure.
It's stored in the backing queue state's queues (#vqstate{}). And note that, as part of memory management, one of the queue process procedures is to push in-memory messages to disk, i.e. push_betas_to_deltas. We just need to double-check the mix of v1 and v2 records in memory: I already create a v2 record when betas are being pushed to disk, depending on whether the feature flag is enabled, so v1 records are converted to v2 when being flushed to disk (see the sketch below).
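A tiny sketch of that check (is_enabled/1 and the upgrade function are stand-ins for the patch's real helpers):

```erlang
%% Sketch: when betas are pushed to the on-disk queue index, write the v2
%% layout only if the feature flag is enabled; otherwise keep v1 as-is.
maybe_upgrade_on_flush(MsgProps) ->
    case is_enabled(classic_delivery_limits) of
        true  -> upgrade_props(MsgProps);
        false -> MsgProps
    end.

%% Stand-ins, purely for this sketch:
is_enabled(classic_delivery_limits) -> true.
upgrade_props({message_properties, Expiry, Confirm, Size}) ->
    %% v2 adds a delivery counter, initialized to 0 for persisted entries.
    {message_properties_v2, Expiry, Confirm, Size, 0}.
```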
The #message_properties{} record is written to the disk in the queue index only, is that correct? In other words, it only makes sense to a message in a particular queue, no matter if it was queued somewhere else as well?
Yes, only the local queue index keeps track of the #message_properties{}. Remote mirrors will use their own; however, when the index is loaded from disk and there is a shift in cluster membership, those message props are replicated across to peer mirrors.
Regarding writing to disk: backing queues have a write-once optimization to the queue index; after the first write, further writes to the index are avoided. So if a message had been published as v1 before the feature flag was enabled, and its index entry had already been written to disk, then enabling the feature flag afterwards to convert to v2 (with the gradual conversion approach) would still skip the entry in the queue index because of the write-once optimization. So I'm not sure how to make this gradual without repeated disk writes to the index, which would impact performance on the message publish path.
Enhancing the #message_properties{} record to carry a version number could help. The queue index entries have transitioned, and currently none have an associated version tag; parsing journal entries from disk into memory depends on the anticipated payload size. I added PUB_RECORD_SIZE_BYTES_v1 and PUB_RECORD_SIZE_BYTES_v2 macros in the queue index to handle the different persisted entries, for maintaining backward compatibility (see the sketch below).
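Roughly, the version-dependent parsing looks like this (the record sizes here are illustrative, not the actual values of those macros):

```erlang
%% Sketch: read one publish record from a queue index segment; how many bytes
%% to consume depends on the message properties version of the file/entry.
-define(PUB_RECORD_SIZE_BYTES_V1, 16).  %% illustrative size only
-define(PUB_RECORD_SIZE_BYTES_V2, 20).  %% v1 size + 4-byte delivery counter

read_pub_record(Bin, 1) ->
    <<Record:?PUB_RECORD_SIZE_BYTES_V1/binary, Rest/binary>> = Bin,
    {Record, Rest};
read_pub_record(Bin, 2) ->
    <<Record:?PUB_RECORD_SIZE_BYTES_V2/binary, Rest/binary>> = Bin,
    {Record, Rest}.
```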
Those sound like good suggestions on the gradual approach to handling backward compatibility in memory. If there is anything, or any clarification, you need from our side, let us know (we're time-limited on bigger changes to this feature at the moment, but Q&A is OK!).
Delivery count sync in mirror slaves
You say that a slave gets up-to-date delivery counts when it is promoted as a queue master, but I don't understand how this works after looking at the patch. Could you please explain?
Regardless of how the delivery count is synced, I agree that it's not critical if we miss some updates/syncs. If the message is delivered more times than the configured limit because of a situation in the broker, that's fine.
Gradual conversion & queue index storage
Even though message properties are converted to the new record only when needed, the queue index could be converted entirely at once (regardless of the version of the records in memory). As soon as a message property record is written to the queue index, if it is v2 and the file was never converted, it is converted at that point (all records in the file are v2).
I would not add the version inside the record itself, just in the file (something like one byte for the record version, followed by all the bytes of the record binary). And as you already did, the version of the record is enough to know how many bytes to read from the file to get the record.
Other ideas
While we are already discussing several aspects at the same time, let me mention a few other ideas.
Delivery count vs redelivery count
After considering all your good comments, I now think the approach could be more optimistic by recording the redelivery count instead of the delivery count. I.e. most of the time, the message is delivered, consumed and removed from the queue; in this case, we don't need to record that it was delivered once.
I feel it's more important to track the fact that it was delivered again. This way, we only do things like updating the queue index when it is really necessary.
Future-proofing
As we are breaking the compatibility with the existing record and we have to convert files on disk, it's a good opportunity to revisit a few things at the same time IMHO.
First, is a record still the best way to store those properties? If RabbitMQ was written today, I bet we would use a map, not a record. A map is extensible and future-proof. The downside is when you have a typo in a key, which could let bugs sneak in.
What about starting to use maps instead of a v2 record? It is far from trivial because it means:
- more places to change
- a larger patch to `rabbit_queue_index` to be able to read and write variable-size entries (it kind of already does that for the embedded message)
- we still need a wrapper module in my opinion (i.e. something we can backport to e.g. 3.8.x (without the actual meat of the patch) so plugin developers can start to use a single API and have a plugin compatible with 3.8.x+)
If it's impossible to go that road, then I would include "reserved" space in the file, and perhaps reserved fields in the record (I'm not sure about that last part). By reserved space in the file, I mean the message property entry would contain:
- a message properties record version (integer)
- the existing fields: id, expiry, size, delivery count...
- 64 "reserved" bytes (the number is arbitrary here)
In the future, if we are to add more fields to the record, then we won't have to rewrite the file: we can take advantage of those reserved bytes. We will only need to convert the file when the reserved bytes are consumed and insufficient to write the next new record field.
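For illustration, an entry following that layout could be encoded like this (field widths and the reserved size are only examples):

```erlang
%% Sketch: encode a message property entry as proposed above: a version byte,
%% the current fields, then a fixed block of reserved bytes for future use.
-define(RESERVED_BYTES, 64).

encode_entry(Version, MsgId, Expiry, Size, DeliveryCount)
  when is_binary(MsgId), byte_size(MsgId) =:= 16 ->
    <<Version:8/unsigned,
      MsgId:16/binary,
      Expiry:64/unsigned,
      Size:32/unsigned,
      DeliveryCount:32/unsigned,
      0:(?RESERVED_BYTES * 8)>>.          %% reserved, all zeroes for now
```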
What do you think?
On the case of queue mirrors: yes, it's only full sync or mirror addition on a cluster node, rather than "promotion", to be precise on the wording. The syncher process will take care of this, so there is no active synchronization in either case. And yes, keeping the delivery count up to date is not mandatory, so gradual conversion across the cluster will be OK.
More on the Queue Index
Also, on the queue index, one aspect to take into consideration is the queue index segments held in memory. I have just pushed a commit to flush the queue index journal on transformation, so it is included when the persistence layer of the index is being transformed. As an enhancement, the queue process could support a transforming state to block any new incoming messages until the queue index persistence layer has been transformed, ensuring no new incoming messages are processed (cached and flushed to disk while on v1) until the feature flag enable completes.
Redelivery count vs Delivery count
Regarding the redelivery count: the rabbit queue process always attempts to first deliver to a consumer on message arrival, before queueing it. So in a way, we start tracking the delivery count after the first delivery attempt from the backing queue. On the messaging path, the actual work being done is very minimal: the only thing done is to check whether the feature flag is enabled, then increment the counter (see the sketch below). The queue index is never updated (only as is already done for a default classic queue, so no changes on that path). The backing queue behaviour doesn't offer much flexibility to track redeliveries either; it only requeues based on message ACK tags (dequeueing the alpha/beta queues in the process to acquire the message property record). Future work would be to efficiently persist the delivery count, allowing support for lazy queues, and also to keep mirrors up to date across the cluster.
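A minimal sketch of that per-message work (the record and flag check here are illustrative):

```erlang
%% Illustrative record; the real counter lives in the expanded
%% message_properties of the patch.
-record(props, {expiry, size, delivery_count = 0}).

%% Sketch: on requeue/redelivery, the only extra work is a guarded counter
%% bump held in the backing queue state; nothing is written to the index.
maybe_bump_delivery_count(Props = #props{delivery_count = DC}, FlagEnabled) ->
    case FlagEnabled of
        true  -> Props#props{delivery_count = DC + 1};
        false -> Props
    end.
```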
Then, for expiring messages, the delivery count expiry timer, running in isolation, folds through the backing queue with a predicate for dropping messages exceeding the delivery limit. This condition is added to the expiry condition, to ensure the backing-queue fold is not carried out multiple times (if, e.g., both message expiry and a delivery limit are set on a queue).
Maps vs records
Maps would be good, but (based on personal preference) the inflexibility/brittleness of records could guarantee conformance between the queue process, the backing queue, the syncher process (it's the #message_properties{} that is also replicated on sync), as well as the queue index. Keeping these in line with a fixed protocol or convention over a record sounds safer. Or maybe an options map field within #message_properties{}, allowing some extensibility on the existing record (see the sketch below).
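A sketch of that "options map inside the record" idea (purely illustrative, not the existing record definition):

```erlang
%% Sketch: keep the fixed, well-known fields as record fields and put
%% rarely-used or future attributes into an extensible map.
-record(msg_props_with_options, {expiry,
                                 needs_confirming = false,
                                 size,
                                 options = #{} :: map()}).

%% Example accessor: the delivery count becomes an optional map entry.
delivery_count(#msg_props_with_options{options = Opts}) ->
    maps:get(delivery_count, Opts, 0).
```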
Reserved space in the persisted message property entry sounds good. There will probably be more message properties added in future, as new functionality is introduced. Then we only change the persisted record when the need arises, i.e. when the allocated 64 bytes are used up.