faust
faust copied to clipboard
processing_guarantee="exactly_once" crashes at startup
Steps to reproduce
Create a simple app with processing_guarantee="exactly_once"
import faust
from faust.types import ProcessingGuarantee
class Content(faust.Record, include_metadata=False):
content_id: str
content: str
app = faust.App(
"faust-poc",
broker="aiokafka://kafka:9092",
store="memory://",
version=1,
topic_partitions=6,
topic_replication_factor=1,
producer_compression_type="gzip",
topic_disable_leader=True,
web_enabled=False,
table_standby_replicas=0,
processing_guarantee=ProcessingGuarantee.EXACTLY_ONCE
)
contents_topic = app.topic("contents", key_type=str, value_type=Content, value_serializer="json")
@app.agent(contents_topic)
async def process_contents(contents: faust.Stream[Content]):
async for content in contents:
print(content)
Expected behavior
The app should run
Actual behavior
It crashes with a "KeyError". A quick inverstigation shows that we try to lookup a group for a topic on the "PartitionAssignor" where it seems to never be set (no call to on_assignment
). The only on_assignment
I found is on the RoundRobinPartitionAssignor associated with the group coordinator
Full traceback
faust-poc-app-1 | [2023-01-30 09:20:32,410] [1] [ERROR] [^-App]: Crashed reason=KeyError('contents')
faust-poc-app-1 | Traceback (most recent call last):
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/app/base.py", line 1766, in _on_partitions_assigned
faust-poc-app-1 | await T(consumer.transactions.on_rebalance)(
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/utils/tracing.py", line 133, in corowrapped
faust-poc-app-1 | await_ret = await ret
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 235, in on_rebalance
faust-poc-app-1 | assigned_tids = sorted(self._tps_to_transactional_ids(assigned))
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 264, in _tps_to_transactional_ids
faust-poc-app-1 | for tpg in self._tps_to_active_tpgs(tps)
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 269, in _tps_to_active_tpgs
faust-poc-app-1 | return {
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/transport/consumer.py", line 273, in <setcomp>
faust-poc-app-1 | assignor.group_for_topic(tp.topic),
faust-poc-app-1 | File "/opt/app/.cache/pypoetry/virtualenvs/faust-poc-tq7C0_9c-py3.10/lib/python3.10/site-packages/faust/assignor/partition_assignor.py", line 86, in group_for_topic
faust-poc-app-1 | return self._topic_groups[topic]
faust-poc-app-1 | KeyError: 'contents'
Versions
- Python version 3.10
- Faust version 0.10.3
- Operating system Ubuntu LTS
- Kafka version 2.8.1
- RocksDB version (if applicable)