cloudflow icon indicating copy to clipboard operation
cloudflow copied to clipboard

It is not easy to use AkkaStreamletTestKit.outletToSink

Open vkorenev opened this issue 4 years ago • 3 comments

Is your feature request related to a problem? Please describe.

I'm trying to collect all messages sent to an outlet. Scaladoc for AkkaStreamletTestKit suggests that I can use outletToSink:

This method can be used to for instance quickly collect all output produced into a simple sequence using Sink.seq[T].

However, the type signature of outletToSink requires a Sink[..., NotUsed]. So this will not compile:

val testkit = AkkaStreamletTestKit(system)
val tap = testkit.outletToSink(streamlet.out, Sink.seq)

Replacing the materialized value will make the code compile:

val tap = testkit.outletToSink(streamlet.out, Sink.seq.mapMaterializedValue(_ => NotUsed))

However, most of the sinks return something useful as their materialized value. So using them with outletToSink does not make much sense.

Describe the solution you'd like

Allow outletToSink to accept a Sink with arbitrary materialized value type. Provide access to that materialized value.

vkorenev avatar Feb 20 '21 01:02 vkorenev

Update: I have found a way to get the materialized value by using Sink.preMaterialize()

val (messages, sink) = Sink.seq[(String, MyMessage)].preMaterialize()
val out = testkit.outletToSink(streamlet.out, sink)

Probably, it would be nice to mention this approach in the docs.

However, there is another problem. The collection which I get always contains only one message.

vkorenev avatar Feb 20 '21 02:02 vkorenev

Update 2: The problem with the sink receiving only one message seems to be related to the fact that I'm using write on WritableSinkRef to send a message. If I use plainSink, all the messages are delivered to the sink without problems.

vkorenev avatar Feb 20 '21 02:02 vkorenev

We can probably improve the documentation in that direction, please feel free to propose a PR and we make sure to check it quickly!

It's not really clear to me the context of "Update 2", do you want to share a little repro so that we can check what's going on?

andreaTP avatar Feb 22 '21 13:02 andreaTP