aio-pika icon indicating copy to clipboard operation
aio-pika copied to clipboard

RPC reply queue

Open taybin opened this issue 4 years ago • 1 comments

Should the RPC pattern use the direct reply-to queue amq.rabbitmq.reply-to as described here: https://www.rabbitmq.com/direct-reply-to.html?

I'm also surprised to see a reply-to queue being created by the RPC server as well, and neither reply-to queue being declared with exclusive=True.

Is the RPC pattern meant to be an example or is it a work in progress or is there a reason for these choices?

Thank you! This library is terrific. I'm just a little confused about a couple details in the code for the RPC pattern.

taybin avatar May 06 '20 00:05 taybin

So, I started looking into this today for the same reason. I think the aio-pika RPC pattern is cool, but I wasn't entirely sure how I could RPC proxy multiple exchanges/queues with the current API (and I think it's missing some documentation for anything past simple examples).

I tried porting this: https://pika.readthedocs.io/en/stable/examples/direct_reply_to.html

Clearly not for production, just a proof of concept, but it seems to work as per https://www.rabbitmq.com/direct-reply-to.html

The rpc.server.queue is my request queue.

server.py

import asyncio

from aio_pika import connect_robust
from aio_pika.message import IncomingMessage, Message

channel = None


async def on_message(message: IncomingMessage):
    print(f"Server received: {str(message.body)} ==> {message}")

    if not channel:
        return

    await channel.default_exchange.publish(
        Message("Polo".encode()),
        routing_key=message.reply_to,
    )


async def main():
    global channel
    connection = await connect_robust("amqp://guest:guest@localhost")

    channel = await connection.channel()
    queue = await channel.declare_queue(
        "rpc.server.queue", exclusive=True, auto_delete=True
    )
    await queue.consume(on_message)
    return connection


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    connection = loop.run_until_complete(main())
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(connection.close())
        loop.shutdown_asyncgens()

client.py

import asyncio

from aio_pika import connect_robust
from aio_pika.message import IncomingMessage, Message


async def on_message(message: IncomingMessage):
    print(f"Client received: {str(message.body)}")


async def main():
    connection = await connect_robust("amqp://guest:guest@localhost")

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("amq.rabbitmq.reply-to")
        await queue.consume(
            on_message,
            no_ack=True,
        )
        await channel.default_exchange.publish(
            Message("Marco".encode(), reply_to="amq.rabbitmq.reply-to"),
            routing_key="rpc.server.queue",
        )
        await asyncio.sleep(1)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

sureshjoshi avatar Nov 27 '20 04:11 sureshjoshi