faust
Question: Is it possible to make an agent yield keyed messages while using a sink topic?
As the title says, I want to do something like the code below. But currently the sink_topic in Kafka ends up with null as the key, and whatever is yielded is treated as the value.
sink_topic = app.topic('sink_topic')

@app.agent(source_topic, sink=[sink_topic])
async def myagent(stream):
    async for key, value in stream.items():
        new_key, new_value = process_event(key, value)
        yield (new_key, new_value)
Not AFAIK, but you can do this:
@app.agent(source_topic)
async def myagent(stream):
    async for key, value in stream.items():
        new_key, new_value = process_event(key, value)
        await sink_topic.send(key=new_key, value=new_value)
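To make the difference concrete, here is a minimal stdlib-only simulation of the two behaviours, with no faust dependency. `FakeTopic` and `process_event` are hypothetical stand-ins: the first agent mimics the sink forwarding described above (the whole yielded object becomes the value, key is null), the second mimics the explicit-send workaround.

```python
import asyncio


class FakeTopic:
    """Hypothetical stand-in for a Kafka topic: records (key, value) pairs."""

    def __init__(self):
        self.messages = []

    async def send(self, *, key=None, value=None):
        self.messages.append((key, value))


def process_event(key, value):
    # Hypothetical transform: upper-case the key, double the value.
    return key.upper(), value * 2


async def yielding_agent(stream, sink):
    # Mimics sink=[sink_topic]: the entire yielded object is forwarded
    # as the value, and the key is left as None.
    async for key, value in stream:
        yielded = process_event(key, value)
        await sink.send(key=None, value=yielded)


async def sending_agent(stream, sink):
    # Mimics the workaround: send explicitly, so the key is preserved.
    async for key, value in stream:
        new_key, new_value = process_event(key, value)
        await sink.send(key=new_key, value=new_value)


async def as_stream(items):
    for item in items:
        yield item


async def main():
    events = [("a", 1), ("b", 2)]
    via_yield, via_send = FakeTopic(), FakeTopic()
    await yielding_agent(as_stream(events), via_yield)
    await sending_agent(as_stream(events), via_send)
    print(via_yield.messages)  # keys lost, tuple stored as the value
    print(via_send.messages)   # keys preserved
    return via_yield.messages, via_send.messages


if __name__ == "__main__":
    asyncio.run(main())
```

This is only a sketch of the data flow, not of Faust internals; the real sink forwarding also handles serialization and partitioning.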
I presume you need to supply a sink=[] keyword argument to @app.agent() as well for that to work? The sink attribute is notably a collection, even when only one sink is listed. If two or more channel-like objects are given, does the agent multicast across them all?
@jheinnic - no, that will work without sink=[]; in this case sink_topic is created outside the function via app.topic() or app.channel().
See for example https://github.com/marcosschroh/faust-docker-compose-example/blob/master/faust-project/example/users/agents.py#L23
Note that app.test_context() won't work with this pattern, since it expects the code to yield, but there's probably some workaround for that.
This looks super important to me. The example suggests we can simply skip setting keys on the producer side and only re-key on the consumer side via group_by. Does that approach have drawbacks?
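As a rough illustration of that trade-off, here is a stdlib-only toy version of consumer-side re-keying (the `user_id` field and `regroup` helper are hypothetical). Faust's `stream.group_by` achieves this by writing every event to an internal repartition topic, which this in-memory bucketing does not show; that extra produce/consume round trip per event is the main cost of deferring keying to the consumer.

```python
from collections import defaultdict

# Events produced with no key, as in the pattern above.
events = [
    {"user_id": "u1", "amount": 10},
    {"user_id": "u2", "amount": 5},
    {"user_id": "u1", "amount": 7},
]


def regroup(stream, key_fn, partitions=4):
    """Toy consumer-side re-keying: bucket unkeyed events by a derived key.

    Within one process, hashing the derived key guarantees that all events
    for the same key land in the same partition bucket, which is the
    property group_by restores for downstream keyed processing.
    """
    buckets = defaultdict(list)
    for event in stream:
        key = key_fn(event)
        partition = hash(key) % partitions  # same key -> same partition
        buckets[partition].append((key, event))
    return buckets


buckets = regroup(events, key_fn=lambda e: e["user_id"])
# All "u1" events end up in a single partition bucket.
```

The sketch only demonstrates the re-keying idea; in a real deployment you would also weigh the extra topic's storage and latency against the convenience of unkeyed producers.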