Warning - producer buffer full
Steps to reproduce
We have several streaming agents: some update tables, some send to other topics. In a steady state this works fine, but when ingesting peak loads of data (e.g. after resetting consumer offsets to 2, so that large amounts of data are ingested at once) we get a bunch of warnings. Functionality otherwise seems to be fine.
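For context, a minimal sketch of the kind of pipeline we mean; the app name, broker URL, and topic/table names are placeholders, not our real configuration:

```python
import faust

# Placeholder names throughout; not our real configuration.
app = faust.App('pipeline', broker='kafka://localhost:9092')

value_topic = app.topic('values')          # input topic
out_topic = app.topic('values-out')        # downstream topic
counts = app.Table('counts', default=int)  # changelog-backed table

@app.agent(value_topic)
async def handle_new_value(stream):
    async for key, value in stream.items():
        counts[key] += 1                            # each update also produces to the changelog
        await out_topic.send(key=key, value=value)  # fan out to another topic

if __name__ == '__main__':
    app.main()
```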
Expected behavior
We don't get any warnings
Actual behavior
We get repeated `producer buffer full` warnings.
Attempted fixes
We've managed to resolve this by:

Increasing `max_messages`:

```python
app.producer.buffer.max_messages = 10000
```

and adding an `await asyncio.sleep(0.1)` inside the agent loop:

```python
import asyncio

@app.agent(value_topic)
async def handle_new_value(stream):
    async for value in stream:
        await asyncio.sleep(0.1)  # throttle the loop so the producer buffer can drain
```
Full traceback
```
[2021-07-15 16:35:08,344] [283726] [WARNING] producer buffer full size 102
[2021-07-15 16:35:22,189] [283726] [INFO] producer flush took 13.844183206558228
```
Versions
- Python version: Python 3.8.10
- Faust version: 0.6.1
- Operating system: Ubuntu / macOS
- Kafka (library) version: 2.0.2
I'm observing the same issue as you. If you're using a Table for state, there is a similar discussion in this Slack thread:
> we use the threaded producer for the changelogs that gives better performance
which I believe refers to this setting:
https://github.com/faust-streaming/faust/blob/d9533ca36afc2d0731095078438f748bc739d943/faust/types/settings/settings.py#L1330-L1342
You might want to give that a shot. I can't help much further, as I'm trying to set up my first Faust pipeline myself.
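For what it's worth, enabling it should just be an app setting; a minimal sketch, assuming the linked lines still define the `producer_threaded` setting in faust-streaming (app name and broker URL are placeholders):

```python
import faust

# Sketch only: `producer_threaded` is the setting the link above points to
# (faust-streaming); app name and broker URL are placeholders.
app = faust.App(
    'my-app',
    broker='kafka://localhost:9092',
    producer_threaded=True,  # write changelog messages from a dedicated producer thread
)
```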
I have the same issue. I think it's because of table updates: the producer writes to the changelog topic once per message, and its buffer fills up. I also couldn't find a good way to batch messages and update the table once per batch. Take a look at this issue.
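The closest approximation I'm aware of is Faust's `stream.take(max_, within=...)`, which yields lists of events instead of single ones; a sketch of writing the table once per batch, with hypothetical names:

```python
@app.agent(value_topic)
async def handle_batches(stream):
    # Yield lists of up to 100 values, or whatever arrived within 5 seconds,
    # so the table (and its changelog) is written once per batch rather
    # than once per message. `value_topic` and `counts` are hypothetical.
    async for batch in stream.take(100, within=5):
        counts['total'] += len(batch)
```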
@deadpassive I found the solution and answered it here.