feat: add amqp exchange routing pattern and ref in channel operation
Description
In Java Spring applications, it is possible to define an application that connects to an amqp broker, defines exchanges, queues and listens to incoming message on these queues.
My current understanding of amqp:
- A publisher connects to a broker
- The publisher sends a message with a routing key to an exchange
- The broker matches the message's routing key with the routing patterns of the defined exchanges and forwards the message to those queues.
- (Bindings are used to connect an exchange with a queue, which do have a n-to-m mapping)
- The consumer receives message from the queue
In AsyncAPI, the amqp binding supports two types of channels: routingKey (which is an exchange) and queue. Unfortunately, there is no link between the two.
Proposal: Add (optional) fields:
nameto specify the actual routing pattern used (in the (topic) exchange or amqp binding)channel.$refto connect theis=routingKeychannel type to theis=queuechannel type.
I am looking forward to your thoughts.
Example:
asyncapi: 3.0.0
info:
title: Springwolf example project - AMQP
version: 1.0.0
description: Springwolf example project to demonstrate springwolfs abilities
defaultContentType: application/json
servers:
amqp-server:
host: amqp:5672
protocol: amqp
channels:
queue-update-id_#_CRUD-topic-exchange-1-id:
address: CRUD-topic-exchange-1
messages:
java.lang.String:
$ref: "#/components/messages/java.lang.String"
bindings:
amqp:
is: routingKey
exchange:
name: CRUD-topic-exchange-1
type: topic
durable: true
autoDelete: false
vhost: /
bindingVersion: 0.3.0
queue-update_id:
address: queue-update
bindings:
amqp:
is: queue
queue:
name: queue-update
durable: false
exclusive: false
autoDelete: false
vhost: /
bindingVersion: 0.3.0
components:
schemas:
SpringRabbitListenerDefaultHeaders:
type: object
properties: {}
examples:
- {}
java.lang.String:
title: String
type: string
examples:
- '"string"'
messages:
java.lang.String:
headers:
$ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders"
payload:
schemaFormat: application/vnd.aai.asyncapi+json;version=3.0.0
schema:
$ref: "#/components/schemas/java.lang.String"
name: java.lang.String
title: String
bindings:
amqp:
bindingVersion: 0.3.0
operations:
queue-update-id_#_CRUD-topic-exchange-1-id_receive_bindingsUpdate:
action: receive
channel:
$ref: "#/channels/queue-update-id_#_CRUD-topic-exchange-1-id"
bindings:
amqp:
expiration: 0
bindingVersion: 0.3.0
messages:
- $ref: "#/channels/queue-update-id_#_CRUD-topic-exchange-1-id/messages/java.lang.String"
@timonback this repo is destination point
Start from creating PR in schemas repo - https://github.com/asyncapi/spec-json-schemas/tree/master/bindings/amqp
Create new version 0.4.0 and change schema
Thank you @Pakisan for the pointing out the correct repo. I re-created this PR as https://github.com/asyncapi/spec-json-schemas/pull/555
@jonaslagoni @dalelane @smoya @GreenRover please, take a look
this PR is ready https://github.com/asyncapi/spec-json-schemas/pull/555
@jonaslagoni @dalelane @smoya @GreenRover please, take a look
this PR is ready asyncapi/spec-json-schemas#555
up
@derberg @fmvilas @dalelane @smoya @char0n @GreenRover
Sorry my knowledge about amqp is very low, i am not sure if i can help.
@derberg did not go into specifics of whether something made sense for AMQP, so I will stay out of the specifics :) Already reviewed https://github.com/asyncapi/spec-json-schemas/pull/555
Thank you for the review @derberg!
I have updated the PR to:
- update the description, including link to reference object
- differentiate between exchange and queue by
channel id - update the version
Example:
Producer sends a message to the exchange address signup with the routingKey user.signup.
Because a TopicExchange is used, the routingKey is identical to the queue address and the message gets routed to the user.signup queue.
Consumer will get message from user.signup queue.
This proposal adds a 1:1 mapping between exchange and queue to indicate and thereby implementing the simple TopicExchange case.
Based on the amqp implemention in Spring Boot, it is possible that 1 exchange routes to multiple different queues using different routing keys (i.e. topic exchange user.signup, company.signup)
The current implementation in Springwolf will an exchange channel per binding/routing key (by using different channels ids). I wonder whether this exchange duplication is ideal?
Similarly, an exchange may have multiple amqp bindings. For example, an exchange can be of type=topic for one routingKey and of type=fanout for a different one. Creating multiple amqp (exchange) channel bindings is probably not intended and duplication the best work-around (for now).
@timonback not sure I get it, but if you say that an exchange can have routingKey pointing to multiple channels, why not making channel actually channels and an array of $ref?
look at this example from the spec: https://www.asyncapi.com/docs/reference/specification/v3.0.0#channelObject where you can specify that channel is available only on specific servers, notice that one can specify servers array of reference objects
Hi everyone,
I have be asked by @timonback to "give some clarity to https://github.com/asyncapi/bindings/pull/259 and how exchanges and queues are connected in amqp? (is it a n:m mapping?)"
I am not really an expert, but I'll try ;-) Here is an answer from Copilot, answer that I changed a bit. I am not 100% sure this correct, but it the explanation bellow makes sense to me, so I am reasonably confident.
" In RabbitMQ, exchanges and queues are connected through bindings. Here’s a brief overview of how it works:
- Exchanges: These are message routing agents that receive messages from producers and route them to queues based on certain criteria.
- Queues: These store messages until they are consumed by a consumer.
- Bindings: These are the links between an exchange and a queue. A binding can include a routing key or pattern that the exchange uses to determine how to route messages to the queue.
Regarding the n:m mapping:
Yes, it is possible to have an n:m mapping between exchanges and queues. This means that multiple exchanges can route messages to multiple queues, and vice versa. This is achieved through multiple bindings. For example, you can bind multiple queues to a single exchange, and you can also bind a single queue to multiple exchanges12. "
This answer is 'in line' with my current understanding of how exchanges, bindings and queues are linked together.
I also add a copy of a comment I posted on SpringWolf's Github about that:
" Just adding a link and a picture to illustrate the different kind of exchanges supported by RabbitMQ:
src: https://hevodata.com/learn/rabbitmq-exchange-type/
So the 'complete chain' is:
Producer > Channel > Exchange > Binding > Routing Key > Queue > Consumer "
So, theoretically, I think that we could have:
- 2 exchanges,
Exchange_1andExchange_2( each exchange type can be either "Topic" or "Direct"), - a
Binding_1with exchange =Exchange_1and routing key =Queue_1 - a
Binding_2with exchange =Exchange_2and routing key =Queue_1(same routing key as binding 1) - and 1 Queue,
Queue_1.
I don't know if it's good practice or not, but I suppose it's a valid use case. Visually, this would be :
Thank you @pdalfarr for this explanation and pictures.
I'll attempt to summarize this GH issue
- AsyncAPI uses the concept of
channelsto describe an entity that canreceive/sendmessages. - In AsyncAPI
operationsare used to describe individualmessagesthat is eitherreceived/sendon one channel. - Amqp has the concept of an
exchange(channelin AsyncAPI), where an applicationsendmessage to. - Amqp has the concept of a
queue(alsochannelin AsyncAPI), where an application canreceivea message from. - Amqp
exchangesandqueuesallow for a m:n mapping, also in multiple layers (exchange1 -> exchange2 -> queue1 + queue2) - AsyncAPI
operationcannot be used to describe this broker routing, as associates 1 channel +receive/sendwith messages.
For details, check @pdalfarr comment above: https://github.com/asyncapi/bindings/pull/259#issuecomment-2466171062
Original issue
It would be great to have a way to describe, how messages are routed on the broker
Initial idea/improvement
Allow to link a channel (exchange) to another (1) channel (queue/exchange).
Problem:
- amqp is actually an n:m mapping
- Only one channel binding is supported for amqp, while one exchange (
channel) may have multiple targets and therefore routingKeys
Also, how to translate amqp bindings (basically the m:n mapping) to AsyncAPI?
Alternative
Rather use the operation object to bind a channel (exchange) to a amqp queue.
The target queue + routingKey could be part of the amqp operation binding.
What are your thoughts? Does this fit into the current AsyncAPI version?
(As I am not familiar enough with amqp, I cannot and do not plan a re-design of the amqp binding. If it turns out, that the use case is a non-goal or design limitation, that is fine with me.)
great summary @timonback thanks @pdalfarr for your explanation
I only do not understand what you mean by
AsyncAPI operation cannot be used to describe this broker routing, as associates 1 channel + receive/send with messages.
also lemme explain below a bit more:
Only one channel binding is supported for amqp, while one exchange (channel) may have multiple targets and therefore routingKeys
yes, one channel one binding, but AsyncAPI Channel is not necessarily 1:1 mapping with a real channel in the broker. With AsyncAPI v3 channel address is no longer an identifier of the channel. This means you can have multiple channels with the same address. You can for example model something like https://www.asyncapi.com/docs/concepts/asyncapi-document/reply-info#multiple-channels-with-single-message-when-reply-address-is-known with AsyncAPI v3
This pull request has been automatically marked as stale because it has not had recent activity :sleeping:
It will be closed in 120 days if no further activity occurs. To unstale this pull request, add a comment with detailed explanation.
There can be many reasons why some specific pull request has no activity. The most probable cause is lack of time, not lack of interest. AsyncAPI Initiative is a Linux Foundation project not owned by a single for-profit company. It is a community-driven initiative ruled under open governance model.
Let us figure out together how to push this pull request forward. Connect with us through one of many communication channels we established here.
Thank you for your patience :heart: