faust
faust copied to clipboard
Yields Not Captured By Test Agent when has Sink
Checklist
- [X] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
Create an agent that receives a message, iterates over it and yields its values
Yields to a sink topic
Use the test_context of this agent to put example data on it for testing
from asyncio import get_event_loop
from faust import App
app = App('faust-testing', broker='kafka:URL')
app.conf.broker_client_id = app.conf.id
in_topic = app.topic('in-data', internal=True, value_serializer='raw')
out_topic = app.topic('out-data', internal=True, value_serializer='raw')
@app.agent(in_topic, sink=[out_topic])
async def processing_agent(stream):
async for message_bytes in stream:
for byte in message_bytes:
yield byte
async def test_agent():
message = '12345'
async with processing_agent.test_context() as agent:
await agent.put(message)
for actual, expected in zip(agent.results.values(), message):
assert actual == expected
assert len(agent.results.values()) == len(message), f'{len(agent.results.values())} != {len(message)}'
if __name__ == '__main__':
event_loop = get_event_loop()
app.loop = event_loop
event_loop.run_until_complete(test_agent())
Expected behavior
Expect all elements in the iterable message to be stored in the test agent results aka be:
{
"0": "1",
"1": "2",
"2": "3",
"3": "4",
"4": "5"
}
Actual behavior
Only the first two messages are stored in the test app aka:
{
"0": "1",
"1": "2"
}
If the sink parameter is removed from the agent, then it behaves as expected
@app.agent(in_topic)
async def processing_agent(stream):
async for message_bytes in stream:
for byte in message_bytes:
yield byte
Versions
- Python version: 3.9.12
- Faust version: 0.8.4
- Operating system: macOS Monterey v12.3.1
- Kafka version: N/A
- RocksDB version (if applicable): N/A