KAFKA-14133: Replace EasyMock with Mockito in streams test

Open mdedetrich opened this issue 2 years ago • 7 comments

Batch 3 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.

EDIT: This issue has been solved. Unfortunately, some tests are failing and I am not sure how or why this is happening, specifically

  • shouldNotUpdateLimitForNonSourceStandbyChangelog
  • shouldOnlyRestoreStandbyChangelogInUpdateStandbyState
  • shouldRestoreToLimitInStandbyState

As you can see from the diff in this PR, the changes from EasyMock to Mockito in these functions were quite trivial/mechanical, but the tests are failing with errors that are confusing to me, i.e.

[2022-08-17 12:13:33,803] DEBUG test-reader Transiting to update standby tasks: {} (org.apache.kafka.streams.processor.internals.StoreChangelogReader:317)
[2022-08-17 12:13:33,803] DEBUG test-reader Resumed partitions [] from the restore consumer (org.apache.kafka.streams.processor.internals.StoreChangelogReader:899)
[2022-08-17 12:13:33,804] DEBUG test-reader Added partitions [topic-0] to the restore consumer, current assignment is [topic-0] (org.apache.kafka.streams.processor.internals.StoreChangelogReader:861)
[2022-08-17 12:13:33,804] DEBUG test-reader Start restoring changelog partition topic-0 from current offset 0 to end offset 7. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:918)
[2022-08-17 12:13:33,805] WARN test-reader Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [topic-0], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:438)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-0=1}
	at org.apache.kafka.clients.consumer.MockConsumer.poll(MockConsumer.java:208)
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
	at org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestoreToLimitInStandbyState(StoreChangelogReaderTest.java:836)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.mockito.internal.junit.JUnitSessionStore$1.evaluateSafely(JUnitSessionStore.java:55)
	at org.mockito.internal.junit.JUnitSessionStore$1.evaluate(JUnitSessionStore.java:43)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at com.sun.proxy.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)

Tasks [null] are corrupted and hence needs to be re-initialized
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [null] are corrupted and hence needs to be re-initialized
	at app//org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
	at app//org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestoreToLimitInStandbyState(StoreChangelogReaderTest.java:836)

@cadonna @clolov @C0urante @ijuma Maybe you have an idea what is going on here? Also note that because Mockito does not work with mocks inlined into thenReturn statements (see https://github.com/mockito/mockito/wiki/FAQ#can-i-thenreturn-an-inlined-mock-) I did some refactoring in StoreChangelogReaderTest, i.e. mockTasks and mockTask have been moved from inner local methods to the class. This was also an attempt to solve the aforementioned problems, but it had zero effect on them (and, apart from these problems, zero effect on the tests in general).

Also note that @SuppressWarnings("unchecked") is necessary because unlike EasyMock, Mockito doesn't have any support for generics in the mock method (see https://github.com/mockito/mockito/issues/1531)

Finally, in ProcessorStateManagerTest some verifications and stubbed calls have been removed for the same reasons as in https://github.com/apache/kafka/pull/12505#discussion_r947652273, i.e. MockitoJUnitRunner.StrictStubs was complaining that there were extra erroneous stubs/verifications and I needed to remove them to make StrictStubs happy. This stubbing was also unnecessary in EasyMock.

mdedetrich avatar Aug 17 '22 10:08 mdedetrich

Hey @mdedetrich--it looks like the issue here is caused by differences in default return values for boxed primitive types between Mockito mocks and EasyMock nice mocks; the former will return 0, but the latter will return null.

Adding when(storeMetadata.offset()).thenReturn(null); to the beginning of each of the failing test cases mentioned in the description seems to fix the issue.
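
For anyone hitting the same thing in another migration, here is a minimal sketch of the difference. It deliberately uses only the JDK (a dynamic proxy) to imitate the two default-answer policies rather than calling Mockito or EasyMock, and StoreMetadataLike, unstubbed and describe are hypothetical names invented for the illustration, not Kafka or mocking-library APIs.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class DefaultAnswerSketch {

    // Builds an unstubbed proxy. zeroForBoxed=true imitates a Mockito mock
    // (0 for a boxed Long); false imitates an EasyMock nice mock (null).
    static StoreMetadataLike unstubbed(boolean zeroForBoxed) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (zeroForBoxed && method.getReturnType() == Long.class) {
                return 0L;
            }
            return null;
        };
        return (StoreMetadataLike) Proxy.newProxyInstance(
            StoreMetadataLike.class.getClassLoader(),
            new Class<?>[] {StoreMetadataLike.class},
            handler);
    }

    // Hypothetical code under test: a null offset means "nothing checkpointed".
    static String describe(StoreMetadataLike metadata) {
        Long offset = metadata.offset();
        return offset == null ? "no checkpoint" : "resume after offset " + offset;
    }

    public static void main(String[] args) {
        System.out.println(describe(unstubbed(false))); // no checkpoint
        System.out.println(describe(unstubbed(true)));  // resume after offset 0
    }
}

// Hypothetical stand-in for a metadata type whose offset() returns a boxed Long.
interface StoreMetadataLike {
    Long offset();
}
```

The same code under test takes a different branch depending only on the mock's default answer, which is why the explicit thenReturn(null) stub restores the behaviour the EasyMock version relied on.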

I'm not too familiar with the Streams codebase, so probably best to get review from someone else, but if there are any other strange issues that don't seem Streams-specific, let me know!

C0urante avatar Aug 17 '22 15:08 C0urante

@C0urante Thanks for finding the root cause, will definitely keep this as a mental note when doing other migrations. I'll update the PR later tonight

mdedetrich avatar Aug 17 '22 16:08 mdedetrich

The PR has been rebased and pushed; it fixes the problem described earlier by using when(storeMetadata.offset()).thenReturn(null);

mdedetrich avatar Aug 18 '22 07:08 mdedetrich

@cadonna I have just rebased the PR with the latest streams changes that got merged into trunk. I also made some more improvements to StreamThreadTest with the usage of MockitoJUnitRunner.StrictStubs, i.e. where before we had

verify(taskManager)

we now have

verify(taskManager).handleCorruption(corruptedTasks);

Also, the pointless stubs that MockitoJUnitRunner.StrictStubs revealed have been removed.

I also did 20k runs of StreamThreadTest locally to test for flakiness (so far so good).

mdedetrich avatar Sep 12 '22 10:09 mdedetrich

Thanks for the comments, I am still getting my head around the tests and what precisely is being tested.

I forgot to mention earlier that StreamTaskTest needs more work (doing this now, will push changes). There are other fixes needed due to MockitoJUnitRunner.StrictStubs, i.e. some tests stub a method to throw an exception and expect that exception path to never be hit; this needs to be replaced with verify(times(0)) or something similar.

mdedetrich avatar Sep 12 '22 15:09 mdedetrich

@cadonna Okay so I have just pushed and rebased the PR with these changes

  • As you have rightly pointed out, there were a lot of things being stubbed/tested which made no sense due to peculiarities of how EasyMock works (which I am still trying to get my head around). I believe I have removed most of the unnecessary resets/stubs. There are still some verify calls which are arguably pointless (mainly in the setup of the mock); if you still want these removed, let me know
  • StreamTaskTest previously with EasyMock used stub exceptions to verify a method was not meant to be called, i.e.
    EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
    
    This has now been replaced with using never, i.e. the above example translates to verify(stateManager, never()).checkpoint(); which achieves the same effect in a more principled way.
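
One possible reason the never() form is more robust, sketched with plain JDK classes instead of EasyMock/Mockito: a guard that throws from inside the stub can be hidden if the code under test catches too broadly, whereas checking recorded calls after the fact cannot. All names below (CheckpointerLike, closeDirty and so on) are hypothetical and only imitate the two styles; this is an illustration, not the actual StreamTaskTest code.

```java
import java.util.ArrayList;
import java.util.List;

final class NeverCalledSketch {

    // Hypothetical buggy code under test: it checkpoints when it should not,
    // and its broad catch block hides whatever the collaborator throws.
    static void closeDirty(CheckpointerLike checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (Throwable swallowed) {
            // overly broad error handling
        }
    }

    // True when the throwing guard failed to surface the unwanted call.
    static boolean throwingGuardMissesBug() {
        try {
            closeDirty(new ThrowingCheckpointer());
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    // True when inspecting the recorded calls exposes the unwanted call.
    static boolean recordingGuardCatchesBug() {
        RecordingCheckpointer recorder = new RecordingCheckpointer();
        closeDirty(recorder);
        return !recorder.calls.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(throwingGuardMissesBug());   // true
        System.out.println(recordingGuardCatchesBug()); // true
    }
}

// Hypothetical collaborator; stands in for the state manager in the test.
interface CheckpointerLike {
    void checkpoint();
}

// andThrow-style guard: fail by throwing the moment the method is called.
final class ThrowingCheckpointer implements CheckpointerLike {
    @Override
    public void checkpoint() {
        throw new AssertionError("Should not have tried to checkpoint");
    }
}

// never()-style guard: record calls and let the test inspect them afterwards.
final class RecordingCheckpointer implements CheckpointerLike {
    final List<String> calls = new ArrayList<>();

    @Override
    public void checkpoint() {
        calls.add("checkpoint");
    }
}
```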

mdedetrich avatar Sep 20 '22 10:09 mdedetrich

@cadonna I have addressed the comments which I could. I just noticed that I have gone back and forth between removing and adding the same stubs/verifications in various places without realizing it, due to my force pushing (which I think also affected your reviewing), so I pushed each of the changes as an individual commit to better track changes. If you want me to squash the PR then let me know.

mdedetrich avatar Sep 20 '22 14:09 mdedetrich

@mdedetrich Some of our discussions on this PR are still unresolved. Did you forget to push some commits?

cadonna avatar Sep 30 '22 07:09 cadonna

@cadonna Apologies, I must have missed the replies to the comments, will look at them today. Note that https://github.com/apache/kafka/pull/12524#discussion_r975464527 is still waiting for a response from you.

mdedetrich avatar Sep 30 '22 08:09 mdedetrich

@mdedetrich I answered your question about InOrder. Sorry, I somehow missed that.

Note that I unresolved two other comments: https://github.com/apache/kafka/pull/12524/files#r984332259 https://github.com/apache/kafka/pull/12524/files#r984332539 Let me know if my comments are not clear.

cadonna avatar Sep 30 '22 08:09 cadonna

@cadonna

https://github.com/apache/kafka/pull/12524/files#r984332259

This one is resolved now, I think I somehow missed that, apologies

https://github.com/apache/kafka/pull/12524/files#r984332539

This one is not clear to me, so I commented on it. I think you might be viewing an old commit?

mdedetrich avatar Sep 30 '22 08:09 mdedetrich

Hello @mdedetrich and @cadonna! Is there something still remaining for this pull request or are we ready to merge it? There are around 4 pull requests remaining in this migration and I would like to get them to completion 😊

clolov avatar Feb 02 '23 13:02 clolov

Sorry I have just been busy with other things, will look into it next week

mdedetrich avatar Feb 02 '23 18:02 mdedetrich

Can this be closed now? It looks like this has been done in https://github.com/apache/kafka/commit/d4c95cfc2a48bf5f09e5a213e172c8883b484534

mimaison avatar Jun 10 '24 09:06 mimaison

Yes

cadonna avatar Jun 10 '24 10:06 cadonna