He-Pin(kerr)
After looking into fs2, ZIO Stream, and reactor-core, I found some operators that would be nice to add to Akka Stream (see also https://rxjs.dev/api): - zipWithLeft...
If you write a filter with `collect`, it fails with: ```scala "complete without demand if remaining elements are filtered out with collect" in { Source(1 to 1000).collect({...

This helps distinguish it from the `expectCancellationWithCause` one.
Sometimes a GraphStage fails with an `AbruptTerminationException` or an `AbruptStageTerminationException`; adding a common trait for them would make it easier to write tests for abrupt termination.
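A minimal sketch of the common-trait idea, written without any Akka dependency so it can be run as-is. The trait name `AbruptStreamTermination` and the two `Demo...` exception classes are hypothetical stand-ins, not Akka's API; the real `AbruptTerminationException` and `AbruptStageTerminationException` take different constructor arguments.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical marker trait: NOT part of Akka, it only illustrates the proposal.
trait AbruptStreamTermination extends Throwable

// Simplified stand-ins for the two Akka exception types (assumption:
// the real classes would simply mix in the shared trait).
final class DemoAbruptTerminationException(msg: String)
    extends RuntimeException(msg)
    with AbruptStreamTermination

final class DemoAbruptStageTerminationException(msg: String)
    extends RuntimeException(msg)
    with AbruptStreamTermination

object AbruptTerminationSketch {
  // A test helper can match once on the shared trait instead of
  // listing both exception types.
  def isAbrupt(result: Try[Any]): Boolean = result match {
    case Failure(_: AbruptStreamTermination) => true
    case _                                   => false
  }
}
```

With a shared trait, a test only needs one pattern (or one `intercept`-style assertion on the trait type) to cover both kinds of abrupt termination, while unrelated failures still fall through.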
``` [09-14 04:46:23.868] [info] - must use dedicated blocking-io-dispatcher by default *** FAILED *** (34 milliseconds) [09-14 04:46:23.868] [info] java.lang.AssertionError: Expected Actor[akka://MapWithResourceSpec/system/Materializers/StreamSupervisor-1176/flow-8-2-mapWithResource#1912087400] to use dispatcher [akka.stream.materializer.blocking-io-dispatcher], yet used: [akka.actor.default-dispatcher] [09-14...
I think these classes should be marked as `final class`.
`Source.elements` would be easier to use than the current `Source.from(Arrays.asList(...))`. In Flux we can use `Flux.fromArray` and `Flux.just(T...t)`. refs: https://github.com/akka/akka/issues/17923 refs: https://github.com/akka/akka/pull/17950 refs: https://github.com/scala/bug/issues/8743
Just like `.recoverWithRetries(-1, { case _: Throwable => Source.empty })`, which recovers from the error by completing the stream. fs2 has a `mask` method: ```scala def mask: Stream[F,...