kafka icon indicating copy to clipboard operation
kafka copied to clipboard

KAFKA-17411: Create local state Standbys on start

Open nicktelford opened this issue 1 year ago • 4 comments
trafficstars

Instead of waiting until Tasks are assigned to us, we pre-emptively create a StandbyTask for each non-empty Task directory found on-disk.

We do this before starting any StreamThreads, and on our first assignment (after joining the consumer group), we recycle any of these StandbyTasks that were assigned to us, either as an Active or a Standby.

We can't just use these "initial Standbys" as-is, because they were constructed outside the context of a StreamThread, so we first have to update them with the context (log context, ChangelogReader, and source topics) of the thread that it has been assigned to.

The motivation for this is to (in a later commit) read StateStore offsets for unowned Tasks from the StateStore itself, rather than the .checkpoint file, which we plan to deprecate and remove.

There are a few additional benefits:

  1. Initializing these Tasks on start-up, instead of on-assignment, will reduce the time between a member joining the consumer group and beginning processing. This is especially important when active tasks are being moved over, for example, as part of a rolling restart.

  2. If a Task has corrupt data on-disk, it will be discovered on startup and wiped under EOS. This is preferable to wiping the state after being assigned the Task, because another instance may have non-corrupt data and would not need to restore (as much).

There is a potential performance impact: we open all on-disk Task StateStores, and keep them all open until we have our first assignment. This could require large amounts of memory, in particular when there are a large number of local state stores on-disk.

However, since old local state for Tasks we don't own is automatically cleaned up after a period of time, in practice, we will almost always only be dealing with the state that was last assigned to the local instance.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

nicktelford avatar Aug 19 '24 16:08 nicktelford

@lucasbru @mjsax @ableegoldman @cadonna Here's the first PR for KIP-1035. Caching of lag sums is not yet implemented (that will be in a follow-up PR), to keep things smaller and simpler to review.

nicktelford avatar Aug 19 '24 16:08 nicktelford

PS. I reused KAFKA-14412 as the issue for this PR, and the other PRs in KIP-1035. Let me know if I should be opening a new ticket instead.

nicktelford avatar Aug 19 '24 16:08 nicktelford

Thanks for the PR @nicktelford -- will put it on top of my review backlog.

Given that we have two KIPs, it might be better for tracking to have a new JIRA for KIP-1035 -- using a single JIRA for two KIPs might become messy.

mjsax avatar Aug 23 '24 03:08 mjsax

Thanks for the PR @nicktelford -- will put it on top of my review backlog.

Given that we have two KIPs, it might be better for tracking to have a new JIRA for KIP-1035 -- using a single JIRA for two KIPs might become messy.

@mjsax I've opened KAFKA-17411 and updated the title of this PR to match.

nicktelford avatar Aug 23 '24 09:08 nicktelford

@mjsax Following our conversation about suspend/resume of StandbyTask on Slack, I've changed the implementation to no longer suspend the pending tasks after initializing them (see https://github.com/apache/kafka/pull/16922/commits/4c07d8facd12b84fff40c5e1b28f1b94ff4169e9).

This means they will technically be in the RUNNING state before they've been assigned, but they won't actually be executed, because they won't have been added to a TaskRegistry or StateUpdater.

My thinking was that if SUSPENDED gets removed in the future, it would make no sense to depend on it here, and if we used that state, we would need to resume some StandbyTasks after assignment, which currently is not possible.

nicktelford avatar Sep 05 '24 11:09 nicktelford

On the original version of the PR we discussed this race condition for "close unused pending tasks if last thread" -- is this resolve by moving this cleanup call into the state listener, ie, before we transit to RUNNING (would be great if you could confirm my understanding)

@mjsax Yes. KafkaStreams.StreamStateListener#maybeSetRunning() only sets the state to RUNNING once, after all the threads have reached the RUNNING state, so only that last RUNNING thread will call StateDirectory#closePendingTasks().

nicktelford avatar Oct 01 '24 15:10 nicktelford

@mjsax @cadonna I believe I've addressed all of your feedback and I've rebased against trunk. Can you let me know if there's anything else that needs changing?

nicktelford avatar Oct 15 '24 15:10 nicktelford

@ableegoldman also wanted to take a look...

mjsax avatar Oct 16 '24 21:10 mjsax

Sorry for the delay Nick! I had a half-finished review sitting around for the past three weeks 😞

ableegoldman avatar Oct 24 '24 04:10 ableegoldman

Sorry for the delay Nick! I had a half-finished review sitting around for the past three weeks 😞

No worries. I was on vacation last week anyway. :grin:

