faust
faust copied to clipboard
Wrong offset commit in case of gaps
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the
masterbranch of Faust.
Context
In https://github.com/faust-streaming/faust/issues/312 (0.8.5) problems with gaps were fixed. But in https://github.com/faust-streaming/faust/issues/378 (0.9.0) normal behaviour with gaps (excluding compact topics) was partially broken.
Steps to reproduce
@app.agent(topic, concurrency=2)
async def test(stream: "StreamT[somestream]") -> None:
async for event in stream.events():
message = event.value
message_no = message.no
message_offset = event.message.offset
app.logger.info(f"Got message {message_no} offset={message_offset}")
if message_no == 1:
sleep_secs = 100
else:
sleep_secs = 1
app.logger.info(f"sleep {sleep_secs} for message {message_no=} {message_offset=}")
await asyncio.sleep(sleep_secs)
app.logger.info(f"End of processing {message_no=} {message_offset=}")
- print returning value in faust.transport.consumer.Consumer._filter_committable_offsets
print(f'faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {commit_offsets}')
return commit_offsets
or print all offsets in faust.transport.consumer.Consumer._commit_offsets
print(f'faust.transport.consumer.Consumer._commit_offsets: COMMITTING OFFSETS = {table}')
- send messages to topic
- SIGKILL faust worker, and start it again, to ensure of what was committed and what first message for consume will be
Expected behavior
Faust commits offsets correctly, without loosing messages in case of gaps
Actual behavior
Faust commits wrong offsets, and looses messages in case of gaps (one and first message in gap, maybe this behavior can be reproduced when gap consists of first message after commit)
Full traceback 0.10.22
2024-02-08 10:47:42,795 faust.app.base INFO Got message 1 offset=2585
2024-02-08 10:47:42,795 faust.app.base INFO sleep 100 for message message_no=1 message_offset=2585
2024-02-08 10:47:42,796 faust.app.base INFO Got message 2 offset=2586
2024-02-08 10:47:42,796 faust.app.base INFO sleep 1 for message message_no=2 message_offset=2586
2024-02-08 10:47:43,647 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS ={}
2024-02-08 10:47:43,797 faust.app.base INFO End of processing message_no=2 message_offset=2586
2024-02-08 10:47:43,797 faust.app.base INFO Got message 3 offset=2587
2024-02-08 10:47:43,797 faust.app.base INFO sleep 1 for message message_no=3 message_offset=2587
2024-02-08 10:47:44,799 faust.app.base INFO End of processing message_no=3 message_offset=2587
2024-02-08 10:47:44,799 faust.app.base INFO Got message 4 offset=2588
2024-02-08 10:47:44,799 faust.app.base INFO sleep 1 for message message_no=4 message_offset=2588
2024-02-08 10:47:45,799 faust.app.base INFO End of processing message_no=4 message_offset=2588
2024-02-08 10:47:45,800 faust.app.base INFO Got message 5 offset=2589
2024-02-08 10:47:45,800 faust.app.base INFO sleep 1 for message message_no=5 message_offset=2589
2024-02-08 10:47:46,444 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS ={TopicPartition(topic='topic', partition=0): 2589}
2024-02-08 10:47:46,444 mode.redirect WARNING faust.transport.consumer.Consumer._commit_offsets: COMMITTING OFFSETS =┌Commit Offsets────────────────────────────────────────────────────┬────────┐
│ TP │ Offset │
├──────────────────────────────────────────────────────────────────┼────────┤
│ TopicPartition(topic='topic', partition=0) │ 2589 │
└──────────────────────────────────────────────────────────────────┴────────┘
2024-02-08 10:47:46,801 faust.app.base INFO End of processing message_no=5 message_offset=2589
2024-02-08 10:47:46,801 faust.app.base INFO Got message 6 offset=2590
2024-02-08 10:47:46,801 faust.app.base INFO sleep 1 for message message_no=6 message_offset=2590
2024-02-08 10:47:47,802 faust.app.base INFO End of processing message_no=6 message_offset=2590
2024-02-08 10:47:47,803 faust.app.base INFO Got message 7 offset=2591
2024-02-08 10:47:47,803 faust.app.base INFO sleep 1 for message message_no=7 message_offset=2591
zsh: killed
message 1 offset=2585 was not precessed
restart:
2024-02-08 10:49:31,463 faust.tables.recovery INFO [^---Recovery]: Worker ready
2024-02-08 10:49:31,463 faust.worker INFO [^Worker]: Ready
2024-02-08 10:49:31,639 aiokafka.conn INFO Authenticated as user via SCRAM-SHA-512
2024-02-08 10:49:31,840 aiokafka.conn INFO Authenticated as user via SCRAM-SHA-512
2024-02-08 10:49:32,064 faust.app.base INFO Got message 5 offset=2589
2024-02-08 10:49:32,064 faust.app.base INFO sleep 1 for message message_no=5 message_offset=2589
2024-02-08 10:49:32,065 faust.app.base INFO Got message 6 offset=2590
2024-02-08 10:49:32,065 faust.app.base INFO sleep 1 for message message_no=6 message_offset=2590
logs for 0.8.5
2024-02-08 10:58:38,755 faust.app.base INFO Got message 1 offset=2684
2024-02-08 10:58:38,755 faust.app.base INFO sleep 100 for message message_no=1 message_offset=2684
2024-02-08 10:58:38,756 faust.app.base INFO Got message 2 offset=2685
2024-02-08 10:58:38,756 faust.app.base INFO sleep 1 for message message_no=2 message_offset=2685
2024-02-08 10:58:39,051 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {}
2024-02-08 10:58:39,757 faust.app.base INFO End of processing message_no=2 message_offset=2685
2024-02-08 10:58:39,758 faust.app.base INFO Got message 3 offset=2686
2024-02-08 10:58:39,758 faust.app.base INFO sleep 1 for message message_no=3 message_offset=2686
2024-02-08 10:58:40,759 faust.app.base INFO End of processing message_no=3 message_offset=2686
2024-02-08 10:58:40,759 faust.app.base INFO Got message 4 offset=2687
2024-02-08 10:58:40,759 faust.app.base INFO sleep 1 for message message_no=4 message_offset=2687
2024-02-08 10:58:41,760 faust.app.base INFO End of processing message_no=4 message_offset=2687
2024-02-08 10:58:41,761 faust.app.base INFO Got message 5 offset=2688
2024-02-08 10:58:41,761 faust.app.base INFO sleep 1 for message message_no=5 message_offset=2688
2024-02-08 10:58:41,852 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {}
2024-02-08 10:58:42,762 faust.app.base INFO End of processing message_no=5 message_offset=2688
2024-02-08 10:58:42,764 faust.app.base INFO Got message 6 offset=2689
2024-02-08 10:58:42,764 faust.app.base INFO sleep 1 for message message_no=6 message_offset=2689
2024-02-08 10:58:43,765 faust.app.base INFO End of processing message_no=6 message_offset=2689
2024-02-08 10:58:43,765 faust.app.base INFO Got message 7 offset=2690
2024-02-08 10:58:43,765 faust.app.base INFO sleep 1 for message message_no=7 message_offset=2690
2024-02-08 10:58:44,654 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {}
2024-02-08 10:58:44,767 faust.app.base INFO End of processing message_no=7 message_offset=2690
2024-02-08 10:58:44,768 faust.app.base INFO Got message 8 offset=2691
2024-02-08 10:58:44,768 faust.app.base INFO sleep 1 for message message_no=8 message_offset=2691
2024-02-08 10:58:45,770 faust.app.base INFO End of processing message_no=8 message_offset=2691
2024-02-08 10:58:45,772 faust.app.base INFO Got message 9 offset=2692
2024-02-08 10:58:45,773 faust.app.base INFO sleep 1 for message message_no=9 message_offset=2692
2024-02-08 10:58:46,774 faust.app.base INFO End of processing message_no=9 message_offset=2692
2024-02-08 10:58:46,775 faust.app.base INFO Got message 10 offset=2693
2024-02-08 10:58:46,776 faust.app.base INFO sleep 1 for message message_no=10 message_offset=2693
2024-02-08 10:58:47,457 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {}
2024-02-08 10:58:47,777 faust.app.base INFO End of processing message_no=10 message_offset=2693
2024-02-08 10:58:47,779 faust.app.base INFO Got message 11 offset=2694
2024-02-08 10:58:47,780 faust.app.base INFO sleep 1 for message message_no=11 message_offset=2694
2024-02-08 10:58:48,780 faust.app.base INFO End of processing message_no=11 message_offset=2694
2024-02-08 10:58:48,781 faust.app.base INFO Got message 12 offset=2695
2024-02-08 10:58:48,781 faust.app.base INFO sleep 1 for message message_no=12 message_offset=2695
zsh: killed
restart:
2024-02-08 10:59:50,767 faust.app.base INFO Got message 1 offset=2684
2024-02-08 10:59:50,767 faust.app.base INFO sleep 100 for message message_no=1 message_offset=2684
2024-02-08 10:59:50,768 faust.app.base INFO Got message 2 offset=2685
2024-02-08 10:59:50,768 faust.app.base INFO sleep 1 for message message_no=2 message_offset=2685
2024-02-08 10:59:51,769 faust.app.base INFO End of processing message_no=2 message_offset=2685
2024-02-08 10:59:51,769 faust.app.base INFO Got message 3 offset=2686
2024-02-08 10:59:51,769 faust.app.base INFO sleep 1 for message message_no=3 message_offset=2686
2024-02-08 10:59:52,771 faust.app.base INFO End of processing message_no=3 message_offset=2686
2024-02-08 10:59:52,771 faust.app.base INFO Got message 4 offset=2687
2024-02-08 10:59:52,771 faust.app.base INFO sleep 1 for message message_no=4 message_offset=2687
2024-02-08 10:59:53,359 mode.redirect WARNING faust.transport.consumer.Consumer._filter_committable_offsets: COMMIT OFFSETS: {}
Versions
- Python version 3.9
- Faust version 0.10.22