self._committed_offset.update(committable_offsets) if did_commit==False
Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the master branch of Faust.
We use 2 pods for consumption. The load arrives only at 9 am, for 1 hour every day. enable_auto_commit=False (set by the faust-streaming framework).
The heartbeat routine called _maybe_leave_group():
# If consumer is idle (no records consumed) for too long we need
# to leave the group
idle_time = self._subscription.fetcher_idle_time
if idle_time < self._max_poll_interval:
    sleep_time = min(
        sleep_time,
        self._max_poll_interval - idle_time)
else:
    await self._maybe_leave_group()
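To make the branch above easier to follow, here is a minimal standalone sketch of the same decision. It is not the library code: the function name `next_heartbeat_action` and its return shape are hypothetical, and the values are assumed to be in seconds. Only the comparison of the idle time against the max poll interval is taken from the snippet.

```python
from typing import Tuple


def next_heartbeat_action(
    idle_time: float, max_poll_interval: float, sleep_time: float
) -> Tuple[str, float]:
    """Hypothetical helper mirroring the idle check in the snippet above."""
    if idle_time < max_poll_interval:
        # Still within the poll interval: sleep no longer than the time left.
        return "sleep", min(sleep_time, max_poll_interval - idle_time)
    # Idle for too long: this is the branch that leaves the group.
    return "leave_group", 0.0
```

Once the idle time reaches the max poll interval, the sketch takes the second branch, which corresponds to the call to _maybe_leave_group().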
Logs from pod1 (pod1 continued to process messages):
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}
Logs from pod2 (pod2 locked up):
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1043, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150061", "level": "ERROR", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1052, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150212", "level": "ERROR", "@version": "1"}
Logs from 07/06/202
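After some research, I see that this happens when did_commit is False because of UnknownMemberIdError. It looks like the lines named in the title need one more level of indentation (a tab before them), presumably so that they run only when the commit succeeded.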
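A minimal sketch of the suggested change, assuming the update of the committed offsets currently runs regardless of did_commit. `OffsetTracker`, `record_commit`, `failing_commit` and `succeeding_commit` are hypothetical stand-ins, not Faust's actual classes or methods; only the names _committed_offset, committable_offsets and did_commit come from this issue.

```python
import asyncio
from typing import Awaitable, Callable, Dict

CommitFn = Callable[[Dict[str, int]], Awaitable[bool]]


class OffsetTracker:
    """Hypothetical stand-in for the consumer state discussed in the issue."""

    def __init__(self, commit: CommitFn) -> None:
        self._commit = commit
        self._committed_offset: Dict[str, int] = {}

    async def record_commit(self, committable_offsets: Dict[str, int]) -> bool:
        did_commit = await self._commit(committable_offsets)
        if did_commit:
            # Indented under the check: remember offsets only after a
            # successful commit.
            self._committed_offset.update(committable_offsets)
        return did_commit


async def failing_commit(offsets: Dict[str, int]) -> bool:
    # Simulates a commit that was rejected (for example after leaving the group).
    return False


async def succeeding_commit(offsets: Dict[str, int]) -> bool:
    # Simulates a commit that was accepted.
    return True
```

With this shape, a failed commit leaves the locally tracked offsets untouched, so a later retry still sees them as uncommitted.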
Versions
- Python version 3.9.12
- Faust version 0.8.4
If you have a solution for this, could you please file a PR?