aio-pika
RPC reply queue
Should the RPC pattern use the direct reply-to queue amq.rabbitmq.reply-to, as described here: https://www.rabbitmq.com/direct-reply-to.html?
I'm also surprised to see a reply-to queue being created by the RPC server as well, and neither reply-to queue being declared with exclusive=True.
Is the RPC pattern meant to be an example, is it a work in progress, or is there a reason for these choices?
Thank you! This library is terrific. I'm just a little confused about a couple of details in the code for the RPC pattern.
So, I started looking into this today for the same reason. I think the aio-pika RPC pattern is cool, but I wasn't entirely sure how I could RPC-proxy multiple exchanges/queues with the current API (and I think the documentation doesn't cover much beyond the simple examples).
I tried porting this: https://pika.readthedocs.io/en/stable/examples/direct_reply_to.html
Clearly not for production, just a proof of concept, but it seems to work as described in https://www.rabbitmq.com/direct-reply-to.html
The rpc.server.queue is my request queue.
server.py
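import asyncio

from aio_pika import connect_robust
from aio_pika.message import IncomingMessage, Message

# Set in main(); on_message() publishes replies through this channel.
channel = None


async def on_message(message: IncomingMessage):
    # process() acks the request once the block exits without an exception.
    async with message.process():
        print(f"Server received: {message.body!r} ==> {message}")
        if not channel or not message.reply_to:
            return
        # Reply via the default exchange to the address the client supplied,
        # echoing the correlation id so the client can match the reply.
        await channel.default_exchange.publish(
            Message(b"Polo", correlation_id=message.correlation_id),
            routing_key=message.reply_to,
        )


async def main():
    global channel
    connection = await connect_robust("amqp://guest:guest@localhost")
    channel = await connection.channel()
    queue = await channel.declare_queue(
        "rpc.server.queue", exclusive=True, auto_delete=True
    )
    await queue.consume(on_message)
    return connection


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    connection = loop.run_until_complete(main())
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(connection.close())
        # shutdown_asyncgens() is a coroutine, so it has to be run to completion.
        loop.run_until_complete(loop.shutdown_asyncgens())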
client.py
import asyncio

from aio_pika import connect_robust
from aio_pika.message import IncomingMessage, Message


async def on_message(message: IncomingMessage):
    print(f"Client received: {message.body!r}")


async def main():
    connection = await connect_robust("amqp://guest:guest@localhost")
    async with connection:
        channel = await connection.channel()
        # Direct reply-to: consume from the pseudo-queue in no-ack mode
        # *before* publishing, and publish on the same channel.
        queue = await channel.declare_queue("amq.rabbitmq.reply-to")
        await queue.consume(
            on_message,
            no_ack=True,
        )
        await channel.default_exchange.publish(
            Message(b"Marco", reply_to="amq.rabbitmq.reply-to"),
            routing_key="rpc.server.queue",
        )
        # Crude wait so the reply can arrive before the connection closes.
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
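The client waits for the reply with asyncio.sleep(1), which is fine for a proof of concept but does not tie a reply to the request that caused it. A common way to do that is to keep a map from correlation id to an asyncio Future and resolve the Future when the reply arrives. Below is a minimal sketch of that idea using only the standard library; PendingReplies, call and the send callback are hypothetical names made up for illustration and are not part of aio-pika.

```python
import asyncio
import uuid
from typing import Awaitable, Callable, Dict, Tuple


class PendingReplies:
    """Tracks in-flight requests by correlation id (hypothetical helper)."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def register(self) -> Tuple[str, asyncio.Future]:
        # One fresh id and one Future per outgoing request.
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._futures[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id: str, body: bytes) -> bool:
        # Called from the reply consumer; unknown or stale ids are ignored.
        future = self._futures.pop(correlation_id, None)
        if future is None or future.done():
            return False
        future.set_result(body)
        return True

    def discard(self, correlation_id: str) -> None:
        self._futures.pop(correlation_id, None)


async def call(
    pending: PendingReplies,
    send: Callable[[bytes, str], Awaitable[None]],
    body: bytes,
    timeout: float = 5.0,
) -> bytes:
    """Send one request and wait for the reply with the matching id."""
    correlation_id, future = pending.register()
    try:
        await send(body, correlation_id)
        return await asyncio.wait_for(future, timeout)
    finally:
        # Drop the entry whether the call succeeded, timed out or failed.
        pending.discard(correlation_id)
```

Wired into the client, send would presumably publish Message(body, correlation_id=correlation_id, reply_to="amq.rabbitmq.reply-to"), and on_message would call pending.resolve(message.correlation_id, message.body). This only works if the server copies the request's correlation id onto its reply.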