Cannot recover when any changelog topic partition becomes empty (as a result of some retention policy)
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
- if a topic partition is not empty: low watermark = minimum/earliest available offset, high watermark = maximum/latest available offset +1
- if a topic partition is empty: low watermark = high watermark
A changelog topic can become empty as a result of a Kafka cleanup policy (i.e., time- or size-based retention). The case where the topic is empty is not handled properly in Faust recovery.
The recovery service needs to replay messages from the low watermark (earliest offset) to high watermark - 1 (latest offset). Faust does this for the active and the standby partitions. Afterwards, it runs some consistency checks.
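To make the watermark arithmetic concrete, here is a minimal sketch of the replay range under the definitions above. The function name `replay_range` is hypothetical and is not part of Faust; it only illustrates why `high - 1` is not a valid offset when the partition is empty.

```python
from typing import Optional, Tuple


def replay_range(low: int, high: int) -> Optional[Tuple[int, int]]:
    """Return the (first, last) offsets to replay, or None if the partition is empty.

    `low` is the low watermark (earliest available offset) and `high` is the
    high watermark (latest available offset + 1), as defined above.
    """
    if low == high:
        # Empty partition: there is no message at `high - 1` (or at `low`).
        return None
    return low, high - 1


# Non-empty partition holding offsets 3..9.
print(replay_range(3, 10))
# Partition emptied by retention: both watermarks sit at 42.
print(replay_range(42, 42))
```

Note that for the emptied partition, a naive `high - 1` would give 41, which is below the low watermark and therefore refers to a message that no longer exists.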
Active partitions
Let's start with the active partitions:
- Building highwaters for active partitions https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L387-L393
- https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L655-L664
If the partition is empty, `high - 1` does not exist, and the recovery will fail. There is even a `FIXME` in building the highwaters. In my opinion, it would be better to also get the low watermarks and use `-1 if high is None or low == high else high - 1`.
- Building earliest offsets for active partitions https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L396-L401
- Similarly, if the topic partition is empty, `low - 1` (offset `low` after the +1 adjustment) would not exist https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L703-L713 In my opinion, this could be `None if offset is None else min(offset, highwaters.get(offset, offset))`.
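The suggestion for the highwaters (`-1 if high is None or low == high else high - 1`) could look roughly like the sketch below. This is not the Faust implementation: the helper name `normalize_highwaters` and the use of plain `(topic, partition)` tuples as keys are assumptions made for illustration only.

```python
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for a topic-partition key: (topic, partition).
TP = Tuple[str, int]


def normalize_highwaters(
    lows: Dict[TP, Optional[int]],
    highs: Dict[TP, Optional[int]],
) -> Dict[TP, int]:
    """Map each partition to the last offset to replay, or -1 if there is none."""
    result: Dict[TP, int] = {}
    for tp, high in highs.items():
        low = lows.get(tp)
        # An unknown high watermark or an empty partition (low == high)
        # means there is nothing to replay.
        result[tp] = -1 if high is None or low == high else high - 1
    return result


lows = {("changelog", 0): 5, ("changelog", 1): 3, ("changelog", 2): 0}
highs = {("changelog", 0): 5, ("changelog", 1): 10, ("changelog", 2): None}
print(normalize_highwaters(lows, highs))
```

Here partition 0 is empty (both watermarks at 5) and partition 2 has no known high watermark, so both map to -1, while partition 1 maps to 9.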
Standby partitions
Moreover, recovering standby partitions has a separate issue in the consistency checks. First, let's look at the sequence of steps for active partitions so that we can draw a parallel.
Active:
- Find latest/max offsets, min/earliest offsets, run consistency checks and seek to offset https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L387-L404 https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L425-L430
Standby:
- Find min/earliest offsets, seek to offsets, find max/latest offsets, run consistency checks https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L417-L423 https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L492-L526
The problem is that, after seeking, the offsets may be updated asynchronously, so by the time the consistency checks run, the conditions they verify may no longer hold.
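The ordering hazard can be shown with a toy `asyncio` model. Nothing here is Faust code: `run_sequence`, the `offsets` dict, and the `fetcher` coroutine are hypothetical stand-ins for the standby offsets and for the consumer that starts updating them once seeking is done.

```python
import asyncio


async def run_sequence(check_before_seek: bool) -> bool:
    """Return True if the consistency check sees the offsets it expected."""
    offsets = {"tp0": 10}      # hypothetical standby offsets
    expected = dict(offsets)   # state the consistency check assumes

    async def fetcher() -> None:
        # Stands in for the consumer advancing offsets after the seek.
        offsets["tp0"] += 5

    def check() -> bool:
        return offsets == expected

    if check_before_seek:
        # Active-style order: check first, then seek.
        ok = check()
        task = asyncio.create_task(fetcher())
    else:
        # Standby-style order: seek, then await more work, then check.
        task = asyncio.create_task(fetcher())
        await asyncio.sleep(0)  # e.g. fetching highwaters yields to the loop
        ok = check()
    await task
    return ok


print(asyncio.run(run_sequence(check_before_seek=True)))
print(asyncio.run(run_sequence(check_before_seek=False)))
```

In this model, the check passes when it runs before the seek and fails when any `await` sits between the seek and the check, which mirrors the standby sequence described above.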