
Using multiple brokers

Open rjambrecic opened this issue 2 years ago • 6 comments

We must be able to produce to broker_2 while consuming from broker_1

rjambrecic avatar Sep 05 '23 06:09 rjambrecic

  • [ ] store multiple brokers in the FastStream object and run all of them at start and close (see the sketch after this list)
  • [x] support cross-broker logging (for now we have only one access_logger and it can't be shared between brokers)
    • we can't use a broker's logger in startup and shutdown hooks, but we can use the application's one
  • [ ] test multiple brokers' startup and shutdown
  • [ ] test cross-broker messaging (should work)
  • [ ] test cross-broker messaging with TestClient (should work)
  • [ ] refactor AsyncAPI docs generation for this case
  • [ ] test AsyncAPI docs generation
  • [ ] add the documentation page
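
The first item above implies an application object that accepts several brokers at once. A minimal sketch of what that could look like, using a hypothetical FastStream(rabbit_broker, kafka_broker) signature that does not exist in the current release (the connection strings, queue, and topic names are placeholders too):

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.rabbit import RabbitBroker

rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
kafka_broker = KafkaBroker("localhost:9092")

# Hypothetical: the app would start and close both brokers together.
app = FastStream(rabbit_broker, kafka_broker)


@rabbit_broker.subscriber("in-queue")
async def bridge(body: str):
    # Cross-broker messaging: consume from RabbitMQ, publish to Kafka.
    await kafka_broker.publish(body, topic="out-topic")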

Lancetnik avatar Sep 07 '23 18:09 Lancetnik

Dear Lancetnik, unfortunately multiple Rabbit brokers don't work properly with FastAPI, so I'm really looking forward to this feature.

dimbler avatar Apr 01 '24 09:04 dimbler

Sorry, but I can't promise to release this feature in the near future. I am planning to implement it in 1.0.0 at the end of this year, but it is still in a raw state.

Lancetnik avatar Apr 01 '24 11:04 Lancetnik

I'm keeping my fingers crossed

dimbler avatar Apr 01 '24 12:04 dimbler

Same thing here, I have a case where I need to consume messages from RabbitMQ and then publish them to Kafka, some sort of bridge.

UPD: It seems to work when you only need to publish to another broker:

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitQueue
from faststream.rabbit.annotations import RabbitMessage

kafka_broker = KafkaBroker(bootstrap_servers="127.0.0.1:9092")
rabbit_broker = RabbitBroker("amqp://127.0.0.1:5672", virtualhost="/")

exch = RabbitExchange("logs_exchange", type=ExchangeType.TOPIC)
logs_q = RabbitQueue("faststream-queue", auto_delete=False, routing_key="logs")

# The app manages only the Rabbit broker; the Kafka broker is started
# manually in the startup hook below.
app = FastStream(rabbit_broker)


@rabbit_broker.subscriber(queue=logs_q, exchange=exch, no_ack=False)
async def handle_logs_msg(data, logger: Logger, msg: RabbitMessage):
    # Bridge: forward the consumed RabbitMQ message to Kafka, then ack.
    await kafka_broker.publish(data, topic="logs")
    await msg.ack()


@app.on_startup
async def on_start():
    await kafka_broker.start()
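
One thing this snippet doesn't do is stop the manually started Kafka broker when the application exits. A minimal counterpart hook, assuming the broker exposes an async close() method matching start():

@app.on_shutdown
async def on_stop():
    # Close the manually started Kafka broker when the app stops.
    await kafka_broker.close()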

frct1 avatar May 10 '24 14:05 frct1