Post 3.13: Native AMQP 1.0

Open ansd opened this issue 10 months ago • 3 comments

Create v3.13.x branch before merging!

What

Similar to Native MQTT in https://github.com/rabbitmq/rabbitmq-server/pull/5895, this commit implements Native AMQP 1.0. By "native", we mean that AMQP 1.0 is no longer proxied via AMQP 0.9.1.

Why

Native AMQP 1.0 comes with the following major benefits:

  1. Similar to Native MQTT, this commit provides better throughput, latency, scalability, and resource usage for AMQP 1.0. See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements. See further below for some benchmarks.
  2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol, this commit allows implementing more AMQP 1.0 features in the future. Some features are already implemented in this commit (see next section).
  3. Simpler, easier to understand, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the following major benefits compared to AMQP 0.9.1:

  1. Memory and disk alarms will only stop accepting incoming TRANSFER frames. New connections can still be created to consume from RabbitMQ to empty queues.
  2. Due to 1., separate connections for publishers and consumers, as we currently recommend for AMQP 0.9.1, are no longer needed. This potentially halves the number of physical TCP connections.
  3. When a single connection sends to multiple target queues, a single slow target queue won't block the entire connection. Publishers can still send data quickly to all other target queues.
  4. A publisher can request whether it wants publisher confirmation on a per-message basis. In AMQP 0.9.1 publisher confirms are configured per channel only.
  5. Consumers can change their "prefetch count" dynamically which isn't possible in our AMQP 0.9.1 implementation. See https://github.com/rabbitmq/rabbitmq-server/discussions/10174
  6. AMQP 1.0 is an extensible protocol.

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in RabbitMQ 3.x - most of which cannot be backported due to the complexity and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

Implementation details

  1. Breaking change: With Native AMQP, the behaviour of the following two settings will break:

Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer:
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)

Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers:
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)

These settings break because we always convert according to the message container conversions. For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties. Also, false won’t be respected since we always convert the headers with message containers.
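
The x-header rule above can be illustrated with a minimal sketch. splitHeaders is a hypothetical helper, not RabbitMQ code; it assumes that every non x- header ends up in application properties and ignores any type restrictions the real message container conversion may apply.

```go
package main

import (
	"fmt"
	"strings"
)

// splitHeaders is a hypothetical illustration of the conversion rule:
// AMQP 0.9.1 headers whose key starts with "x-" go into message-annotations,
// all other headers go into application-properties (an assumption of this sketch).
func splitHeaders(headers map[string]any) (annotations, appProps map[string]any) {
	annotations = make(map[string]any)
	appProps = make(map[string]any)
	for key, value := range headers {
		if strings.HasPrefix(key, "x-") {
			annotations[key] = value
		} else {
			appProps[key] = value
		}
	}
	return annotations, appProps
}

func main() {
	annotations, appProps := splitHeaders(map[string]any{
		"x-trace": "abc",  // hypothetical header names
		"tenant":  "acme",
	})
	fmt.Println("message-annotations:", annotations)
	fmt.Println("application-properties:", appProps)
}
```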

  1. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting exclusive queues. Since the AMQP 1.0 plugin never creates exclusive queues, rabbit_queue_collector doesn't need to be started in the first place. This will save 1 Erlang process per AMQP 1.0 connection.

  1. 7 processes per connection + 1 process per session in this commit instead of 7 processes per connection + 15 processes per session in 3.x. The supervision hierarchy got re-designed.

  2. Use 1 writer process per AMQP 1.0 connection

AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel. Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.

Advantage of single writer proc per session (prior to this commit):

  • High parallelism for serialising packets if multiple sessions within a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is shared across all AMQP 1.0 sessions. Advantages of single writer proc per connection (this commit):

  • Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
  • Less TCP and IP header overhead given that the single writer process can accumulate bytes across all sessions before flushing the socket.

In other words, this commit decides that a reader / writer process pair per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows. Having a writer per session is too heavy. We still ensure high throughput by having separate reader, writer, and session processes.

  1. Transform rabbit_amqp1_0_writer into gen_server

Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output.

How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes.
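
The "flush once the inbox is empty" idea can be sketched outside Erlang as follows. This is only an analogy in Go, not the gen_server implementation: the writer type, its fields, and the channel standing in for the process mailbox are all hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// writer is a hypothetical stand-in for the connection-wide writer process.
type writer struct {
	pending bytes.Buffer // bytes accumulated across all sessions, not yet flushed
	flushes [][]byte     // records each flush; a real writer would write to the socket
}

// flush hands all pending bytes over in one go.
func (w *writer) flush() {
	if w.pending.Len() == 0 {
		return
	}
	w.flushes = append(w.flushes, append([]byte(nil), w.pending.Bytes()...))
	w.pending.Reset()
}

// run consumes frames from the inbox. Whenever no further frame is
// immediately available it flushes, which mirrors handling the timeout
// message after every callback returned a timeout of 0.
func (w *writer) run(inbox <-chan []byte) {
	for {
		select {
		case frame, ok := <-inbox:
			if !ok { // inbox closed: flush what is left and stop
				w.flush()
				return
			}
			w.pending.Write(frame)
		default:
			// Inbox is empty right now: flush, then block for the next frame.
			w.flush()
			frame, ok := <-inbox
			if !ok {
				return
			}
			w.pending.Write(frame)
		}
	}
}

func main() {
	inbox := make(chan []byte, 3)
	inbox <- []byte("frame-a;")
	inbox <- []byte("frame-b;")
	inbox <- []byte("frame-c;")
	close(inbox)

	w := &writer{}
	w.run(inbox)
	fmt.Printf("%d flush(es): %q\n", len(w.flushes), w.flushes)
}
```

Because all three frames are already queued when the writer starts, they are accumulated and leave in a single flush rather than three.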

  1. Remove stats timer from writer

AMQP 1.0 connections haven't emitted any stats previously.

  2. When there are contiguous queue confirmations in the session process mailbox, batch them. When the confirmations are sent to the publisher, a single DISPOSITION frame is sent for contiguously confirmed delivery IDs. This approach should be good enough. However, it's suboptimal in scenarios where contiguous delivery IDs that need confirmations are rare, for example:

  • There are multiple links in the session with different sender settlement modes and the sender interleaves publishes across these links.
  • The sender settlement mode is mixed and the sender interleaves settled and unsettled TRANSFERs.
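
The batching of contiguous confirmations described above can be sketched like this. batchContiguous and idRange are hypothetical names, and the sketch ignores delivery ID wrap-around; each resulting range corresponds to the first and last fields of one DISPOSITION frame.

```go
package main

import (
	"fmt"
	"sort"
)

// idRange covers delivery IDs first..last (inclusive), matching the
// first/last fields of a DISPOSITION frame.
type idRange struct{ first, last uint32 }

// batchContiguous is a hypothetical helper that groups confirmed delivery
// IDs into maximal contiguous ranges. Wrap-around at 2^32 is ignored here.
func batchContiguous(ids []uint32) []idRange {
	if len(ids) == 0 {
		return nil
	}
	sorted := append([]uint32(nil), ids...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	ranges := []idRange{{sorted[0], sorted[0]}}
	for _, id := range sorted[1:] {
		cur := &ranges[len(ranges)-1]
		switch {
		case id == cur.last:
			// duplicate confirmation, nothing to do
		case id == cur.last+1:
			cur.last = id // extends the current range
		default:
			ranges = append(ranges, idRange{id, id}) // gap: start a new range
		}
	}
	return ranges
}

func main() {
	// Six confirmations collapse into three DISPOSITION frames.
	fmt.Println(batchContiguous([]uint32{5, 6, 7, 9, 10, 12}))
}
```

With rare contiguity (for example IDs 1, 3, 5) every ID becomes its own range, which is the suboptimal case mentioned above.
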

  1. Introduce credit API v2

Why: The AMQP 0.9.1 credit extension, which is to be removed in 4.0, was poorly designed since basic.credit is a synchronous call into the queue process, blocking the entire AMQP 1.0 session process.

How: Change the interactions between queue clients and queue server implementations:

  • Clients only request a credit reply if the FLOW's echo field is set
  • Include all link flow control state held by the queue process into a new credit_reply queue event:
    • available after the queue sends any deliveries
    • link-credit after the queue sends any deliveries
    • drain which allows us to combine the old queue events send_credit_reply and send_drained into a single new queue event credit_reply.
  • Include the consumer tag into the credit_reply queue event such that the AMQP 1.0 session process can process any credit replies asynchronously.

Link flow control state delivery-count also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to allow for rolling upgrades from 3.13 to 4.0.

  1. Use serial number arithmetic in quorum queues and session process.
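
For readers unfamiliar with serial number arithmetic (RFC 1982, which AMQP 1.0 uses for 32-bit sequence numbers such as delivery-count), here is a minimal sketch. The function names are made up for illustration and are not taken from the serial_number module.

```go
package main

import "fmt"

// add returns s + n in serial number arithmetic.
// uint32 addition in Go wraps modulo 2^32, which is exactly what is needed.
func add(s, n uint32) uint32 { return s + n }

// diff returns the signed distance from b to a.
func diff(a, b uint32) int32 { return int32(a - b) }

// less reports whether a comes before b in serial number order.
// RFC 1982 leaves the comparison undefined when a and b are exactly 2^31
// apart; the result of this sketch is not meaningful in that case.
func less(a, b uint32) bool { return diff(a, b) < 0 }

func main() {
	var max uint32 = 0xFFFFFFFF
	next := add(max, 1)                   // wraps around to 0
	fmt.Println(next)                     // 0
	fmt.Println(less(max, next))          // true: 0xFFFFFFFF precedes 0 after wrap-around
	fmt.Println(max < next)               // false: a plain integer comparison gets it wrong
	fmt.Println(diff(next, max))          // 1
}
```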

  2. Completely bypass the rabbit_limiter module for AMQP 1.0 flow control. The goal is to eventually remove the rabbit_limiter module in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter into rabbit_queue_consumers.

  3. Fix credit bug for streams: AMQP 1.0 settlements shouldn't top up link credit, only FLOW frames should top up link credit.

  4. Allow sender settle mode unsettled for streams since AMQP 1.0 acknowledgements to streams are no-ops (currently).

  5. Fix AMQP 1.0 client bugs

Auto renewing credits should not be related to settling TRANSFERs. Remove field link_credit_unsettled as it was wrong and confusing. Prior to this commit, auto renewal did not work when the sender used sender settlement mode settled.

  6. Fix AMQP 1.0 client bugs

The wrong, outdated Link was passed to function auto_flow/2.

  7. Use osiris chunk iterator

Only hold messages of uncompressed sub batches in memory if the consumer doesn't have sufficient credits. Compressed sub batches are skipped for non Stream protocol consumers.

  8. Fix incoming link flow control

Always use confirms between AMQP 1.0 queue clients and queue servers. As already done internally by rabbit_fifo_client and rabbit_stream_queue, use confirms for classic queues as well.

  9. Include link handle into correlation when publishing messages to target queues such that session process can correlate confirms from target queues to incoming links.

  10. Only grant more credits to a publisher if the publisher no longer has sufficient credits and there are not too many unconfirmed messages on the link.

  11. Completely ignore block and unblock queue actions and RabbitMQ credit flow between classic queue process and session process.

  12. Link flow control is independent between links. A client can refer to a queue or to an exchange with multiple dynamically added target queues. Multiple incoming links can also fan in to the same queue. Whatever the link topology looks like, this commit ensures that each link is only granted more credits if that link isn't overloaded.

  13. A connection or a session can send to many different queues. In AMQP 0.9.1, a single slow queue will lead to the entire channel, and then the entire connection, being blocked. This commit makes sure that a single slow queue from one link won't slow down sending on other links. For example, with link A sending to a local classic queue and link B sending to a 5-replica quorum queue, link B will naturally be granted credits more slowly than link A. So, despite the quorum queue being slower in confirming messages, the same AMQP 1.0 connection and session can still pump data very fast into the classic queue.
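
A per-link credit decision along these lines might look as follows. This is a sketch under stated assumptions: the incomingLink type, the maxLinkCredit value, and the "half of the maximum" thresholds are all invented for illustration and are not taken from the RabbitMQ source.

```go
package main

import "fmt"

// maxLinkCredit is a hypothetical upper bound on the credit granted per link.
const maxLinkCredit = 128

// incomingLink holds the per-link state this sketch needs.
type incomingLink struct {
	name        string
	credit      uint32 // credit the publisher still holds on this link
	unconfirmed int    // messages sent to target queues but not yet confirmed
}

// maybeGrant tops the link up to maxLinkCredit, but only if the publisher is
// running low on credit AND the link has few unconfirmed messages.
// Both thresholds (half of maxLinkCredit) are assumptions of this sketch.
func (l *incomingLink) maybeGrant() bool {
	if l.credit <= maxLinkCredit/2 && l.unconfirmed < maxLinkCredit/2 {
		l.credit = maxLinkCredit
		return true
	}
	return false
}

func main() {
	// Link A targets a fast queue, link B a slow one with many pending confirms.
	a := &incomingLink{name: "A", credit: 10, unconfirmed: 3}
	b := &incomingLink{name: "B", credit: 10, unconfirmed: 100}
	for _, l := range []*incomingLink{a, b} {
		fmt.Printf("link %s granted=%v credit=%d\n", l.name, l.maybeGrant(), l.credit)
	}
}
```

Since the decision only looks at the state of one link, the slow link B is held back while link A on the same session keeps receiving credit.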

  14. If a cluster-wide memory or disk alarm occurs: Each session sends a FLOW with incoming-window set to 0 to the sending client. If sending clients don’t obey, force disconnect the client.

If the cluster-wide memory alarm clears: Each session resumes with a FLOW defaulting to the initial incoming-window.

  1. All operations apart from publishing TRANSFERS to RabbitMQ can continue during cluster-wide alarms, specifically, attaching consumers and consuming, i.e. emptying queues. There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

  2. Flow control summary:

  • If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
  • If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
  • If connection becomes bottleneck, it naturally won’t read fast enough from the socket, causing TCP backpressure to be applied.

Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

  1. Register AMQP sessions

Prefer local-only pg over our custom pg_local implementation as pg is a better process group implementation than pg_local. pg_local was identified as a bottleneck in tests where many MQTT clients were disconnected at once.

  2. Start a local-only pg when Rabbit boots:

A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name: pg:start_link(node()). Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only pg for all registered processes such as AMQP 0.9.1 connections and channels.

  1. Requeue messages if link detached

Although the spec allows settling delivery IDs on detached links, RabbitMQ does not respect the 'closed' field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed, we expect every outstanding delivery to be requeued. In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.

Note that this behaviour is different from mere consumer cancellation in AMQP 0.9.1: "After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them." [https://www.rabbitmq.com/consumers.html#unsubscribing]

An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries.

  2. Init AMQP session with BEGIN frame

Similar to how there can't be an MQTT processor without a CONNECT frame, there can't be an AMQP session without a BEGIN frame. This allows having strict dialyzer types for session flow control fields (i.e. not allowing 'undefined').

  3. Move serial_number to AMQP 1.0 common lib such that it can be used by both AMQP 1.0 server and client.

  4. Fix AMQP client to do serial number arithmetic.

  5. AMQP client: Differentiate between delivery-id and transfer-id for better understandability.

  6. Fix link flow control in classic queues

This commit fixes

java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0

followed by

./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2

Prior to this commit (and on RabbitMQ 3.x), consuming would halt after around 8,000 - 10,000 messages.

The bug was that in flight messages from classic queue process to session process were not taken into account when topping up credit to the classic queue process. Fixes https://github.com/rabbitmq/rabbitmq-server/issues/2597

The solution to this bug (and a much cleaner design anyway independent of this bug) is that queues should hold all link flow control state including the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the classic queue process, quorum queue process, and stream queue client instead of managing the delivery-count in the session.
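
Why it helps for the sending side to own the delivery-count can be seen from the link-credit rule of the AMQP 1.0 specification: link-credit(snd) = delivery-count(rcv) + link-credit(rcv) - delivery-count(snd). The sketch below applies that rule; the function name and the clamping of negative results to 0 are choices made for this illustration, not RabbitMQ code.

```go
package main

import "fmt"

// senderLinkCredit computes the credit the sending side (here: the queue) may
// still use, given the receiver's view carried in a FLOW frame and the
// sender's own delivery-count. All arithmetic wraps modulo 2^32.
// Results of 2^31 or more are treated as negative and clamped to 0, which is
// a simplification of this sketch.
func senderLinkCredit(deliveryCountRcv, linkCreditRcv, deliveryCountSnd uint32) uint32 {
	credit := int32(deliveryCountRcv + linkCreditRcv - deliveryCountSnd)
	if credit < 0 {
		return 0
	}
	return uint32(credit)
}

func main() {
	// The receiver has processed 100 deliveries and grants credit 2.
	// Nothing in flight: the queue may send 2 more messages.
	fmt.Println(senderLinkCredit(100, 2, 100))

	// One message is still in flight (the queue has already sent 101).
	// Because the queue knows its own delivery-count, only 1 credit remains.
	fmt.Println(senderLinkCredit(100, 2, 101))
}
```

When the delivery-count lives only in the session, the queue cannot subtract the in flight deliveries in this way, which is the accounting gap described above.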

  1. The double level crediting between (a) session process and rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was removed. Therefore, instead of managing 3 separate delivery-counts (i. session, ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used in rabbit_fifo. This is a big simplification.

  2. This commit fixes quorum queues without bumping the machine version nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time depending on whether feature flag credit_api_v2 is enabled.

Even if that feature flag is enabled later on, this link will keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function

  • Can use a gen_server:call and only finish migration once all delivery-counts were migrated.

Cons:

  • Extra new message format just for migration is required.
  • Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue.

Cons:

  • 2 delivery counts will be held simultaneously in session proc and queue proc; could be solved by deleting the session proc’s delivery-count for credit-reply
  • What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

  1. Support stream filtering in AMQP 1.0 (by @acogoluegnes)

Use the x-stream-filter-value message annotation to carry the filter value in a published message. Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered filters when creating a receiver that wants to filter out messages from a stream.

  2. Remove credit extension from AMQP 0.9.1 client

  3. Support maintenance mode closing AMQP 1.0 connections.

  4. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

  5. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default. The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment tools from failing that execute:

rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0

  1. Breaking change: Remove CLI command rabbitmqctl list_amqp10_connections. Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in list_connections:

rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}

Benchmarks

Throughput & Latency

Setup:

  • Single node Ubuntu 22.04
  • Erlang 26.1.1

Start RabbitMQ:

make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client: https://github.com/ssorj/quiver https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)

docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT

  1. Classic queue:

quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000

This commit:

Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms

RabbitMQ 3.x (main branch as of 30 January 2024):

---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.

  1. Quorum queue:

quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000

This commit:

Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms

RabbitMQ 3.x:

---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.

  1. Stream:

quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose

This commit:

Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s

RabbitMQ 3.x:

Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s

Memory usage

Start RabbitMQ:

ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none

Create 50k connections with 2 sessions per connection, i.e. 100k sessions in total:

package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}

This commit:

erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289

7 procs per connection + 1 proc per session: (7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:

erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309

7 procs per connection + 15 procs per session: (7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k sessions require:

  • with this commit: 4.5 GB
  • in RabbitMQ 3.x: 15 GB
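
The process counts and a rough per-connection memory figure can be re-derived from the numbers reported above. This is only a back-of-the-envelope sketch: the helper name is made up, and the per-connection figure simply divides the erlang:memory() total by the number of connections, so it also includes the broker's baseline memory.

```go
package main

import "fmt"

const (
	connections     = 50_000
	sessionsPerConn = 2
)

// processCount returns the expected number of Erlang processes for the test
// above, given the processes needed per connection and per session.
func processCount(perConn, perSession int) int {
	return (perConn + sessionsPerConn*perSession) * connections
}

func main() {
	fmt.Println(processCount(7, 1))  // 450000  (this commit)
	fmt.Println(processCount(7, 15)) // 1850000 (RabbitMQ 3.x)

	// erlang:memory() totals in bytes, copied from the outputs above.
	totalNative := 4586376480.0
	total3x := 15168232704.0
	fmt.Printf("per connection: %.1f kB vs %.1f kB, ratio %.1fx\n",
		totalNative/connections/1000, total3x/connections/1000, total3x/totalNative)
}
```

The small gap between these counts and the measured 450,289 and 1,850,309 is presumably the broker's own baseline processes.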

Future work

Ideally by 4.0:

  1. More efficient parser and serializer
  2. TODO in mc_amqp: Do not store the parsed message on disk.
  3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP clients to create RabbitMQ objects (queues, exchanges, ...): https://github.com/rabbitmq/rabbitmq-server/pull/10559

ansd avatar Aug 07 '23 10:08 ansd