Bug: [RabbitBroker] When defining two handlers with different routing keys only the first is registered
When defining two different handlers for messages on the same queue with different routing keys, only the first is registered.
import asyncio

from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitQueue

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")


async def main():
    app = FastStream(broker)
    await app.run()


@broker.subscriber(RabbitQueue("queue1", routing_key="*.CheckStatusCommand"), "xchng")
async def check_status_handler(body):
    print(f"check_status_handler: {body}")


@broker.subscriber(RabbitQueue("queue1", routing_key="*.GetUserCommand"), "xchng")
async def get_user_handler(body):
    print(f"get_user_handler: {body}")


if __name__ == "__main__":
    asyncio.run(main())
Expected behavior: One queue with two bindings, one for each routing key, will be created.
Observed behavior: Only one binding is created.
Additional context: faststream = {extras = ["rabbit"], version = "^0.5.33"}
The problem is in the queue hash function: https://github.com/airtai/faststream/blob/main/faststream/rabbit/schemas/queue.py#L49
Anyway, it should work fine in 0.6.0, so I don't want to fix it in main unless it is strongly required.
Also, you can take a look at #1308 to dig into the problem
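To illustrate the kind of collision being described, here is a minimal sketch in plain Python. `QueueKey` and `register` are hypothetical stand-ins, not FastStream's `RabbitQueue` or its internals, and the assumption that the hash ignores `routing_key` is made only for the demonstration. It shows how two queue objects with the same name but different routing keys collapse into one entry when they are used as dictionary keys.

```python
from dataclasses import dataclass


@dataclass(eq=False)
class QueueKey:
    """Hypothetical stand-in for a queue schema (not FastStream's RabbitQueue)."""

    name: str
    routing_key: str = ""

    def __hash__(self) -> int:
        # Assumption for illustration: the routing key is NOT part of the hash.
        return hash(self.name)

    def __eq__(self, other: object) -> bool:
        # Equality also ignores the routing key, so same-name queues compare equal.
        return isinstance(other, QueueKey) and self.name == other.name


def register(queues):
    """Keep one entry per distinct key; the first registration wins."""
    registry = {}
    for queue in queues:
        registry.setdefault(queue, queue.routing_key)
    return registry


declared = register(
    [
        QueueKey("queue1", "*.CheckStatusCommand"),
        QueueKey("queue1", "*.GetUserCommand"),
    ]
)
print(len(declared))  # 1
print(list(declared.values()))  # only the first routing key survives
```

If the routing key were part of both `__hash__` and `__eq__`, the registry would hold two entries instead of one.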
So I overrode the hash method, and that did solve the binding issue on startup:
class DistinctQueue(RabbitQueue):
    def __hash__(self):
        # Mix the routing key into the parent's hash so each binding is distinct
        return hash(self.routing_key) + super().__hash__()
But the thing is, when a message arrives on the queue with one of the routing keys from the example, it bounces between the handlers on every receive.
For example I have two handlers
handlerA for routeA
handlerB for routeB
both on the same queue
When I send two messages with routeA, the first one is received by handlerA and the second by handlerB.
The same happens if I send with routeB: the first one is received by handlerA and the second by handlerB.
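This alternation is what you would expect if both handlers are separate consumers on one queue and the broker hands messages to them in turn, regardless of routing key. A toy simulation in plain Python (no broker involved; `deliver` is a hypothetical helper written for this illustration) reproduces the pattern:

```python
from itertools import cycle


def deliver(routing_keys, consumers):
    """Simulate round-robin dispatch of one queue's messages to its consumers."""
    turn = cycle(consumers)
    # The routing key plays no part in choosing the consumer.
    return [(next(turn), key) for key in routing_keys]


log = deliver(["routeA", "routeA", "routeB", "routeB"], ["handlerA", "handlerB"])
for consumer, key in log:
    print(f"{consumer} <- {key}")
```

Each pair of messages is split between the two handlers, which matches the behaviour reported above.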
@Lancetnik will this issue be resolved in #2034 ?
It is already solved in 0.6. this issue is just a further improvements
@Lancetnik is there a way to test 0.6 yet?
You can install it from the source branch – all 5k tests are green. Also, I hope, we can make a public RC in a few weeks
@Lancetnik thanks, I've checked out rev 9130e72 and ran the code below.
from fastapi import Depends, FastAPI
from faststream.rabbit import RabbitExchange, RabbitQueue, ExchangeType
from pydantic import BaseModel
from faststream.rabbit.fastapi import RabbitRouter, Logger

router = RabbitRouter("amqp://guest:guest@localhost:5672/")


class Incoming(BaseModel):
    m: dict


exch = RabbitExchange("my-exchange", auto_delete=True, type=ExchangeType.TOPIC)
queue_1 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.info")
queue_2 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.debug")


@router.subscriber(queue_1, exch)
async def base_handler1(message: Incoming, logger: Logger):
    logger.info(f"base_handler1 ${message}")


@router.subscriber(queue_2, exch)  # another service
async def base_handler2(message: Incoming, logger: Logger):
    logger.info(f"base_handler2 ${message}")


app = FastAPI()
app.include_router(router)
The result was one queue with two bindings from the exchange, but with two different consumers.
When sending 2 messages with the routing key Shard=all.info to the exchange, the first was handled by base_handler1 and the second by base_handler2.
Is there a way to keep one queue which handles all messages for a service and different handlers/subscribers unique by routing key or header?
@amir-dropit it doesn't work this way in RMQ – you can't bind one queue to one exchange multiple times. 1 exchange - 1 binding - 1 queue. This is how RabbitMQ works.
You should create different queues and bind them to the same Topic exchange with different routing keys
@Lancetnik what you're describing is more of a direct exchange approach.
Topic exchange can have multiple bindings to the same queue
See also this example
https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
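As a rough sketch of what the tutorial describes, the snippet below models topic matching in plain Python: `*` stands for exactly one word and `#` for zero or more words. `topic_matches` and `delivered` are hypothetical helpers written for this illustration, not RabbitMQ or FastStream code. It shows one queue holding two bindings and accepting a message that matches either of them.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):
            return False
        # '*' matches exactly one word; anything else must match literally.
        return (p[i] == "*" or p[i] == k[j]) and match(i + 1, j + 1)

    return match(0, 0)


# Both patterns are bound to the same queue, as in the example above.
bindings = ["*.info", "*.debug"]


def delivered(routing_key: str) -> bool:
    """A message reaches the queue if any of its bindings matches."""
    return any(topic_matches(pattern, routing_key) for pattern in bindings)


print(delivered("Shard=all.info"))
print(delivered("Shard=all.debug"))
print(delivered("Shard=all.error"))
```

Note that the bindings only decide whether a message enters the queue; they say nothing about which consumer later picks it up.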
The binding part works fine with version 0.6.0, as you can see from my previous post. But consuming the messages on the consumer side is faulty, since different handlers are treated the same. Is there a way to route the message on the consumer side to a specific handler, maybe with a unique header or argument?
@amir-dropit technically it can, but it doesn't work as you describe. A binding is a rule for how messages are delivered to a queue. There is no routing inside a queue, so all of a queue's subscribers consume all messages from it. So you can bind a queue to an exchange with different routing keys, but that doesn't let you filter messages on the final consumer side.
There is no mechanism for splitting a queue's messages into different flows on the consumer side. To achieve that, you should split your flows by queues. One queue, one flow, as I said last time.
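If a single queue has to stay, one option that follows from this is to keep a single consumer on it and do the split in application code. The sketch below is plain Python and only an illustration under assumptions: `on`, `dispatch` and the handler names are hypothetical, the message type is assumed to be the last dot-separated word of the routing key, and how the routing key is read from the incoming message in FastStream is not shown here.

```python
from typing import Callable, Dict

Handler = Callable[[dict], str]

# Hypothetical registry: message type -> handler function.
_handlers: Dict[str, Handler] = {}


def on(message_type: str):
    """Register a handler for one message type (last word of the routing key)."""

    def decorator(func: Handler) -> Handler:
        _handlers[message_type] = func
        return func

    return decorator


@on("info")
def handle_info(body: dict) -> str:
    return f"info: {body}"


@on("debug")
def handle_debug(body: dict) -> str:
    return f"debug: {body}"


def dispatch(routing_key: str, body: dict) -> str:
    """Called by the queue's single consumer to pick the matching handler."""
    message_type = routing_key.rsplit(".", 1)[-1]
    handler = _handlers.get(message_type)
    if handler is None:
        raise LookupError(f"no handler for routing key {routing_key!r}")
    return handler(body)


print(dispatch("Shard=all.info", {"m": {}}))
print(dispatch("Shard=all.debug", {"m": {}}))
```

With only one consumer on the queue there is nothing to alternate between, so every message goes through `dispatch`. The trade-off is that routing logic now lives in the service instead of in the broker's bindings, which is why separate queues remain the simpler approach.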