
Question: Is it possible to make an agent yield keyed messages while using a sink topic?

Open navndn opened this issue 4 years ago • 5 comments

As the title says, I want to do something like the code below. But currently the sink_topic in Kafka ends up with null as the key, and whatever is yielded is treated as the value.


source_topic = app.topic('source_topic')
sink_topic = app.topic('sink_topic')

@app.agent(source_topic, sink=[sink_topic])
async def myagent(stream):
    async for key, value in stream.items():
        new_key, new_value = process_event(key, value)
        # intent: forward to sink_topic keyed by new_key
        yield (new_key, new_value)

navndn avatar Sep 09 '19 06:09 navndn

Not AFAIK, but you can do this:


@app.agent(source_topic)
async def myagent(stream):
    async for key, value in stream.items():
        new_key, new_value = process_event(key, value)
        # send to the topic directly, with an explicit key
        await sink_topic.send(key=new_key, value=new_value)

ask avatar Sep 27 '19 21:09 ask
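For readers following along, the suggestion above can be sketched with a plain-Python stand-in for the topic object (FakeTopic and process_event below are hypothetical, not part of Faust): it shows how calling send() with an explicit key preserves the key on the outgoing message, which is exactly what yielding to a sink does not do.

```python
import asyncio

class FakeTopic:
    """Hypothetical stand-in for a Faust topic; records sent messages."""
    def __init__(self, name):
        self.name = name
        self.messages = []

    async def send(self, *, key=None, value=None):
        self.messages.append((key, value))

def process_event(key, value):
    # hypothetical re-keying logic: key the event by its user_id field
    return value["user_id"], {**value, "processed": True}

async def myagent(events, sink_topic):
    # mirrors the agent body: send explicitly so the key is preserved
    for key, value in events:
        new_key, new_value = process_event(key, value)
        await sink_topic.send(key=new_key, value=new_value)

sink_topic = FakeTopic("sink_topic")
asyncio.run(myagent([(None, {"user_id": "u1"})], sink_topic))
print(sink_topic.messages)
```

The point of the sketch is only the key plumbing: the keyless input event comes out of the fake sink keyed by "u1".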

I presume you need to supply a sink=[] keyword argument to @app.agent() as well for that to work. The sink argument is notably a collection, even when only one sink is listed. If two or more channel-like objects are listed, does the agent multicast across them all?

jheinnic avatar Oct 28 '19 19:10 jheinnic

@jheinnic - no, that works without sink=[]; in this case sink_topic is created outside the function via app.topic() or app.channel()

See for example https://github.com/marcosschroh/faust-docker-compose-example/blob/master/faust-project/example/users/agents.py#L23

forsberg avatar Aug 24 '20 13:08 forsberg

Note that app.test_context() won't work with this approach, since it expects the agent to yield, but there's probably some workaround for that.

forsberg avatar Aug 24 '20 13:08 forsberg
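One possible workaround (a sketch, not a Faust feature): factor the re-keying logic out of the agent body into a pure function, so the interesting part can be unit-tested without app.test_context() or a running broker. The function name and fields below are hypothetical.

```python
def process_event(key, value):
    """Hypothetical re-keying logic, factored out of the agent body.

    The agent then shrinks to a thin loop that calls this function and
    sends the result, so only this function needs unit tests.
    """
    new_key = value["user_id"]
    new_value = {**value, "processed": True}
    return new_key, new_value

# testable without any Faust machinery
print(process_event(None, {"user_id": "u1"}))
```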

This looks super important to me. This example suggests we can simply omit keys on the producer side and only re-key on the consumer side via group_by. Does that approach have drawbacks?

jairov4 avatar May 23 '22 14:05 jairov4
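One drawback worth noting: in Faust, group_by re-keys by forwarding every event through an internal repartition topic, so each message crosses the broker an extra time. Keys also matter for partitioning in the first place, which the following sketch illustrates with a simplified, hypothetical hash partitioner (real Kafka clients use murmur2, not Python's hash()):

```python
def partition_for(key, num_partitions=8):
    """Simplified sketch of key-based partitioning (not Kafka's real algorithm)."""
    if key is None:
        # keyless messages get no stable partition; the producer
        # spreads them round-robin / sticky across partitions
        return None
    # keyed messages always map to the same partition
    return hash(key) % num_partitions

# the same key always lands on the same partition within a run
assert partition_for("user-1") == partition_for("user-1")
```

So producing without keys means giving up per-key ordering and locality upstream, and paying for a repartition hop to get them back downstream.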