kafka
KAFKA-14133: Replace EasyMock with Mockito in streams test
Batch 3 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
EDIT: This issue has been solved.
Unfortunately some tests are failing and I am not exactly sure how or why this is happening, specifically
- shouldNotUpdateLimitForNonSourceStandbyChangelog
- shouldOnlyRestoreStandbyChangelogInUpdateStandbyState
- shouldRestoreToLimitInStandbyState
As you can see from the diff in this PR, the changes from EasyMock to Mockito in these functions were quite trivial/mechanical, but the tests are failing with errors that are confusing to me, e.g.
[2022-08-17 12:13:33,803] DEBUG test-reader Transiting to update standby tasks: {} (org.apache.kafka.streams.processor.internals.StoreChangelogReader:317)
[2022-08-17 12:13:33,803] DEBUG test-reader Resumed partitions [] from the restore consumer (org.apache.kafka.streams.processor.internals.StoreChangelogReader:899)
[2022-08-17 12:13:33,804] DEBUG test-reader Added partitions [topic-0] to the restore consumer, current assignment is [topic-0] (org.apache.kafka.streams.processor.internals.StoreChangelogReader:861)
[2022-08-17 12:13:33,804] DEBUG test-reader Start restoring changelog partition topic-0 from current offset 0 to end offset 7. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:918)
[2022-08-17 12:13:33,805] WARN test-reader Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [topic-0], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:438)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-0=1}
at org.apache.kafka.clients.consumer.MockConsumer.poll(MockConsumer.java:208)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
at org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestoreToLimitInStandbyState(StoreChangelogReaderTest.java:836)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.mockito.internal.junit.JUnitSessionStore$1.evaluateSafely(JUnitSessionStore.java:55)
at org.mockito.internal.junit.JUnitSessionStore$1.evaluate(JUnitSessionStore.java:43)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.stop(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Tasks [null] are corrupted and hence needs to be re-initialized
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [null] are corrupted and hence needs to be re-initialized
at app//org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
at app//org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestoreToLimitInStandbyState(StoreChangelogReaderTest.java:836)
@cadonna @clolov @C0urante @ijuma Maybe you have an idea what is going on here? Also note that due to Mockito not working with inline thenReturn mock statements (see https://github.com/mockito/mockito/wiki/FAQ#can-i-thenreturn-an-inlined-mock-) I did some refactorings in StoreChangelogReaderTest, i.e. mockTasks and mockTask have been moved from inner local methods to the class. This was also done as an attempt to solve the aforementioned problems, but it ended up having zero effect on them (and, apart from these problems, it had zero effect on the tests in general).
Also note that @SuppressWarnings("unchecked") is necessary because, unlike EasyMock, Mockito doesn't have any support for generics in the mock method (see https://github.com/mockito/mockito/issues/1531).
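For readers unfamiliar with where the unchecked warning comes from, here is a minimal sketch. It assumes a factory with the shape `<T> T mock(Class<T>)`; the `mock` method below is a hypothetical stand-in built on `java.lang.reflect.Proxy`, not Mockito itself, and `UncheckedMockSketch` and `mockStringSupplier` are names made up for illustration.

```java
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

public class UncheckedMockSketch {

    // Hypothetical stand-in with the same shape as a mock(Class<T>) factory.
    // It is NOT Mockito: every method on the returned proxy just returns null.
    public static <T> T mock(Class<T> type) {
        Object proxy = Proxy.newProxyInstance(
                UncheckedMockSketch.class.getClassLoader(),
                new Class<?>[] {type},
                (instance, method, args) -> null);
        return type.cast(proxy);
    }

    // Supplier.class is a Class of the raw type Supplier, so assigning the
    // result to Supplier<String> is an unchecked conversion that javac warns about.
    @SuppressWarnings("unchecked")
    public static Supplier<String> mockStringSupplier() {
        Supplier<String> supplier = mock(Supplier.class);
        return supplier;
    }

    public static void main(String[] args) {
        Supplier<String> supplier = mockStringSupplier();
        System.out.println(supplier.get()); // the stand-in proxy returns null
    }
}
```

Because a class literal can only name the raw type, any factory of this shape needs either the suppression or a helper that hides the cast.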
Finally, in ProcessorStateManagerTest some verifications and stubbed calls have been removed for the same reasons behind https://github.com/apache/kafka/pull/12505#discussion_r947652273, i.e. MockitoJUnitRunner.StrictStubs was complaining that there were extra erroneous stubs/verifications and I needed to remove them to make StrictStubs happy. This stubbing was also unnecessary in EasyMock.
Hey @mdedetrich--it looks like the issue here is caused by differences in default return values for boxed primitive types between Mockito mocks and EasyMock nice mocks; the former will return 0, but the latter will return null.
Adding when(storeMetadata.offset()).thenReturn(null); to the beginning of each of the failing test cases mentioned in the description seems to fix the issue.
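To illustrate why a different default for a boxed `Long` can change the path taken by the code under test, here is a small sketch that uses only the JDK. Everything in it is an assumption for illustration: the `StoreMetadata` interface is a simplified stand-in, the two proxies only mimic the defaults described in the comment above, and `describeStart` is a hypothetical consumer, not the logic in StoreChangelogReader.

```java
import java.lang.reflect.Proxy;

public class MockDefaultsSketch {

    // Hypothetical, simplified stand-in for the mocked metadata type.
    public interface StoreMetadata {
        Long offset();
    }

    // Mimics a mock whose unstubbed methods return null for object types,
    // including boxed primitives (the nice-mock behaviour described above).
    public static StoreMetadata nullDefaultMock() {
        return proxy(null);
    }

    // Mimics a mock whose unstubbed boxed-primitive methods return zero
    // (the Mockito behaviour described above).
    public static StoreMetadata zeroDefaultMock() {
        return proxy(0L);
    }

    private static StoreMetadata proxy(Long unstubbedOffset) {
        return (StoreMetadata) Proxy.newProxyInstance(
                MockDefaultsSketch.class.getClassLoader(),
                new Class<?>[] {StoreMetadata.class},
                (instance, method, args) ->
                        "offset".equals(method.getName()) ? unstubbedOffset : null);
    }

    // Hypothetical code under test: null means "no offset known yet",
    // while any number is treated as a real position to resume from.
    public static String describeStart(StoreMetadata metadata) {
        Long offset = metadata.offset();
        return offset == null
                ? "start from the beginning"
                : "resume after offset " + offset;
    }

    public static void main(String[] args) {
        System.out.println(describeStart(nullDefaultMock()));
        System.out.println(describeStart(zeroDefaultMock()));
    }
}
```

With the null default the hypothetical consumer treats the offset as unknown; with the zero default it treats 0 as a real position, which is the kind of silent behaviour change a mechanical migration can introduce.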
I'm not too familiar with the Streams codebase, so probably best to get review from someone else, but if there are any other strange issues that don't seem Streams-specific, let me know!
@C0urante Thanks for finding the root cause, will definitely keep this as a mental note when doing other migrations. I'll update the PR later tonight
The PR has been rebased and pushed with a fix for the problem described earlier, i.e. using when(storeMetadata.offset()).thenReturn(null);
@cadonna I have just rebased the PR with the latest streams changes that got merged into trunk. I also did some more improvements for StreamThreadTest with the usage of MockitoJUnitRunner.StrictStubs, i.e. where before we had
verify(taskManager)
we now have
verify(taskManager).handleCorruption(corruptedTasks);
Also, the pointless stubs that MockitoJUnitRunner.StrictStubs revealed have been removed.
I also did 20k runs of StreamThreadTest locally to test for flakiness (so far so good).
Thanks for the comments, I am still getting my head around the tests and what precisely is being tested.
I forgot to mention earlier that StreamTaskTest needs more work (doing this now, will push changes). There are other fixes needed due to MockitoJUnitRunner.StrictStubs, i.e. some tests throw exceptions and expect that exception path to never be hit, and this needs to be replaced with verify(times(0)) or something similar.
@cadonna Okay so I have just pushed and rebased the PR with these changes
- As you have rightly pointed out there were a lot of things being stubbed/tested which made no sense due to peculiarities of how EasyMock works (which I am still trying to get my head around). I believe I have removed most of the unnecessary resets/stubs. There are still some verifys which can be argued are pointless (mainly in the setup of the mock), if you still want to remove these then let me know
- StreamTaskTest previously with EasyMock used stub exceptions to verify a method was not meant to be called, i.e.
EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
This has now been replaced with using never, i.e. the above example translates to
verify(stateManager, never()).checkpoint();
which achieves the same effect in a more principled way.
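As a rough illustration of the "check after the fact" style that never() expresses, here is a JDK-only sketch. It is an assumption-laden stand-in, not the code in this PR: `StateManager` is a simplified hypothetical interface, `RecordingStateManager` is a hand-written fake that only counts calls, and `suspend` is made-up code under test.

```java
public class NeverCalledSketch {

    // Hypothetical, simplified collaborator; not the real Kafka interface.
    public interface StateManager {
        void flush();
        void checkpoint();
    }

    // Hand-written recording fake standing in for a mock: it only counts calls.
    public static class RecordingStateManager implements StateManager {
        public int flushCalls = 0;
        public int checkpointCalls = 0;

        @Override
        public void flush() {
            flushCalls++;
        }

        @Override
        public void checkpoint() {
            checkpointCalls++;
        }
    }

    // Hypothetical code under test: flushes, and checkpoints only when asked to.
    public static void suspend(StateManager stateManager, boolean checkpointNeeded) {
        stateManager.flush();
        if (checkpointNeeded) {
            stateManager.checkpoint();
        }
    }

    public static void main(String[] args) {
        RecordingStateManager stateManager = new RecordingStateManager();
        suspend(stateManager, false);

        // Similar in spirit to verify(stateManager, never()).checkpoint():
        // the check runs after the call, against the recorded interactions,
        // instead of relying on a stubbed exception being thrown mid-call.
        if (stateManager.checkpointCalls != 0) {
            throw new AssertionError("Should not have tried to checkpoint");
        }
        System.out.println("checkpoint calls: " + stateManager.checkpointCalls);
    }
}
```

The assertion lives in the test body rather than inside the stubbed method, so the intent ("this must not be called") is stated where the test's other expectations are.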
@cadonna I have addressed the comments which I could. I just noticed that, because of my force pushing, I have gone back and forth between removing and adding the same stubs/verifications in various places without realizing it (which I think also affected your reviewing), so I pushed each of the changes as an individual commit to better track changes. If you want me to squash the PR then let me know.
@mdedetrich Some of our discussions on this PR are still unresolved. Did you forget to push some commits?
@cadonna Apologies, I must have missed the replies to the comments, will look at them today. Note that https://github.com/apache/kafka/pull/12524#discussion_r975464527 is still waiting for a response from you.
@mdedetrich I answered your question about InOrder. Sorry, I somehow missed that.
Note that I unresolved two other comments: https://github.com/apache/kafka/pull/12524/files#r984332259 https://github.com/apache/kafka/pull/12524/files#r984332539 Let me know if my comments are not clear.
@cadonna
https://github.com/apache/kafka/pull/12524/files#r984332259
This one is resolved now, I think I somehow missed that, apologies
https://github.com/apache/kafka/pull/12524/files#r984332539
So this one is not clear to me, commented on it. I think you might be viewing an old commit?
Hello @mdedetrich and @cadonna! Is there something still remaining for this pull request or are we ready to merge it? There are around 4 pull requests remaining in this migration and I would like to get them to completion 😊
Sorry I have just been busy with other things, will look into it next week
Can this be closed now? It looks like this has been done in https://github.com/apache/kafka/commit/d4c95cfc2a48bf5f09e5a213e172c8883b484534
Yes