faststream
faststream copied to clipboard
Feature: Publisher support for async iterator
I have a yielding generator that produces msgs into a topic.
I wish the following syntax was valid:
import asyncio
import random
from contextlib import asynccontextmanager
from typing import AsyncIterator
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
@broker.publisher("number_of_the_day")
async def num() -> AsyncIterator[int]:
while True:
yield random.randint(0, 10)
await asyncio.sleep(86400)
@asynccontextmanager
async def lifespan(context: ContextRepo):
asyncio.create_task(num())
yield
async def main():
await FastStream(broker, lifespan=lifespan).run()
if __name__ == '__main__':
asyncio.run(main())
Current workaround
import functools
def pub(publisher):
def decorator(func):
@functools.wraps(func)
async def inner(*args, **kwargs):
async for x in func(*args, **kwargs):
await publisher.publish(x)
return inner
return decorator
# [...]
@pub(broker.publisher("number_of_the_day"))
async def num() -> AsyncIterator[int]:
while True:
yield random.randint(0, 10)
await asyncio.sleep(86400)
# [...]
Seems like a good idea, but you can't use @broker.publisher(...)
without @broker.subscriber(...)
. So, the valid case should looks like
@broker.subscriber("in")
@broker.publisher("out")
async def handler(...):
yield from iterbale_interactor(...)
It should publishes N messages at 1 consumer call
I would in fact also like publisher
to be usable standalone.
In my example I am scheduling it in the background using asyncio.create_task
as part of lifespan
with the help of my "workaround" pub
decorator.
You can use publisher.publish
interface alone
I don't like an idea to create publish side-effect on original function call, so all interfaces can be used only in explicit way: @broker.publisher(...)
only with @broker.subscriber(...)
, publisher.publish(...)
- in other cases