kafka
KAFKA-18913: Removing _state.updater.enabled_ flag through the StreamThread#runOnceWithoutProcessingThreads flow.
Removed the method StreamThread#initializeAndRestorePhase, since it was only used in the negated flow (flag disabled). Also removed the flag entirely from the StreamThread#runOnceWithoutProcessingThreads method.
I will remove the flag and the related code from the rest of the codebase in future commits.
Hi @cadonna ,
Would you be able to verify that the changes I have made are correct and match what is required?
Also, TaskManager#needsInitializationOrRestoration had only one usage, in StreamThread#initializeAndRestorePhase (which was removed). TaskManager#tryToCompleteRestoration also had only one usage, but it is covered by a lot of test cases, around 53 I think. So I wanted to know whether I should go levels below and remove that code too, or only remove the top-level code.
I will remove the flag from the other places and commit those changes. In the meantime, it would be great if you could verify that my approach is correct. Could you also start the workflow?
Hi @cadonna ,
Thank you for the review!
Really sorry that you had to go through the indentation changes; I must have selected the whole file by accident and pressed Ctrl+Alt+L!
Anyway, the indentation is now fixed in all places, and some of the other review comments have been resolved as well. I'll attend to the rest (removing the flag from the rest of the codebase) within this week.
Hi @cadonna ,
I have removed the stateUpdaterEnabled flag and the related code completely from the codebase. The only remaining part is to remove it from a readme file and a Python test case. And sorry for the force pushes and the PR being a bit messy!
FYI: I will not be online for the next 1.5 weeks.
@janchilling Is the PR ready for re-review?
Hi @cadonna Not yet. I have some university exams and assignments to complete, so I could not focus on this for the past few weeks. But I'll make sure this is ready by the end of this week.
I also have a question. Some test cases in StreamThreadTest are failing due to this line: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java/#L252 . This happens because some test cases do not create a thread directly (https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java/#L3647 ). I am a bit unsure about what can be done here, so could you give me a pointer?
No worries!
Hi @cadonna ,
I think I have completed everything; sorry for the delay.
Only one problem remains: the tests StreamThreadTest#testNamedTopologyWithStreamsProtocol and StreamThreadTest#testStreamsRebalanceDataWithExtraCopartition fail because a StateUpdater thread is still running. That thread is created by StreamThread#createAndStartStateUpdater, which is called in StreamThread#create. In these two tests, an IllegalStateException is thrown before the StreamThread is created, but the StateUpdater thread is left running, so the tests fail during teardown. I guess this is a bug, since a StateUpdater thread should not be running when no StreamThread has been created.
I will create a separate PR with a solution for the above issue. I guess we'll then have to merge that PR before merging this one.
Hi @janchilling ,
Sorry for the long silence but I was quite busy recently.
Yeah, I agree with you about the bug regarding the state updater thread that is not torn down after the IllegalStateException. However, rather than surrounding the code in StreamThread#create() with a try-catch clause, I propose to create an init() method in the TaskManager that starts the state updater. The init() method is then called at the beginning of the run() method of the stream thread. That way, starting and shutting down the state updater thread are both encapsulated in the TaskManager, which ensures that the state updater thread is not started before the stream thread is started.
Could you open a separate PR just for that? If that solves the issue, we would first merge that PR and then merge this PR on top of it.
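For readers following the thread, the proposal above can be sketched roughly as follows. This is a minimal illustration, not the actual Kafka Streams code: SketchStateUpdater, SketchTaskManager, SketchStreamThread, and the validConfig parameter are hypothetical stand-ins, and the real classes have very different constructors and responsibilities. The point is only that create() no longer starts any background thread, so an exception thrown there cannot leave one behind.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the state updater: owns one background thread.
class SketchStateUpdater {
    private final CountDownLatch stop = new CountDownLatch(1);
    private Thread thread;

    synchronized void start() {
        if (thread != null) {
            return; // already started
        }
        thread = new Thread(() -> {
            try {
                stop.await(); // idle until shutdown() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-StateUpdater");
        thread.setDaemon(true);
        thread.start();
    }

    synchronized void shutdown() throws InterruptedException {
        stop.countDown();
        if (thread != null) {
            thread.join();
        }
    }

    synchronized boolean isRunning() {
        return thread != null && thread.isAlive();
    }
}

// Hypothetical stand-in for the task manager: the only owner of the updater lifecycle.
class SketchTaskManager {
    private final SketchStateUpdater stateUpdater = new SketchStateUpdater();

    void init() {
        stateUpdater.start();
    }

    void shutdown() throws InterruptedException {
        stateUpdater.shutdown();
    }

    boolean stateUpdaterRunning() {
        return stateUpdater.isRunning();
    }
}

// Hypothetical stand-in for the stream thread.
class SketchStreamThread extends Thread {
    private final SketchTaskManager taskManager;

    private SketchStreamThread(SketchTaskManager taskManager) {
        super("sketch-StreamThread");
        this.taskManager = taskManager;
    }

    // create() may throw, but it starts no threads, so nothing can leak.
    static SketchStreamThread create(SketchTaskManager taskManager, boolean validConfig) {
        if (!validConfig) {
            throw new IllegalStateException("invalid configuration");
        }
        return new SketchStreamThread(taskManager);
    }

    @Override
    public void run() {
        taskManager.init(); // the state updater starts only once the stream thread runs
        try {
            // the processing loop would go here
        } finally {
            try {
                taskManager.shutdown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With this shape, a test that expects create() to throw has nothing to clean up during teardown, because the updater thread only exists between init() and shutdown() inside run().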
Hey @janchilling are you still working on this? Just checking.
@lucasbru Yes, they are. They were sidetracked by the following PR: https://github.com/apache/kafka/pull/19889#issuecomment-2937205109 See also the comment about availability in the next two weeks.
Hi @cadonna ,
I rewrote StreamThreadTest#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress. Can you check whether what I have written is correct? I specifically had to check whether the StateUpdater thread is running, because otherwise the test fails. Therefore I checked whether a StateUpdater thread is present in the thread stack traces.
Also, can you help me identify the test cases in the TaskManagerTest class that need a rewrite? (This would be of great help.) I will be available from today onwards and will give this my full focus and try to finish it by next week.
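A check of the kind described above (looking for a thread among the live thread stack traces) could look roughly like the sketch below. It is an illustration only, not the code from the PR: the ThreadNameCheck class and its isThreadRunning helper are hypothetical, and matching on the name fragment "StateUpdater" is an assumption about how the thread is named. Thread.getAllStackTraces() is the standard JDK call that returns a map keyed by all live threads.

```java
// Hypothetical test helper: reports whether any live thread's name contains the fragment.
class ThreadNameCheck {
    static boolean isThreadRunning(String nameFragment) {
        // getAllStackTraces() returns a snapshot keyed by the live threads in the JVM.
        return Thread.getAllStackTraces().keySet().stream()
            .anyMatch(t -> t.isAlive() && t.getName().contains(nameFragment));
    }
}
```

A test could call ThreadNameCheck.isThreadRunning("StateUpdater") before and after shutdown. Matching on thread names is brittle, since the check passes or fails depending on naming conventions and on unrelated threads in the same JVM, so a unique name fragment is safer.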
Hi @cadonna ,
I need some help with updating the test cases in the TaskManagerTest class.
@janchilling Thank you for your patience! I am sorry, I am quite busy at the moment and I am not able to carve out time to review this PR. \cc @lucasbru @mjsax @bbejeck
@janchilling -- I had a very brief look at this PR, and it is very large. I would propose splitting it into multiple smaller PRs to allow us to make incremental progress.
Closing this PR due to inactivity. It did become stale, and somebody else picked up this ticket now.