faust
faust copied to clipboard
Writing data to faust table is causing FutureMessage exception
Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the
master
branch of Faust.
Steps to reproduce
I have an application that contains 3 topic agents. The topics have different configuration (partition).
All 3 of the agents use a shared table (not global table) that has been instantiated using the use_partitioner=True
. The table can be modified by all 3 agents.
The app is using uvloop.
Expected behavior
As each agent runs in a round robin order, there shouldn't be any issue while writing to the table
Actual behavior
One of the agent is raising this error:
Exception in callback functools.partial(<bound method Topic._on_published of <Topic: app-test_topic-changelog@0x7fe6dc7230a0>>, message=<FutureMessage finished exception=UnknownError()>, state={<Monitor: running >: 22569.982169473}, producer=<Producer: running >)
handle: <Handle functools.partial(<bound method Topic._on_published of <Topic: app-test_topic-changelog@0x7fe6dc7230a0>>, message=<FutureMessage finished exception=UnknownError()>, state={<Monitor: running >: 22569.982169473}, producer=<Producer: running >)>
Traceback (most recent call last):
File "/app/.venv/lib/python3.8/site-packages/faust/topics.py", line 463, in _on_published
res: RecordMetadata = fut.result()
kafka.errors.UnknownError: [Error -1] UnknownError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/app/.venv/lib/python3.8/site-packages/faust/topics.py", line 467, in _on_published
f"_on_published error for message topic "
AttributeError: 'FutureMessage' object has no attribute 'channel'
and this one
FutureMessage exception was never retrieved
future: <FutureMessage finished exception=UnknownError()>
Traceback (most recent call last):
File "/app/.venv/lib/python3.8/site-packages/faust/topics.py", line 463, in _on_published
res: RecordMetadata = fut.result()
kafka.errors.UnknownError: [Error -1] UnknownError
Versions
- Python version 3.8
- Faust version 0.6.6
- Operating system docker on MAC
- Kafka version 2.4.0
- RocksDB version (if applicable)