
KAFKA-17411: Create local state Standbys on start

Open • opened by nicktelford • 4 comments

Instead of waiting until Tasks are assigned to us, we pre-emptively create a StandbyTask for each non-empty Task directory found on-disk.

We do this before starting any StreamThreads, and on our first assignment (after joining the consumer group), we recycle any of these StandbyTasks that were assigned to us, either as an Active or a Standby.
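The two steps above can be sketched roughly as follows. This is a minimal, hypothetical illustration, not the code in this PR. `TaskId`, `InitialStandby`, `createInitialStandbys` and `takeAssigned` are illustrative names, not Kafka Streams classes. The sketch assumes that task directories under the state directory are named `<subtopology>_<partition>` (for example `0_1`). Opening the StateStores is reduced to recording the task directory.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

public class InitialStandbySketch {

    // Hypothetical task id, parsed from an assumed "<subtopology>_<partition>" directory name.
    record TaskId(int subtopology, int partition) {
        static Optional<TaskId> parse(String dirName) {
            String[] parts = dirName.split("_");
            if (parts.length != 2) return Optional.empty();
            try {
                return Optional.of(new TaskId(Integer.parseInt(parts[0]), Integer.parseInt(parts[1])));
            } catch (NumberFormatException e) {
                return Optional.empty(); // not a task directory
            }
        }
    }

    // Hypothetical "initial Standby": created before any StreamThread is started.
    record InitialStandby(TaskId id, Path taskDir) {}

    static boolean isNonEmpty(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.findAny().isPresent();
        }
    }

    // Step 1 (before starting StreamThreads): one initial Standby per non-empty task directory.
    static Map<TaskId, InitialStandby> createInitialStandbys(Path stateDir) throws IOException {
        Map<TaskId, InitialStandby> standbys = new HashMap<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(stateDir, Files::isDirectory)) {
            for (Path dir : dirs) {
                Optional<TaskId> id = TaskId.parse(dir.getFileName().toString());
                if (id.isPresent() && isNonEmpty(dir)) {
                    standbys.put(id.get(), new InitialStandby(id.get(), dir));
                }
            }
        }
        return standbys;
    }

    // Step 2 (first assignment): remove and return the initial Standbys for tasks assigned
    // to this member, as either Active or Standby, so that they can be recycled.
    static List<InitialStandby> takeAssigned(Map<TaskId, InitialStandby> initial, Set<TaskId> assigned) {
        List<InitialStandby> toRecycle = new ArrayList<>();
        for (TaskId id : assigned) {
            InitialStandby standby = initial.remove(id);
            if (standby != null) {
                toRecycle.add(standby);
            }
        }
        return toRecycle;
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("state");
        for (String name : List.of("0_0", "0_1", "1_2")) {
            Files.createDirectory(stateDir.resolve(name));
        }
        Files.writeString(stateDir.resolve("0_0").resolve("store"), "data");
        Files.writeString(stateDir.resolve("1_2").resolve("store"), "data");
        // "0_1" stays empty, so no initial Standby is created for it.

        Map<TaskId, InitialStandby> initial = createInitialStandbys(stateDir);
        List<InitialStandby> recycled = takeAssigned(initial, Set.of(new TaskId(0, 0), new TaskId(2, 0)));
        System.out.println("recycled: " + recycled.size() + ", left over: " + initial.keySet());
    }
}
```

Any initial Standbys still in the map after the first assignment belong to tasks this member was not given. The sketch does not decide what happens to them.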

We can't use these "initial Standbys" as-is, because they were constructed outside the context of a StreamThread. We first have to update them with the context (log context, ChangelogReader, and source topics) of the thread they have been assigned to.
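The re-binding step can be illustrated with a small, hypothetical model. Every name here is an illustrative assumption, not Kafka's actual API: `ChangelogRegistry` stands in for the thread's ChangelogReader, `ThreadContext` bundles the three pieces of context the PR lists, and `bindToThread` is an invented method. An initial Standby starts with no context and refuses thread-specific operations until it has been bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class StandbyContextSketch {

    // Hypothetical stand-in for the per-thread ChangelogReader.
    interface ChangelogRegistry {
        void register(String taskId, String changelogTopic);
    }

    // Hypothetical bundle of the thread-specific context: log prefix, changelog reader, source topics.
    record ThreadContext(String logPrefix, ChangelogRegistry changelogReader, Set<String> sourceTopics) {}

    static final class StandbyTask {
        final String id;
        final List<String> changelogTopics;
        private ThreadContext context; // null while the task is still an "initial Standby"

        StandbyTask(String id, List<String> changelogTopics) {
            this.id = id;
            this.changelogTopics = List.copyOf(changelogTopics);
        }

        boolean isBound() {
            return context != null;
        }

        // Attach the owning thread's context before the task is handed to that thread.
        void bindToThread(ThreadContext ctx) {
            if (context != null) {
                throw new IllegalStateException("task " + id + " is already bound to a thread");
            }
            context = Objects.requireNonNull(ctx);
            // Register this task's changelogs with the thread's reader so it can drive restoration.
            for (String topic : changelogTopics) {
                ctx.changelogReader().register(id, topic);
            }
        }

        private ThreadContext requireContext() {
            if (context == null) {
                throw new IllegalStateException("task " + id + " has no StreamThread context yet");
            }
            return context;
        }

        String logPrefix() {
            return requireContext().logPrefix() + "[" + id + "] ";
        }

        Set<String> sourceTopics() {
            return requireContext().sourceTopics();
        }
    }

    public static void main(String[] args) {
        List<String> registered = new ArrayList<>();
        StandbyTask task = new StandbyTask("0_1", List.of("app-store-changelog"));
        task.bindToThread(new ThreadContext("stream-thread [t1] ",
                (taskId, topic) -> registered.add(taskId + ":" + topic), Set.of("input-topic")));
        System.out.println(task.logPrefix() + "registered " + registered);
    }
}
```

Making the context a late-bound field lets the same task object, with its already-open stores, move into a thread without being reconstructed. The cost is that every thread-specific accessor has to guard against the unbound state.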

The motivation for this is to (in a later commit) read StateStore offsets for unowned Tasks from the StateStore itself, rather than the .checkpoint file, which we plan to deprecate and remove.

There are a few additional benefits:

  1. Initializing these Tasks on start-up, instead of on-assignment, will reduce the time between a member joining the consumer group and beginning processing. This is especially important when active tasks are being moved over, for example, as part of a rolling restart.

  2. If a Task has corrupt data on-disk, it will be discovered on startup and, under exactly-once semantics (EOS), wiped. This is preferable to wiping the state after being assigned the Task, because the Task may instead be assigned to another instance with non-corrupt data, which would not need to restore (as much).

There is a potential performance impact: we open all on-disk Task StateStores and keep them open until we receive our first assignment. This could require large amounts of memory, particularly when there are many local state stores on-disk.

However, since old local state for Tasks we don't own is automatically cleaned up after a period of time, in practice, we will almost always only be dealing with the state that was last assigned to the local instance.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

nicktelford, Aug 19 '24 16:08