stan.py
stan.py copied to clipboard
Use async for
Hi, did you consider using something like async for?
Instead of
async def cb(msg):
print("Received a message (seq={}): {}".format(msg.seq, msg.data))
# Receive messages starting at a specific sequence number
await sc.subscribe("foo", start_at="sequence", sequence=3, cb=cb)
something like this
messages = await sc.subscribe("foo", start_at="sequence", sequence=3)
async for msg in messages:
print("Received a message (seq={}): {}".format(msg.seq, msg.data))
Haven't looked into it, but think that would be a nice addition to both the underlying asyncio-nats client and this one.
Also something similar to aioredis without using async generator would be fine:
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
print("Got Message:", msg)
Mostly because with the current asyncio-nats-streaming is a little bit tricky to achieve the same
async def scheduler():
"""Launch the task manager."""
log.info("Loading subscribers....")
for queue_name, funcs in _subscribers.items():
# add a callback when you receive a message
await streaming.subscribe(queue_name, on_message)
# really, really ugly
while True:
await asyncio.sleep(1)