
Faust with auto.offset.reset=latest gets stuck in Recovery when Kafka deletes changelog segments during recovery

Open · cristianmatache opened this issue 1 year ago · 1 comment

Checklist

  • [X] I have included information about relevant versions - all versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

The problem is that faust can get stuck in recovery forever when auto.offset.reset is "latest" (https://faust-streaming.github.io/faust/userguide/settings.html#consumer-auto-offset-reset). During recovery, the fetcher keeps fetching records from the changelog topics to rebuild the in-memory state. Say these changelog topic partitions are so large that recovery takes a while. During that time, Kafka may independently delete some segments of those partitions, in line with the changelog topic's retention policy, while the faust client is still fetching records from them. For example:

    time t - changelog segments:   earliest |----------------------------|----------------------| latest offset
    time t+1 - recovery progress:  earliest |............R
    time t+2 - changelog segments:                          new earliest |----------------------| latest offset
    time t+3 - recovery has offset R, but offset R+1 no longer exists in the changelog topic

Therefore, at time t+3, since offset R+1 no longer exists in the changelog topic, the fetcher jumps to an offset determined by auto.offset.reset. This is the normal Kafka consumer behavior when a consumer's offset is no longer present in the topic. See: https://github.com/aio-libs/aiokafka/blob/49a557334584b5a09e96e78609b111777e14b2fb/aiokafka/consumer/fetcher.py#L794-L796
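For illustration only (this is not aiokafka's actual code, just the decision it implements), the out-of-range handling roughly amounts to:

    # Illustrative sketch of the reset decision described above; names are
    # simplified and do not mirror aiokafka's internals.
    def next_fetch_offset(requested: int, earliest: int, log_end: int, reset: str) -> int:
        """Where the consumer continues when `requested` was deleted by retention."""
        if earliest <= requested < log_end:
            return requested  # offset still exists; no reset needed
        if reset == "earliest":
            return earliest   # replay everything that is still retained
        if reset == "latest":
            return log_end    # skip ahead and wait for *new* records
        raise ValueError("offset out of range and no reset policy configured")

With reset="latest" a changelog reader therefore ends up parked at the log end, which is exactly the deadlock described next.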

If auto.offset.reset=latest, the fetcher instead jumps to the log end offset (latest offset + 1) rather than the new earliest offset. It then waits for one more record to be published to the changelog topic; but we are still in recovery mode, so, by construction, nothing publishes to the changelog. The recovery offset never reaches the highwater recorded at the start, so recovery cannot finish and faust is stuck in recovery forever. See: https://github.com/faust-streaming/faust/blob/ff75c0be3d784d28b9f69ba3ea94be769b151b89/faust/tables/recovery.py#L888-L890
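To make the deadlock concrete (the numbers below are made up for illustration):

    # Recovery roughly compares the last processed changelog offset against the
    # highwater it recorded when recovery started (see the recovery.py link above).
    highwater_at_start = 1_000_000  # latest offset when recovery began
    last_processed = 400_000        # "R" in the diagram above
    # After the reset to "latest" the fetch position jumps to the log end, but
    # nothing is produced to the changelog during recovery, so last_processed
    # never advances and the remaining count stays positive forever:
    remaining = highwater_at_start - last_processed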

Remember that faust applies the same auto.offset.reset to all subscribed topics (including changelogs) and makes no distinction between them while fetching: https://github.com/faust-streaming/faust/blob/ff75c0be3d784d28b9f69ba3ea94be769b151b89/faust/transport/drivers/aiokafka.py#L1010-L1030
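For context, a minimal app (hypothetical names) showing that the single setting covers the table changelog as well as the input topic:

    # Hypothetical minimal app; topic/table/field names are made up.
    import faust

    app = faust.App(
        "orders-agg",
        broker="kafka://localhost:9092",
        # One setting for every subscribed topic, including the table's
        # changelog topic (typically named "orders-agg-order_totals-changelog"):
        consumer_auto_offset_reset="latest",
    )

    class Order(faust.Record):
        customer: str

    orders_topic = app.topic("orders", value_type=Order)
    order_totals = app.Table("order_totals", default=int)

    @app.agent(orders_topic)
    async def aggregate(orders):
        async for order in orders.group_by(Order.customer):
            order_totals[order.customer] += 1

If that changelog loses segments to retention during a long recovery, the same "latest" reset described above applies to it.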

This behavior is difficult to reproduce. With auto.offset.reset set to latest, one would need to shrink the retention of a changelog topic while recovery is in progress (perhaps adding an asyncio.sleep in the recovery task to buy some time).

Expected behavior

Recovery should eventually finish.

Actual behavior

Faust is stuck in recovery.

cristianmatache · Nov 24 '24 13:11

Suggestion:

    # Patched _fetch_records for the consumer thread in
    # faust/transport/drivers/aiokafka.py; assumes OffsetResetStrategy can be
    # imported from aiokafka (e.g. aiokafka.consumer.subscription_state).
    async def _fetch_records(
        self,
        consumer: aiokafka.AIOKafkaConsumer,
        active_partitions: Set[TP],
        timeout: Optional[float] = None,
        max_records: Optional[int] = None,
    ) -> RecordMap:
        if not self.consumer.flow_active:
            return {}
        fetcher = consumer._fetcher
        if consumer._closed or fetcher._closed:
            raise ConsumerStoppedError()

        # Force EARLIEST while recovering, so that a deleted offset falls back
        # to the new earliest offset instead of parking at the end of the
        # changelog; restore the configured strategy once recovery is done.
        if self.app.in_recovery:
            fetcher._default_reset_strategy = OffsetResetStrategy.EARLIEST
        else:
            fetcher._default_reset_strategy = OffsetResetStrategy.from_str(
                self.app.conf.consumer_auto_offset_reset
            )

        with fetcher._subscriptions.fetch_context():
            return await fetcher.fetched_records(
                active_partitions,
                timeout=timeout,
                max_records=max_records,
            )

This would make recovery always complete. Ideally, faust would also detect when the offset is out of range (similar to aiokafka in https://github.com/aio-libs/aiokafka/blob/49a557334584b5a09e96e78609b111777e14b2fb/aiokafka/consumer/fetcher.py#L794-L796) and clear our partially rebuilt in-memory tables.
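A rough sketch of that detection, assuming it would live next to the recovery service and can reuse the consumer's earliest_offsets; the state-clearing helper is hypothetical:

    # Rough sketch only: detect that our persisted changelog position was
    # deleted by retention and discard the partially rebuilt state.
    # `earliest_offsets` exists on Faust's consumer; `reset_partition_state`
    # is a hypothetical helper that would clear the table store for that
    # partition before replaying from the new earliest offset.
    async def _reset_out_of_range_partitions(self) -> None:
        tps = self.active_tps
        earliest = await self.app.consumer.earliest_offsets(*tps)
        for tp in tps:
            offset = self.active_offsets.get(tp)
            new_earliest = earliest.get(tp)
            if offset is not None and new_earliest is not None and offset + 1 < new_earliest:
                # Offsets between our position and the new earliest are gone,
                # so the state built so far has a gap: drop it and replay.
                self.reset_partition_state(tp)   # hypothetical clear helper
                self.active_offsets[tp] = new_earliest - 1

Combined with forcing EARLIEST during recovery, this would both unblock recovery and avoid serving a table rebuilt with a gap in its changelog.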

cristianmatache · Nov 24 '24 14:11