aio-pika
Should I keep aio_pika connection always open?
In some examples the consumer main function ends with await asyncio.Future() so the process does not finish. What is the best practice when you have multiple consumers spread across multiple modules? My idea is to have a dataclass with the necessary data for creating each consumer and have a single process for all workers. Something like this:
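# consumer_registry.py
from typing import Callable

from attrs import define

# Module-level registry; every Consumer instance adds itself on creation.
consumers = []


@define
class Consumer:
    set_oqs: int  # prefetch count passed to channel.set_qos()
    queue: dict  # keyword arguments for channel.declare_queue()
    handler: Callable  # callback passed to queue.consume()

    def __attrs_post_init__(self):
        consumers.append(self)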
# some_consumer.py
from consumer_registry import Consumer

first_consumer = Consumer(...)
second_consumer = Consumer(...)
# main.py
import anyio
import aio_pika

from consumer_registry import consumers
import some_consumer  # noqa: F401  (imported so its consumers register themselves)


async def main():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        for c in consumers:
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=c.set_oqs)
            queue = await channel.declare_queue(**c.queue)
            await queue.consume(c.handler, no_ack=True)
        # Block forever so the consumers keep running.
        await anyio.sleep_forever()


if __name__ == "__main__":
    # Pass the coroutine function itself, not the result of calling it.
    anyio.run(main)
My idea is to have this running as a pod on k8s with a periodic health check, but otherwise leave the process running like above. Is it fine to do it that way?
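One way the periodic health check could look, as a rough sketch only: a background task touches a heartbeat file while the connection is open, and a k8s exec probe checks how fresh that file is. The names heartbeat and is_alive, the file path, and the intervals are all hypothetical. How is_closed behaves while a robust connection is reconnecting is something to verify for your aio_pika version.

```python
import asyncio
import time
from pathlib import Path


async def heartbeat(connection, path: Path, interval: float = 5.0) -> None:
    """Touch `path` every `interval` seconds while the connection is open.

    `connection` is assumed to expose an `is_closed` attribute, as
    aio_pika connections do.
    """
    while True:
        if not connection.is_closed:
            path.touch()
        await asyncio.sleep(interval)


def is_alive(path: Path, max_age: float = 15.0) -> bool:
    """Return True if the heartbeat file was touched within `max_age` seconds."""
    try:
        return time.time() - path.stat().st_mtime < max_age
    except FileNotFoundError:
        return False
```

In main() this could be started next to the consumers with asyncio.create_task(heartbeat(connection, Path("/tmp/worker-heartbeat"))), and the probe command would be a small script that exits non-zero when is_alive(...) is False.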
The AMQP protocol is designed in such a way that the connection should be open for as long as possible. However, if your task is to periodically process accumulated messages, then the connection should be open only for the duration of message processing.
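For the second case (periodically processing accumulated messages), a minimal sketch might look like the following. It assumes a job that is started on a schedule, drains whatever is in the queue, and closes the connection when done. The function name drain_queue, the queue name "jobs", and the max_messages limit are hypothetical; queue.get(fail=False) is used on the understanding that it returns None when the queue is empty.

```python
import asyncio


async def drain_queue(connection, queue_name, handler, max_messages=100, **queue_kwargs):
    """Process up to `max_messages` waiting messages, then close the connection.

    `connection` is expected to behave like an aio_pika connection.
    Returns the number of messages processed.
    """
    processed = 0
    async with connection:  # the connection is closed when this block exits
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, **queue_kwargs)
        while processed < max_messages:
            message = await queue.get(fail=False)
            if message is None:  # queue is empty, nothing left to do
                break
            async with message.process():  # ack on success, reject on error
                await handler(message)
            processed += 1
    return processed


async def run_once(handler):
    # Imported here so the helper above can be exercised without a broker.
    import aio_pika

    connection = await aio_pika.connect("amqp://guest:guest@localhost/")
    return await drain_queue(connection, "jobs", handler)
```

A cron job or k8s CronJob could then call asyncio.run(run_once(my_handler)), so the connection only exists while messages are being processed.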
Thanks @mosquito. That is what I understood, and that is why I thought I was missing something: a main app that runs indefinitely and that I could add consumers to, maybe using a decorator, like we usually have for routes in web frameworks. Sorry if I am saying something silly (I don't have much experience with this).
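The decorator idea could be sketched roughly like this. It is not something aio-pika ships: ConsumerRegistry, ConsumerSpec, the consumer decorator, and the "orders" queue are all hypothetical names for illustration. Only connect_robust, channel(), set_qos(), declare_queue(), consume(), and message.process() are aio_pika calls.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


@dataclass(frozen=True)
class ConsumerSpec:
    queue_name: str
    handler: Handler
    prefetch_count: int = 10
    queue_kwargs: dict = field(default_factory=dict)


class ConsumerRegistry:
    """Collects handlers via a decorator, similar to routes in a web framework."""

    def __init__(self) -> None:
        self.specs: list[ConsumerSpec] = []

    def consumer(self, queue_name: str, *, prefetch_count: int = 10, **queue_kwargs):
        def decorator(handler: Handler) -> Handler:
            self.specs.append(
                ConsumerSpec(queue_name, handler, prefetch_count, queue_kwargs)
            )
            return handler  # the function itself is left unchanged

        return decorator

    async def start(self, connection) -> None:
        # One channel per consumer, so each gets its own prefetch setting.
        for spec in self.specs:
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=spec.prefetch_count)
            queue = await channel.declare_queue(spec.queue_name, **spec.queue_kwargs)
            await queue.consume(spec.handler)


registry = ConsumerRegistry()


@registry.consumer("orders", prefetch_count=5, durable=True)
async def handle_order(message) -> None:
    async with message.process():  # ack on success, reject on error
        print("order:", message.body)


async def main() -> None:
    import aio_pika  # imported here so the registry can be used without a broker

    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        await registry.start(connection)
        await asyncio.Future()  # keep the process alive
```

Each module would import registry and decorate its own handlers, and the entry point would import those modules and call asyncio.run(main()). Unlike the example in the question, this sketch acknowledges messages explicitly rather than using no_ack=True, so the prefetch count actually limits unacknowledged deliveries.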