nicktelford avatar Oct 28 '24 10:10 nicktelford

Rebased against trunk

@cadonna @mjsax Is there anything else that needs to be addressed here?

nicktelford avatar Oct 29 '24 10:10 nicktelford

Merged to trunk. Thanks Nick!

ableegoldman avatar Oct 29 '24 19:10 ableegoldman

Sweet! One down! -- Thanks a lot Nick, and Bruno/Sophie for helping with the review.

mjsax avatar Oct 30 '24 06:10 mjsax

@nicktelford It seems this PR introduced some new behavior that breaks the E2E test StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces. Could you please take a look at the following error and issue: KAFKA-17978?

[2024-11-10 17:47:44,001] ERROR stream-thread [StreamsUpgradeTest-4b4612d4-0d0f-41e2-8117-e1e9ba6a0d15-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic StreamsUpgradeTest-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000011-topic is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.
	at org.apache.kafka.streams.processor.internals.StreamTask$RecordQueueCreator.createQueue(StreamTask.java:1410)
	at org.apache.kafka.streams.processor.internals.StreamTask.createPartitionQueues(StreamTask.java:238)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:214)
	at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTaskFromStandby(ActiveTaskCreator.java:206)
	at org.apache.kafka.streams.processor.internals.TaskManager.convertStandbyToActive(TaskManager.java:896)
	at org.apache.kafka.streams.processor.internals.TaskManager.recycleTaskFromStateUpdater(TaskManager.java:992)
	at org.apache.kafka.streams.processor.internals.TaskManager.handleStartupTaskReuse(TaskManager.java:568)
	at org.apache.kafka.streams.processor.internals.TaskManager.handleTasksWithStateUpdater(TaskManager.java:545)
	at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:398)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
	at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:657)
	at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:621)
	at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:602)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:836)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1266)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1211)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:944)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

chia7712 avatar Nov 10 '24 18:11 chia7712

@chia7712 I'm afraid I'm unable to run this test suite; I get: FAIL: TimeoutError("Kafka server didn't finish startup in 60 seconds") for every test.

I think it's highly unlikely that this PR causes the error you've posted, as we didn't make any changes to the topology whatsoever. The full unit and integration test suite passes, which suggests that we're correctly updating the source topic partitions and topology metadata (these tests would otherwise fail).

Are you able to:

  1. Verify that it was definitely introduced by this commit, and was not present in the immediately preceding commit on trunk?
  2. Assuming the above, maybe provide a unit/integration test that I can run in order to dig into the issue?
  3. Failing that, can you provide any additional information that would help debug this issue?

nicktelford avatar Nov 11 '24 11:11 nicktelford

@chia7712 Do you happen to see an error like:

WARN Detected unmatched input partitions for task 1_0 when recycling it from standby to active

Immediately before the above stack trace?

nicktelford avatar Nov 11 '24 12:11 nicktelford

@chia7712 Sorry about this. I've managed to figure out how to run the E2E tests myself and it looks like this PR does cause it to break from version 3.4.1 onwards. I'm not sure why it only starts to break after 3.4.1, my best guess is prior to that topology metadata wasn't included in the rebalance.

I'm going to do some digging to see if I can understand what's gone wrong here. Will keep you updated on my progress.

nicktelford avatar Nov 11 '24 20:11 nicktelford

@nicktelford thanks for the confirmation. Sorry for the delay in my response—I was a bit busy yesterday. I went through the PR and left a question. It might not be the root cause, but it would be great if you could take a look. Thanks!

chia7712 avatar Nov 12 '24 03:11 chia7712

@nicktelford @chia7712 This might be related to the situation described in the following comment: https://github.com/apache/kafka/blob/a951b73fa5d8ca63d4b22f25068e3f9a8501a3ff/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L519-L532

cadonna avatar Nov 12 '24 07:11 cadonna

@cadonna yes, my comment (https://github.com/apache/kafka/pull/16922#discussion_r1837437347) was inspired by your comment :)

chia7712 avatar Nov 12 '24 07:11 chia7712

@cadonna @chia7712 Thanks guys, you're absolutely right. I hadn't realised this and spent several hours trying to figure out why topics in the topology metadata could lose their applicationId prefix :upside_down_face:

I've identified the root cause and I'm running a fix through all the test suites right now. Will open a new PR once I've verified it works as intended.

nicktelford avatar Nov 12 '24 11:11 nicktelford

Reverted this PR in 4.0 branch (cf https://issues.apache.org/jira/browse/KAFKA-18498)

mjsax avatar Jan 13 '25 19:01 mjsax