faststream icon indicating copy to clipboard operation
faststream copied to clipboard

Bug: [RabbitBroker] When defining two handlers with different routing keys only the first is registered

Open amir-dropit opened this issue 1 year ago • 4 comments

When defining two different handlers for messages on the same queue with different routing keys only the first is registered.

import asyncio

from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitQueue

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

async def main():
    app = FastStream(broker)
    await app.run()


@broker.subscriber(RabbitQueue("queue1",routing_key="*.CheckStatusCommand"),"xchng")
async def check_status_handler(body):
    print(f"check_status_handler: {body}")


@broker.subscriber(RabbitQueue("queue1",routing_key="*.GetUserCommand"),"xchng")
async def get_user_handler(body):
    print(f"get_user_handler: {body}")

if __name__ == "__main__":
    asyncio.run(main())

Expected behavior One queue with two bindings, one for each routing key will be created

Observed behavior Only one binding is created

Screenshots image

Additional context faststream = {extras = ["rabbit"], version = "^0.5.33"}

amir-dropit avatar Jan 09 '25 20:01 amir-dropit

The problem is in queue hash function - https://github.com/airtai/faststream/blob/main/faststream/rabbit/schemas/queue.py#L49

Anyway, it should works fine in 0.6.0, so I don't want to fix it in main if it is not strongly required

Also, you can take a look at #1308 to dig into the problem

Lancetnik avatar Jan 10 '25 15:01 Lancetnik

So I overridden the hash method and it did solve the binding issue on startup

class DistinctQueue(RabbitQueue):
    def __hash__(self):
        return hash(self.routing_key) + super.__hash__(self)

But the thing is when I receive any message to the queue with one of those routing keys in the example it bounces between the handlers on every receive. For example I have two handlers handlerA for routeA handlerB for routeB both on the same queue

When I send two messages with routeA, the first one is received by handlerA and the second by handlerB Same if I send with routeB, the first one is received by handlerA and the second by handlerB

amir-dropit avatar Jan 10 '25 23:01 amir-dropit

@Lancetnik will this issue be resolved in #2034 ?

amir-dropit avatar Jan 15 '25 08:01 amir-dropit

@Lancetnik will this issue be resolved in #2034 ?

It is already solved in 0.6. this issue is just a further improvements

Lancetnik avatar Jan 15 '25 09:01 Lancetnik

@Lancetnik is there a way to test 0.6 yet?

amir-dropit avatar Jun 05 '25 20:06 amir-dropit

@Lancetnik is there a way to test 0.6 yet?

You can install it from the source branch – all 5k tests are green. Also, I hope, we can make a public RC in a few weeks

Lancetnik avatar Jun 06 '25 05:06 Lancetnik

@Lancetnik thanks, I've checked out rev 9130e72 and ran the code below.

from fastapi import Depends, FastAPI
from faststream.rabbit import RabbitExchange, RabbitQueue, ExchangeType
from pydantic import BaseModel

from faststream.rabbit.fastapi import RabbitRouter, Logger

router = RabbitRouter("amqp://guest:guest@localhost:5672/")

class Incoming(BaseModel):
    m: dict


exch = RabbitExchange("my-exchange", auto_delete=True, type=ExchangeType.TOPIC)

queue_1 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.info")
queue_2 = RabbitQueue("test-queue-1", auto_delete=True, routing_key="*.debug")


@router.subscriber(queue_1, exch)
async def base_handler1(message: Incoming, logger: Logger):
    logger.info(f"base_handler1 ${message}")


@router.subscriber(queue_2, exch)  # another service
async def base_handler2(message: Incoming, logger: Logger):
    logger.info(f"base_handler2 ${message}")



app = FastAPI()
app.include_router(router)

The result was one queue with two binding from the exchange but with two different consumers

Image

When sending 2 messages with the routing Shard=all.info to the exchange, the first was handled by base_handler1 and the second by base_handler2

Is there a way to keep one queue which handles all messages for a service and different handlers/subscribers unique by routing key or header?

amir-dropit avatar Jun 06 '25 14:06 amir-dropit

@amir-dropit it doesn't work this way in RMQ – you can't bind one queue to one exchange multiple times. 1 exchange - 1 binding - 1 queue This is how RabbitMQ works

You should create different queues and bind them to the same Topic exchange with different routing keys

Lancetnik avatar Jun 06 '25 18:06 Lancetnik

@Lancetnik what you're describing is more of a direct exchange approach. Topic exchange can have multiple bindings to the same queue Image See also this example https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange

The binding part works fine with version 0.6.0 as you can see from my prev post. But consuming the messages in the consumer side is faulty since different handlers are treated the same. Is there a way to route the message in the consumer side to a specific handler, maybe with a unique header or argument?

amir-dropit avatar Jun 07 '25 07:06 amir-dropit

@amir-dropit technical, it can, but it doesn't work as you describe. Binding is a rule, how messages will be delivered to queue. There no any routing inside queue, so all queue' subscribers consume all messages from it. So, you can bind a queue to an exchange by different routing keys, but it doesn't allow to filter messages on a final consumers side

There is no mechanism how you can split queue messages in different flows on a consumer side. To achieve that, you should split your flows by queues. One queue – one flow, that I told about last time.

Lancetnik avatar Jun 07 '25 07:06 Lancetnik