Streamlet is not stopped after Source is depleted in Tests

Open thomasschoeftner opened this issue 4 years ago • 1 comments

Hi,

I am logging this issue after a conversation with @RayRoestenburg on Gitter: https://gitter.im/lightbend/cloudflow?at=5ebbf38a7312422a87e1fb81

Describe the bug
When testing Streamlets without Outlets (essentially an egress), the streamlet is not stopped after the source is depleted. The underlying akka-stream, however, does seem to complete: I checked this by attaching a TestProbe to the sink in the Streamlet, and the probe receives its completed message successfully.

As a result, testing such an egress requires an explicit wait (not merely a timeout) so that the streamlet finishes all processing before the results are validated. This wait is

  • either hard-coded via Thread.sleep (ugly), or
  • implemented by replacing the Streamlet's Sink with another sink that also emits all messages to a TestProbe (also injected during the test), which is then used to fish for the completed message.
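
The second workaround can be sketched with plain Akka Streams, independent of the Cloudflow testkit. This is only a minimal illustration, assuming Akka 2.5.x with akka-stream and akka-testkit on the classpath; `ProbeCompletionSketch`, `awaitCompletion`, `egressSink` and the `"completed"` marker are hypothetical names chosen for the example and are not taken from the sample repository.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.testkit.TestProbe

import scala.concurrent.Await
import scala.concurrent.duration._

object ProbeCompletionSketch {

  // Runs a finite source into a stand-in egress sink and blocks until the
  // probe has seen the completion marker (or the 3 second limit is hit).
  def awaitCompletion(elements: List[Int]): Any = {
    implicit val system: ActorSystem    = ActorSystem("probe-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val probe = TestProbe()

      // Hypothetical stand-in for the sink used inside the egress streamlet.
      val egressSink = Sink.foreach[Int](n => println(s"egress processed $n"))

      Source(elements)
        // Mirror every element to the probe; "completed" is sent to the probe
        // once the upstream source is depleted.
        .alsoTo(Sink.actorRef[Int](probe.ref, "completed"))
        .runWith(egressSink)

      // Skip the mirrored elements and return when the marker arrives.
      probe.fishForMessage(3.seconds) {
        case "completed" => true
        case _           => false
      }
    } finally Await.ready(system.terminate(), 10.seconds)
  }
}
```

Here the duration passed to `fishForMessage` is only an upper bound, so the test continues as soon as the source completes instead of sleeping for a fixed time. The marker signals that the mirrored branch has completed; it does not by itself guarantee that a slow side effect in the real sink has finished.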

To Reproduce
A complete sample with a simple-enough scenario for trying/testing this is available here: https://github.com/thomasschoeftner/cloudflow-mocking-sample/
The last revision in this repo holds the version where the TestProbe is injected. The previous revision shows the variant with active waiting (Thread.sleep) in the Spec.

Expected behavior
I would expect a Streamlet in a testing context to stop when the source attached to the inlet closes.

Additional context
This issue was discovered when combining ScalaMock (via ScalaTest) with Streamlets. Since mocks are evaluated at the end of the test (without obvious ways to evaluate them asynchronously via the TestKit.run function), it is necessary to wait until the Streamlet is done processing the Source. StreamletExecution.completed never completes, because StreamletExecution.stop is not explicitly called when using BaseAkkaStreamletTestKit.run[T](streamlet: AkkaStreamlet, ip: List[InletTap[_]], op: List[OutletTap[_]]).

Versions
Observed with Cloudflow 1.3.2 on OpenJDK 1.8 on macOS. This includes:

  • Scala 2.12.10
  • Akka 2.5.29
  • ScalaTest 3.0.8
  • ScalaMock 4.4.0

thomasschoeftner, May 15 '20 10:05

Great issue writeup and sample to reproduce!

RayRoestenburg, May 15 '20 11:05