cloudflow
It is not easy to use AkkaStreamletTestKit.outletToSink
Is your feature request related to a problem? Please describe.
I'm trying to collect all messages sent to an outlet. The Scaladoc for AkkaStreamletTestKit suggests that I can use outletToSink:
This method can be used to for instance quickly collect all output produced into a simple sequence using Sink.seq[T].
However, the type signature of outletToSink requires a Sink[..., NotUsed]. So this will not compile:
val testkit = AkkaStreamletTestKit(system)
val tap = testkit.outletToSink(streamlet.out, Sink.seq)
Replacing the materialized value will make the code compile:
val tap = testkit.outletToSink(streamlet.out, Sink.seq.mapMaterializedValue(_ => NotUsed))
However, most sinks return something useful as their materialized value (Sink.seq, for example, returns the collected elements). Discarding that value makes using them with outletToSink pointless.
Describe the solution you'd like
Allow outletToSink to accept a Sink with an arbitrary materialized value type, and provide access to that materialized value.
Update:
I have found a way to get the materialized value by using Sink.preMaterialize():
val (messages, sink) = Sink.seq[(String, MyMessage)].preMaterialize()
val out = testkit.outletToSink(streamlet.out, sink)
Probably, it would be nice to mention this approach in the docs.
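For reference, here is a minimal sketch of the preMaterialize() approach using plain Akka Streams only, without Cloudflow or the testkit. It assumes Akka 2.6 or later, where an implicit ActorSystem provides the materializer. The object name PreMaterializeSketch and the helper collectAll are hypothetical, and a plain Source stands in for the streamlet outlet. It only shows that preMaterialize() yields both the Future with the collected elements and a Sink[T, NotUsed] of the shape that outletToSink accepts.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical object and helper names, for illustration only.
object PreMaterializeSketch {

  // Runs `elements` through a pre-materialized Sink.seq and returns what it collected.
  def collectAll(elements: List[String])(implicit system: ActorSystem): Seq[String] = {
    // preMaterialize() materializes the sink up front and returns a pair:
    //   - the materialized value (a Future holding the collected elements)
    //   - a Sink[String, NotUsed] wired to that same materialized instance
    val (collected, sink) = Sink.seq[String].preMaterialize()

    // The annotations just document the types involved.
    val result: Future[Seq[String]] = collected
    val plainSink: Sink[String, NotUsed] = sink

    // Stand-in for the streamlet outlet: any source can run into the NotUsed sink.
    Source(elements).runWith(plainSink)

    // The Future completes once the upstream completes.
    Await.result(result, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("premat-sketch")
    try println(collectAll(List("a", "b", "c")))
    finally system.terminate()
  }
}
```

Note that a pre-materialized sink is bound to a single running instance, so a fresh one should be created for each test.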
However, there is another problem. The collection which I get always contains only one message.
Update 2:
The problem with the sink receiving only one message seems to be related to the fact that I'm using write on WritableSinkRef to send a message. If I use plainSink, all the messages are delivered to the sink without problems.
We can probably improve the documentation in that direction. Please feel free to propose a PR, and we will make sure to review it quickly!
The context of "Update 2" is not really clear to me. Could you share a small repro so that we can check what's going on?