faust icon indicating copy to clipboard operation
faust copied to clipboard

Yields Not Captured By Test Agent when has Sink

Open Robbie-Palmer opened this issue 3 years ago • 0 comments

Checklist

  • [X] I have included information about relevant versions
  • [ ] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Create an agent that receives a message, iterates over it and yields its values Yields to a sink topic Use the test_context of this agent to put example data on it for testing

from asyncio import get_event_loop

from faust import App

app = App('faust-testing', broker='kafka:URL')
app.conf.broker_client_id = app.conf.id

in_topic = app.topic('in-data', internal=True, value_serializer='raw')
out_topic = app.topic('out-data', internal=True, value_serializer='raw')


@app.agent(in_topic, sink=[out_topic])
async def processing_agent(stream):
    async for message_bytes in stream:
        for byte in message_bytes:
            yield byte


async def test_agent():
    message = '12345'
    async with processing_agent.test_context() as agent:
        await agent.put(message)
    for actual, expected in zip(agent.results.values(), message):
        assert actual == expected
    assert len(agent.results.values()) == len(message), f'{len(agent.results.values())} != {len(message)}'


if __name__ == '__main__':
    event_loop = get_event_loop()
    app.loop = event_loop
    event_loop.run_until_complete(test_agent())

Expected behavior

Expect all elements in the iterable message to be stored in the test agent results aka be:

{
    "0": "1",
    "1": "2",
    "2": "3",
    "3": "4",
    "4": "5"
}

Actual behavior

Only the first two messages are stored in the test app aka:

{
    "0": "1",
    "1": "2"
}

If the sink parameter is removed from the agent, then it behaves as expected

@app.agent(in_topic)
async def processing_agent(stream):
    async for message_bytes in stream:
        for byte in message_bytes:
            yield byte

Versions

  • Python version: 3.9.12
  • Faust version: 0.8.4
  • Operating system: macOS Monterey v12.3.1
  • Kafka version: N/A
  • RocksDB version (if applicable): N/A

Robbie-Palmer avatar May 16 '22 17:05 Robbie-Palmer