KAFKA-17411: Create local state Standbys on start
Instead of waiting until Tasks are assigned to us, we pre-emptively
create a StandbyTask for each non-empty Task directory found on-disk.
We do this before starting any StreamThreads, and on our first
assignment (after joining the consumer group), we recycle any of these
StandbyTasks that were assigned to us, either as an Active or a
Standby.
We can't just use these "initial Standbys" as-is, because they were constructed outside the context of a StreamThread. We first have to update them with the context (log context, ChangelogReader, and source topics) of the thread each one is assigned to.
The motivation for this is to (in a later commit) read StateStore
offsets for unowned Tasks from the StateStore itself, rather than the
.checkpoint file, which we plan to deprecate and remove.
There are a few additional benefits:
- Initializing these Tasks on start-up, instead of on-assignment, will reduce the time between a member joining the consumer group and beginning processing. This is especially important when active tasks are being moved over, for example, as part of a rolling restart.
- If a Task has corrupt data on-disk, it will be discovered on startup and wiped under EOS. This is preferable to wiping the state after being assigned the Task, because another instance may have non-corrupt data and would not need to restore (as much).
There is a potential performance impact: we open all on-disk Task StateStores, and keep them all open until we have our first assignment. This could require large amounts of memory, in particular when there are a large number of local state stores on-disk.
However, since old local state for Tasks we don't own is automatically cleaned up after a period of time, in practice, we will almost always only be dealing with the state that was last assigned to the local instance.
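For reference, that cleanup window is controlled by `state.cleanup.delay.ms`. A minimal config sketch (the application id and bootstrap servers are placeholder values):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateCleanupConfigExample {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Task directories that no longer belong to this instance are deleted once they
        // have been idle for at least this long (the default is 10 minutes), which bounds
        // how much unassigned on-disk state the startup scan can encounter.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 10 * 60 * 1000L);
        return props;
    }
}
```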
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@lucasbru @mjsax @ableegoldman @cadonna Here's the first PR for KIP-1035. Caching of lag sums is not yet implemented (that will be in a follow-up PR), to keep things smaller and simpler to review.
PS. I reused KAFKA-14412 as the issue for this PR, and the other PRs in KIP-1035. Let me know if I should be opening a new ticket instead.
Thanks for the PR @nicktelford -- will put it on top of my review backlog.
Given that we have two KIPs, it might be better for tracking to have a new JIRA for KIP-1035 -- using a single JIRA for two KIPs might become messy.
@mjsax I've opened KAFKA-17411 and updated the title of this PR to match.
@mjsax Following our conversation about suspend/resume of StandbyTask on Slack, I've changed the implementation to no longer suspend the pending tasks after initializing them (see https://github.com/apache/kafka/pull/16922/commits/4c07d8facd12b84fff40c5e1b28f1b94ff4169e9).
This means they will technically be in the RUNNING state before they've been assigned, but they won't actually be executed, because they won't have been added to a TaskRegistry or StateUpdater.
My thinking was that if SUSPENDED gets removed in the future, it would make no sense to depend on it here, and if we used that state, we would need to resume some StandbyTasks after assignment, which currently is not possible.
On the original version of the PR we discussed this race condition for "close unused pending tasks if last thread" -- is this resolved by moving this cleanup call into the state listener, i.e., before we transition to RUNNING? (It would be great if you could confirm my understanding.)
@mjsax Yes. KafkaStreams.StreamStateListener#maybeSetRunning() only sets the state to RUNNING once, after all the threads have reached the RUNNING state, so only that last RUNNING thread will call StateDirectory#closePendingTasks().
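A simplified sketch of that "last thread to reach RUNNING" check, using stand-in types rather than the real `KafkaStreams.StreamStateListener` / `StateDirectory` classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the behavior described above; not the actual
// KafkaStreams.StreamStateListener implementation.
public class RunningTransitionSketch {

    enum ThreadState { CREATED, RUNNING }

    interface StateDirectory { void closePendingTasks(); }

    private final List<ThreadState> threadStates;   // one entry per StreamThread
    private final StateDirectory stateDirectory;
    private final AtomicBoolean clientRunning = new AtomicBoolean(false);

    RunningTransitionSketch(int numThreads, StateDirectory stateDirectory) {
        this.threadStates = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            threadStates.add(ThreadState.CREATED);
        }
        this.stateDirectory = stateDirectory;
    }

    // Called whenever a StreamThread changes state.
    synchronized void onThreadStateChange(int threadIndex, ThreadState newState) {
        threadStates.set(threadIndex, newState);
        maybeSetRunning();
    }

    private void maybeSetRunning() {
        boolean allRunning = threadStates.stream().allMatch(s -> s == ThreadState.RUNNING);
        // Only the transition triggered by the last thread to reach RUNNING flips the
        // client to RUNNING, so closePendingTasks() is invoked exactly once.
        if (allRunning && clientRunning.compareAndSet(false, true)) {
            stateDirectory.closePendingTasks();
        }
    }

    public static void main(String[] args) {
        RunningTransitionSketch sketch = new RunningTransitionSketch(
            2, () -> System.out.println("closing unused pending startup tasks"));
        sketch.onThreadStateChange(0, ThreadState.RUNNING); // nothing happens yet
        sketch.onThreadStateChange(1, ThreadState.RUNNING); // last thread: cleanup runs once
    }
}
```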
@mjsax @cadonna I believe I've addressed all of your feedback and I've rebased against trunk. Can you let me know if there's anything else that needs changing?
@ableegoldman also wanted to take a look...
Sorry for the delay Nick! I had a half-finished review sitting around for the past three weeks 😞
No worries. I was on vacation last week anyway. :grin:
Rebased against trunk
@cadonna @mjsax Is there anything else that needs to be addressed here?
Merged to trunk. Thanks Nick!
Sweet! One down! -- Thanks a lot Nick, and Bruno/Sophie for helping with the review.
@nicktelford It seems this PR introduced some new behavior that breaks the E2E test StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces. Could you please take a look at the following error and issue: KAFKA-17978?
[2024-11-10 17:47:44,001] ERROR stream-thread [StreamsUpgradeTest-4b4612d4-0d0f-41e2-8117-e1e9ba6a0d15-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic StreamsUpgradeTest-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000011-topic is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.
at org.apache.kafka.streams.processor.internals.StreamTask$RecordQueueCreator.createQueue(StreamTask.java:1410)
at org.apache.kafka.streams.processor.internals.StreamTask.createPartitionQueues(StreamTask.java:238)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:214)
at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTaskFromStandby(ActiveTaskCreator.java:206)
at org.apache.kafka.streams.processor.internals.TaskManager.convertStandbyToActive(TaskManager.java:896)
at org.apache.kafka.streams.processor.internals.TaskManager.recycleTaskFromStateUpdater(TaskManager.java:992)
at org.apache.kafka.streams.processor.internals.TaskManager.handleStartupTaskReuse(TaskManager.java:568)
at org.apache.kafka.streams.processor.internals.TaskManager.handleTasksWithStateUpdater(TaskManager.java:545)
at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:398)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:657)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:621)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:602)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:836)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1266)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:944)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
@chia7712 I'm afraid I'm unable to run this test suite; I get: FAIL: TimeoutError("Kafka server didn't finish startup in 60 seconds") for every test.
I think it's highly unlikely that this PR causes the error you've posted, as we didn't make any changes to the topology whatsoever. The full unit and integration test suite passes, which suggests that we're correctly updating the source topic partitions and topology metadata (these tests would otherwise fail).
Are you able to:
- Verify that it was definitely introduced by this commit, and was not present in the immediately preceding commit on trunk?
- Assuming the above, maybe provide a unit/integration test that I can run in order to dig into the issue?
- Failing that, can you provide any additional information that would help debug this issue?
@chia7712 Do you happen to see an error like:
WARN Detected unmatched input partitions for task 1_0 when recycling it from standby to active
immediately before the above stack trace?
@chia7712 Sorry about this. I've managed to figure out how to run the E2E tests myself, and it looks like this PR does cause the test to break from version 3.4.1 onwards. I'm not sure why it only starts to break after 3.4.1; my best guess is that, prior to that version, topology metadata wasn't included in the rebalance.
I'm going to do some digging to see if I can understand what's gone wrong here. Will keep you updated on my progress.
@nicktelford thanks for the confirmation. Sorry for the delay in my response—I was a bit busy yesterday. I went through the PR and left a question. It might not be the root cause, but it would be great if you could take a look. Thanks!
@nicktelford @chia7712 This might be related to the situation described in the following comment: https://github.com/apache/kafka/blob/a951b73fa5d8ca63d4b22f25068e3f9a8501a3ff/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L519-L532
@cadonna yes, my comment (https://github.com/apache/kafka/pull/16922#discussion_r1837437347) was inspired by your comment :)
@cadonna @chia7712 Thanks guys, you're absolutely right. I hadn't realised this and spent several hours trying to figure out why topics in the topology metadata could lose their applicationId prefix :upside_down_face:
I've identified the root cause and I'm running a fix through all the test suites right now. Will open a new PR once I've verified it works as intended.
Reverted this PR in 4.0 branch (cf https://issues.apache.org/jira/browse/KAFKA-18498)