A race condition in recovery causes unrecoverable gaps in acked offsets and consequently blocks committing

Open cristianmatache opened this issue 1 year ago • 0 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Let's start with a high-level description of the issue. To illustrate the race condition, we first need to look at some key design aspects:

  • Upon restart/rebalance Faust replays messages from the changelog topics (without consuming messages from the input topics)
  • Messages are consumed from Kafka asynchronously and placed in in-memory queues. Agents consume from these queues via async for ... in stream to process the messages.
  • Once the recovery from changelog topics is completed, Faust starts listening to the input topics (i.e., non-changelog topics) and clears the contents of all in-memory queues.
  • After agents process a message, the message becomes "acked". Faust maintains the offsets of all acked messages and asynchronously commits these acked offsets back to Kafka.
    • Faust has a consistency check that requires contiguous acked offsets to commit a new offset. That is, if the committed offset is N, and the acked offsets are N+1, N+2, N+3, Faust can safely commit offset N+3. However, if the committed offset is N, and the acked offsets are N+2, N+3, Faust will wait until N+1 is acked.
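To make the contiguity rule concrete, here is a minimal sketch of it. This is a simplified model and not Faust's actual implementation: the function name next_committable_offset is hypothetical, and it follows the convention used in this description (the committed offset is N and the next acked offset must be N+1).

```python
from typing import Iterable


def next_committable_offset(committed: int, acked: Iterable[int]) -> int:
    """Return the highest offset reachable from `committed` without a gap.

    Hypothetical helper: a toy model of the contiguity check, not Faust code.
    """
    acked_set = set(acked)
    offset = committed
    # Advance only while the very next offset has been acked.
    while offset + 1 in acked_set:
        offset += 1
    return offset


# Contiguous acks: the commit can advance to the last acked offset.
print(next_committable_offset(10, [11, 12, 13]))  # 13
# Offset 11 is missing: the commit stays at 10 until 11 is acked.
print(next_committable_offset(10, [12, 13]))  # 10
```

In this model, an offset that is never acked pins the committable offset forever, no matter how many later offsets get acked.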

With the current setup, some messages may never reach the agents because the queues are cleared while those messages are in flight. Since they are never acked, committing is blocked.

  • After the recovery is completed, the async flow of messages from non-changelog topics is resumed: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L635-L637 The consumption flow is:
    • The Consumer reads messages asynchronously and puts them in the Conductor queue via the callback https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L1201
  • Then we seek the offsets. This is a network call that may take some time, during which the queues may be populated: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L640-L642
  • Then we signal the resume: https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L645
    • This clears the in-memory queues. If the queues are not empty, the messages they hold are discarded and will never get acked: https://github.com/faust-streaming/mode/blob/c0aa58181432402dca30aa5978179383863a185a/mode/utils/queues.py#L78-L89

Messages that are never acked cause the commit task to hang, because the acked offsets no longer form a contiguous sequence.

  • https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L976
  • https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L994
  • Computing the new offset to commit fails https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/transport/consumer.py#L1120-L1121

In short, any messages that are in flight in the queue at that point are cleared before they can be acked.

I think simply moving https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L635-L637 after https://github.com/faust-streaming/faust/blob/6588a9774babd6aee61f2ce69572b167850717b4/faust/tables/recovery.py#L640-L642 will solve the issue.
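To illustrate why the ordering matters, here is a toy asyncio simulation of the two orderings. It is only a sketch under stated assumptions and does not use Faust: simulate, consumer, seek, clear_queue and the flowing event are all hypothetical stand-ins, the seek is modelled as a few event-loop turns instead of a real network call, and it assumes nothing awaits between resuming the flow and clearing the queues.

```python
import asyncio
from typing import List


async def simulate(resume_before_seek: bool) -> List[int]:
    """Return the offsets that were dropped before any agent could ack them."""
    queue: asyncio.Queue = asyncio.Queue()
    flowing = asyncio.Event()
    dropped: List[int] = []

    async def consumer() -> None:
        # Stand-in for the Consumer callback that feeds the in-memory queue.
        for offset in range(5):
            await flowing.wait()
            queue.put_nowait(offset)
            await asyncio.sleep(0)  # let other tasks run

    async def seek() -> None:
        # Stand-in for the seek round trip: it yields to the event loop
        # several times, which gives the consumer a chance to run.
        for _ in range(10):
            await asyncio.sleep(0)

    def clear_queue() -> None:
        # Stand-in for the queue clear triggered by the resume signal.
        while not queue.empty():
            dropped.append(queue.get_nowait())

    task = asyncio.create_task(consumer())
    if resume_before_seek:
        flowing.set()  # current order: resume the flow, then seek
        await seek()
    else:
        await seek()  # proposed order: seek, then resume the flow
        flowing.set()
    clear_queue()
    await task
    return dropped


print(asyncio.run(simulate(resume_before_seek=True)))   # [0, 1, 2, 3, 4]
print(asyncio.run(simulate(resume_before_seek=False)))  # []
```

In the first run the queue fills up while the seek is in progress and everything in it is discarded by the clear. In the second run nothing can be enqueued until the seek has finished, so the clear finds an empty queue.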

cristianmatache, Dec 29 '23 16:12