A race condition in recovery causes unrecoverable gaps in acked offsets and consequently blocks committing
## Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the `master` branch of Faust.
## Steps to reproduce
Let me start with a high-level description of the issue. To illustrate the race condition, we first need to look at some key design aspects:
- Upon restart/rebalance Faust replays messages from the changelog topics (without consuming messages from the input topics)
- Messages are consumed from Kafka asynchronously, populating in-memory queues. Agents consume from these queues via `async for ... in stream` to process the messages.
- Once recovery from the changelog topics is completed, Faust starts listening to the input topics (i.e., non-changelog topics), clearing the contents of all in-memory queues.
- After agents process a message, the message becomes "acked". Faust maintains the offsets of all acked messages and asynchronously commits these acked offsets back to Kafka.
- Faust has a consistency check that requires contiguous acked offsets to commit a new offset. That is, if the committed offset is N, and the acked offsets are N+1, N+2, N+3, Faust can safely commit offset N+3. However, if the committed offset is N, and the acked offsets are N+2, N+3, Faust will wait until N+1 is acked.
With the current setup, some message(s) may never arrive because the queues are cleared while messages are still in flight, thus blocking committing.
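To make the contiguity check concrete, here is a minimal sketch (a hypothetical helper, not Faust's actual implementation) of how far the committed offset can advance given a set of acked offsets:

```python
# Minimal sketch of the contiguity rule described above: starting from the
# committed offset, we can only advance while every following offset is acked.
def highest_committable(committed: int, acked: set) -> int:
    offset = committed
    while offset + 1 in acked:
        offset += 1
    return offset

assert highest_committable(10, {11, 12, 13}) == 13  # contiguous -> commit 13
assert highest_committable(10, {12, 13}) == 10      # gap at 11  -> stuck at 10
```

If offset N+1 was sitting in a queue that got cleared, the gap at N+1 never closes and the committed offset can never advance past N.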
- After the recovery is completed, the async flow of messages from non-changelog topics is resumed: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L635-L637. The consumption flow is:
- The `Consumer` reads messages asynchronously and puts them in the `Conductor` queue via the callback https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L1201
- Then we seek the offsets - this is a network call that may take some time, during which the queues may be populated: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L640-L642
- Then we signal the resume: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L645
- This clears the in-memory queues; if the queues are not empty, those messages will never get acked: https://github.com/faust-streaming/mode/blob/c0aa58181432402dca30aa5978179383863a185a/mode/utils/queues.py#L78-L89
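Here is a toy asyncio model of the race (all names are illustrative, not Faust APIs): the fetcher keeps buffering messages while the slow seek is in flight, and the subsequent resume clears whatever was buffered, so those offsets are never processed or acked:

```python
import asyncio

async def fetcher(queue: asyncio.Queue) -> None:
    # Messages keep arriving asynchronously and are buffered in the queue.
    offset = 101
    while True:
        await queue.put(offset)
        offset += 1
        await asyncio.sleep(0.01)

async def finish_recovery(queue: asyncio.Queue) -> None:
    # The flow was already resumed (as at recovery.py#L635-L637), so the
    # fetcher above keeps filling the queue while the slow seek is in progress.
    await asyncio.sleep(0.1)  # stands in for the network call to seek offsets
    # Signalling the resume then clears the queue: buffered offsets are lost.
    dropped = []
    while not queue.empty():
        dropped.append(queue.get_nowait())
    print("offsets dropped before being acked:", dropped)

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(fetcher(queue))
    await finish_recovery(queue)
    task.cancel()

asyncio.run(main())
```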
Messages that are never acked cause the committing task to hang, because there is no contiguous sequence of offsets:
- https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L976
- https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L994
- Computing the new offset to commit fails https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L1120-L1121
If there are any messages in flight in the queue, they will be cleaned up before being acked.
I think simply moving https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L635-L637 to after https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L640-L642 will solve the issue.
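A minimal sketch of that reordering (the function names are stand-ins for the calls at recovery.py#L640-L642 and recovery.py#L635-L637, not the real Faust API):

```python
import asyncio

async def perform_seek() -> None:
    await asyncio.sleep(0.1)  # the slow network call that seeks the offsets

def resume_flow(buffered: list) -> None:
    buffered.clear()  # resuming clears the in-memory queues

async def finish_recovery() -> None:
    buffered: list = []    # flow is still suspended: nothing gets buffered
    await perform_seek()   # 1. seek first, while the queues stay empty
    resume_flow(buffered)  # 2. only then resume; clearing drops nothing

asyncio.run(finish_recovery())
```

With the seek done while the flow is still suspended, clearing the queues on resume cannot drop in-flight messages, so no gaps appear in the acked offsets.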