Missing default testkit config breaks simple integration tests that use Alpakka Kafka Testkit

Open seglo opened this issue 5 years ago • 3 comments

Versions used

Akka version: 2.5.20
Alpakka Kafka version: 1.0.1

Expected Behavior

Using Alpakka Kafka Testkit with defaults should run a simple integration test.

Actual Behavior

Several Akka Testkit and Alpakka Kafka Testkit settings are required to run relatively simple integration tests. I've noticed that the following configuration is necessary to run most tests out of the box (this can be found in /tests/src/test/resources/application.conf).

akka {
  test {
    single-expect-default = 10s
  }

  kafka.consumer {
    stop-timeout = 10ms
  }
}
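For a project that cannot add these keys to its own application.conf, the same two overrides could be layered in from test code. This is only a minimal sketch: the object name `TestConfigSketch` is made up for illustration, and it assumes akka-actor and the Typesafe Config library are on the test classpath.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper, not part of Alpakka Kafka or its testkit.
object TestConfigSketch {
  // The same two overrides as in the application.conf above, written inline.
  val overrides: Config = ConfigFactory.parseString(
    """
    akka.test.single-expect-default = 10s
    akka.kafka.consumer.stop-timeout = 10ms
    """)

  // Overrides win; everything else falls back to the usual
  // application.conf / reference.conf stack.
  val config: Config = overrides.withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    // Pass the merged config to the ActorSystem used by the test.
    val system = ActorSystem("TestConfigSketch", config)
    try {
      val effective = system.settings.config
      println(effective.getDuration("akka.test.single-expect-default"))
      println(effective.getDuration("akka.kafka.consumer.stop-timeout"))
    } finally system.terminate()
  }
}
```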

For example, take the test "produce to plainSink and consume from plainSource" in akka.kafka.scaladsl.IntegrationSpec. When akka.test.single-expect-default is not set to something greater than the default of 3s, I get the following error.

assertion failed: timeout (3 seconds) during expectMsg while waiting for OnNext(1)
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnNext(1)
	at scala.Predef$.assert(Predef.scala:223)
	at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:402)
	at akka.testkit.TestKitBase.expectMsg(TestKit.scala:379)
	at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:379)
	at akka.testkit.TestKit.expectMsg(TestKit.scala:896)
	at akka.stream.testkit.TestSubscriber$ManualProbe.$anonfun$expectNextN$1(StreamTestKit.scala:397)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at akka.stream.testkit.TestSubscriber$ManualProbe.expectNextN(StreamTestKit.scala:397)
	at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$3(IntegrationSpec.scala:55)
	at akka.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:30)
	at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$2(IntegrationSpec.scala:43)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at akka.kafka.scaladsl.SpecBase.withFixture(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
	at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
	at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
	at akka.kafka.scaladsl.SpecBase.runTest(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
	at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
	at akka.kafka.scaladsl.SpecBase.runTests(SpecBase.scala:13)
	at org.scalatest.Suite.run(Suite.scala:1147)
	at org.scalatest.Suite.run$(Suite.scala:1129)
	at akka.kafka.testkit.scaladsl.ScalatestKafkaSpec.org$scalatest$BeforeAndAfterAll$$super$run(ScalatestKafkaSpec.scala:11)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at akka.kafka.scaladsl.SpecBase.org$scalatest$WordSpecLike$$super$run(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
	at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
	at akka.kafka.scaladsl.SpecBase.run(SpecBase.scala:13)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
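As an alternative to raising akka.test.single-expect-default globally, an individual expectation can be wrapped in the probe's `within` block, which supplies the deadline for the calls inside it. The sketch below is not taken from IntegrationSpec: it uses an in-memory `Source` in place of Kafka, the object name is made up, and it assumes Akka 2.5.x with akka-stream-testkit on the classpath (hence the explicit `ActorMaterializer`).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.duration._

// Hypothetical example object, for illustration only.
object ExplicitTimeoutSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ExplicitTimeoutSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for a Kafka source: three in-memory elements.
      val probe = Source(1 to 3).runWith(TestSink.probe[Int])
      probe.request(3)
      // Inside `within`, expectations use the remaining time of this
      // 10 second deadline instead of single-expect-default.
      probe.within(10.seconds) {
        probe.expectNextN(List(1, 2, 3))
      }
    } finally system.terminate()
  }
}
```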

When akka.kafka.consumer.stop-timeout is not set to something less than 5 seconds I regularly get the following exception.

Futures timed out after [5 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
	at scala.concurrent.Await$.$anonfun$result$1(package.scala:219)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
	at scala.concurrent.Await$.result(package.scala:146)
	at akka.stream.testkit.scaladsl.StreamTestKit$.printDebugDump(StreamTestKit.scala:68)
	at akka.stream.testkit.scaladsl.StreamTestKit$.$anonfun$assertNoChildren$1(StreamTestKit.scala:56)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at akka.testkit.TestKitBase.within(TestKit.scala:360)
	at akka.testkit.TestKitBase.within$(TestKit.scala:348)
	at akka.testkit.TestKit.within(TestKit.scala:896)
	at akka.testkit.TestKitBase.within(TestKit.scala:374)
	at akka.testkit.TestKitBase.within$(TestKit.scala:374)
	at akka.testkit.TestKit.within(TestKit.scala:896)
	at akka.stream.testkit.scaladsl.StreamTestKit$.assertNoChildren(StreamTestKit.scala:47)
	at akka.stream.testkit.scaladsl.StreamTestKit$.assertAllStagesStopped(StreamTestKit.scala:31)
	at akka.kafka.scaladsl.IntegrationSpec.$anonfun$new$2(IntegrationSpec.scala:43)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at akka.kafka.scaladsl.SpecBase.withFixture(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
	at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
	at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
	at akka.kafka.scaladsl.SpecBase.runTest(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
	at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
	at akka.kafka.scaladsl.SpecBase.runTests(SpecBase.scala:13)
	at org.scalatest.Suite.run(Suite.scala:1147)
	at org.scalatest.Suite.run$(Suite.scala:1129)
	at akka.kafka.testkit.scaladsl.ScalatestKafkaSpec.org$scalatest$BeforeAndAfterAll$$super$run(ScalatestKafkaSpec.scala:11)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at akka.kafka.scaladsl.SpecBase.org$scalatest$WordSpecLike$$super$run(SpecBase.scala:13)
	at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
	at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
	at akka.kafka.scaladsl.SpecBase.run(SpecBase.scala:13)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
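The stop timeout can also be set per consumer in code instead of through akka.kafka.consumer.stop-timeout. A minimal sketch, assuming `ConsumerSettings.withStopTimeout` is available in the Alpakka Kafka version in use; the bootstrap server address, group id, and object name are placeholders, not values from this issue.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

// Hypothetical helper, for illustration only.
object StopTimeoutSketch {
  def settings(system: ActorSystem): ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder address
      .withGroupId("stop-timeout-sketch")     // placeholder group id
      // Same value as stop-timeout = 10ms in the config above.
      .withStopTimeout(10.millis)
}
```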

Notes

I realize such timing issues can produce different results depending on an individual machine's resources. But given that this configuration is set by default for the Alpakka Kafka tests, and that it's easy to recreate the same errors without the config in a sample project, I think it might make sense to provide these defaults to users of the testkit.

I suggest either including this default config when users run tests using the Alpakka Kafka Testkit, or highlighting this config in the documentation.

seglo avatar Apr 16 '19 17:04 seglo

Thank you for this suggestion. The way config is resolved doesn't allow us to ship an application.conf that overrides other settings in Alpakka Kafka. I have, however, added some text to the documentation about stop-timeout: https://github.com/akka/alpakka-kafka/blob/master/docs/src/main/paradox/testing.md#testing-from-java-code

ennru avatar Apr 30 '19 09:04 ennru

@ennru Right, I thought that might be a problem.

Those docs are a big improvement! But I think they're still a little vague about the two suggestions I made in this issue. I suspect most users developing new integration tests in the style of the Alpakka Kafka tests will run into these exact two issues every time, because there's no inheritance of configuration that works. Do you mind if I write a PR to include more details in the testing section of the docs?

seglo avatar Apr 30 '19 20:04 seglo

Configuration can be inherited at the config level:

MyConsumerConfigInTests: ${akka.kafka.consumer} {
  stop-timeout = 2s
}

But please, go ahead and suggest improvements to the docs.

ennru avatar May 02 '19 06:05 ennru
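For reference, the config-level inheritance from the last comment could be consumed from Scala roughly as follows. This is a sketch under assumptions: the HOCON is inlined here only to keep the example self-contained (in a real project it would live in a test application.conf), the object name is made up, and it relies on the `ConsumerSettings(config, keyDeserializer, valueDeserializer)` factory.

```scala
import akka.kafka.ConsumerSettings
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical example object, for illustration only.
object InheritedConsumerConfigSketch {
  // Inherit everything from akka.kafka.consumer and override one key.
  val config: Config = ConfigFactory
    .parseString(
      """
      MyConsumerConfigInTests: ${akka.kafka.consumer} {
        stop-timeout = 2s
      }
      """)
    .withFallback(ConfigFactory.load())
    .resolve() // resolves ${akka.kafka.consumer} against the merged config

  // Build consumer settings from the derived section rather than the default one.
  val settings: ConsumerSettings[String, String] =
    ConsumerSettings(
      config.getConfig("MyConsumerConfigInTests"),
      new StringDeserializer,
      new StringDeserializer)
}
```

Only consumers built from these settings pick up the 2s stop timeout; anything created from the default akka.kafka.consumer section is unaffected.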