KAFKA-17411: Create local state Standbys on start
Instead of waiting until Tasks are assigned to us, we pre-emptively create a `StandbyTask` for each non-empty Task directory found on-disk.
We do this before starting any `StreamThread`s, and on our first assignment (after joining the consumer group), we recycle any of these `StandbyTask`s that were assigned to us, either as an Active or a Standby.
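The start-up and first-assignment flow described above can be sketched roughly as follows. This is a minimal, hypothetical Java sketch, not the Kafka Streams implementation: `InitialStandbyRegistry`, `InitialStandby`, and their methods are invented for illustration. It also assumes that each task directory is named after its task id (e.g. `0_0`) and that initial standbys which are not assigned to us are closed on the first assignment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of "initial Standbys"; these are NOT real Kafka Streams classes.
final class InitialStandbyRegistry {

    // Stand-in for a StandbyTask created before any StreamThread exists.
    static final class InitialStandby {
        final String taskId;
        final Path directory;
        boolean closed = false;

        InitialStandby(String taskId, Path directory) {
            this.taskId = taskId;
            this.directory = directory;
        }

        void close() {
            closed = true; // a real task would close its open StateStores here
        }
    }

    private final Map<String, InitialStandby> pending = new HashMap<>();

    // On start-up: one InitialStandby per non-empty task directory.
    // Assumption: each task directory is named after its task id, e.g. "0_0".
    void createFromStateDir(Path stateDir) throws IOException {
        List<Path> taskDirs;
        try (Stream<Path> entries = Files.list(stateDir)) {
            taskDirs = entries.filter(Files::isDirectory).collect(Collectors.toList());
        }
        for (Path dir : taskDirs) {
            if (!isEmpty(dir)) {
                String taskId = dir.getFileName().toString();
                pending.put(taskId, new InitialStandby(taskId, dir));
            }
        }
    }

    private static boolean isEmpty(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        }
    }

    // On the first assignment: hand back every initial standby assigned to us,
    // as Active or Standby, for recycling; close the rest (assumed behaviour).
    Map<String, InitialStandby> onFirstAssignment(Set<String> active, Set<String> standby) {
        Map<String, InitialStandby> recycled = new HashMap<>();
        for (InitialStandby task : pending.values()) {
            if (active.contains(task.taskId) || standby.contains(task.taskId)) {
                recycled.put(task.taskId, task);
            } else {
                task.close();
            }
        }
        pending.clear();
        return recycled;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Recycling the assigned tasks, rather than creating them afresh, is what lets them keep the stores that were already opened at start-up.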
We can't just use these "initial Standbys" as-is, because they were constructed outside the context of a `StreamThread`, so we first have to update them with the context (log context, `ChangelogReader`, and source topics) of the thread they have been assigned to.
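One way to picture the "update them with the context" step is a task whose thread-scoped fields start out unset and are filled in when it is handed to a thread. This is again a hypothetical sketch: `RebindableStandby`, `ThreadContext`, and `ChangelogReaderStub` are stand-ins invented here, not the real Kafka Streams `LogContext` or `ChangelogReader` types.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: these types are illustrative stand-ins, not the real
// Kafka Streams LogContext / ChangelogReader classes.
final class RebindableStandby {

    // Minimal stand-in for a thread's changelog reader.
    interface ChangelogReaderStub {
        void register(String taskId);
    }

    // The per-thread context an initial standby is missing when created.
    static final class ThreadContext {
        final String logPrefix;
        final ChangelogReaderStub changelogReader;
        final List<String> sourceTopics;

        ThreadContext(String logPrefix, ChangelogReaderStub changelogReader, List<String> sourceTopics) {
            this.logPrefix = logPrefix;
            this.changelogReader = changelogReader;
            this.sourceTopics = List.copyOf(sourceTopics);
        }
    }

    private final String taskId;
    private ThreadContext context; // null until the task is handed to a StreamThread

    RebindableStandby(String taskId) {
        this.taskId = taskId;
    }

    boolean isBound() {
        return context != null;
    }

    // Adopt the context of the thread the task was assigned to, then register
    // the task with that thread's changelog reader so restoration happens there.
    void bindTo(ThreadContext threadContext) {
        this.context = Objects.requireNonNull(threadContext, "threadContext");
        threadContext.changelogReader.register(taskId);
    }

    String logPrefix() {
        return requireBound().logPrefix + "[" + taskId + "] ";
    }

    List<String> sourceTopics() {
        return requireBound().sourceTopics;
    }

    private ThreadContext requireBound() {
        if (context == null) {
            throw new IllegalStateException("task " + taskId + " is not yet bound to a StreamThread");
        }
        return context;
    }
}
```

Failing fast on an unbound task makes it hard to accidentally log or restore through an initial standby before it has been handed to its owning thread.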
The motivation for this is to make it possible, in a later commit, to read `StateStore` offsets for unowned Tasks from the `StateStore` itself, rather than from the `.checkpoint` file, which we plan to deprecate and remove.
There are a few additional benefits:
- Initializing these Tasks on start-up, instead of on-assignment, will reduce the time between a member joining the consumer group and beginning processing. This is especially important when active tasks are being moved over, for example, as part of a rolling restart.
- If a Task has corrupt data on-disk, it will be discovered on startup and wiped under EOS. This is preferable to wiping the state after being assigned the Task, because another instance may have non-corrupt data and would not need to restore (as much).
There is a potential performance impact: we open the `StateStore`s of every on-disk Task, and keep them all open until we receive our first assignment. This could require a large amount of memory, particularly when there are many local state stores on-disk.
However, since old local state for Tasks we don't own is automatically cleaned up after a period of time (controlled by `state.cleanup.delay.ms`), in practice we will almost always be dealing only with the state that was last assigned to the local instance.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)