Feature Request: Add Flow#concatAllDeferred operator.
Motivation:
The original reports are https://github.com/apache/pekko/pull/1623 and https://github.com/apache/pekko/discussions/1566 .
They helped uncover some problems, but given how the current interpreter and concatAllLazy are implemented, we cannot fix the problem there.
refs: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concat-java.lang.Iterable-
So a new operator is needed.
Modification:
I would like to add a new operator concatAllDeferred to support this usage.
def concatAllDeferred[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] =
  concatLazy(Source.lazySource(
    () => Source(those).flatMapConcat(ConstantFun.scalaIdentityFunction)))
Result:
package org.apache.pekko.stream.scaladsl

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.util.ByteString
import scala.concurrent.Await

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  val r = Source.empty
    .concatAllDeferred(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)
  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

  // Source
  //   .repeat(s)
  //   .take(30000)
  //   .flatMapConcat(x => x)
  //   .runWith(Sink.ignore)
  //   .onComplete(println(_))

  // Source.empty
  //   .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
  //   .runWith(Sink.ignore).onComplete(println(_))
}
The example above runs without problems.
Is adding a new operator for this worth it, given that it is almost the same as calling flatMapConcat directly?
For more context, this is only really a problem when using the GraphDSL and having to connect these different kinds of flows into a many-port Concat stage.
For simple usage I think that's OK, but at the very least we need to update the docs about this; the two currently pending PRs are still valid. It also helped me find some problems in the interpreter.
@raboof wdyt about this? I think this is the only way to support @queimadus's use case; otherwise we can add some documentation about it.
This method should be handy.
Hmm, this seems like quite a corner case.
If you're using Streams like this, you'll have to be quite aware of those intricacies already. At that point perhaps it isn't too much to ask the user to combine concatLazy, lazySource and flatMapConcat (would flatten work here?). Adding concatAllDeferred might make things shorter, but I'm not sure it'd make things easier to understand. What would the scaladoc say - can we give a clear description of in what kinds of situations you'd use this function?
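For reference, the user-land combination mentioned above could look roughly like the sketch below. It uses only the existing `concatLazy`, `Source.lazySource` and `flatMapConcat` operators; the object `DeferredConcatSketch` and the helper `concatDeferred` are hypothetical names chosen for illustration and are not part of Pekko.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object DeferredConcatSketch {

  // Hypothetical helper (not a Pekko API): emits `first`, then each of `those` in order.
  // The list of sources is only turned into a stream once `first` has completed,
  // and flatMapConcat runs the inner sources one at a time.
  def concatDeferred[T](first: Source[T, NotUsed], those: List[Source[T, NotUsed]]): Source[T, NotUsed] =
    first.concatLazy(Source.lazySource(() => Source(those).flatMapConcat(src => src)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("deferred-concat-sketch")

    val parts = List(Source(1 to 3), Source(4 to 6), Source(7 to 9))
    val done = concatDeferred(Source.empty[Int], parts).runWith(Sink.seq)

    // Elements are expected in concatenation order: 1 to 9.
    println(Await.result(done, 10.seconds))
    system.terminate()
  }
}
```

Whether wrapping these three operators in a dedicated method is clearer than spelling them out is exactly the question raised here; the sketch only shows that the combination is short.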
e.g.: concatAllLazy materializes all graphs before they are pulled and keeps them in memory until the resulting graph completes. If you have a large number of graphs, that may not be what you want; try concatAllDeferred instead.
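One way to observe the difference described in that scaladoc draft is to count materializations, as in the rough sketch below. It assumes that a side effect inside `mapMaterializedValue` is a fair proxy for "this source has been materialized"; `MaterializationTimingSketch` and `counted` are hypothetical names, and the deferred variant is written with existing operators because concatAllDeferred does not exist yet.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializationTimingSketch {

  // Hypothetical helper: `n` single-element sources that bump `counter`
  // each time one of them is materialized.
  def counted(n: Int, counter: AtomicInteger): List[Source[Int, NotUsed]] =
    List.tabulate(n) { i =>
      Source.single(i).mapMaterializedValue { mat => counter.incrementAndGet(); mat }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("materialization-timing-sketch")
    val n = 5

    // Variant 1: concatAllLazy wires all sources into one graph up front.
    val eagerCount = new AtomicInteger(0)
    val eager = Source.empty[Int]
      .concatAllLazy(counted(n, eagerCount): _*)
      .map(_ => eagerCount.get()) // sources materialized so far, sampled per element
      .runWith(Sink.seq)

    // Variant 2: the deferred combination, one inner source at a time.
    val deferredCount = new AtomicInteger(0)
    val deferred = Source.empty[Int]
      .concatLazy(Source.lazySource(() => Source(counted(n, deferredCount)).flatMapConcat(src => src)))
      .map(_ => deferredCount.get())
      .runWith(Sink.seq)

    println(s"concatAllLazy: ${Await.result(eager, 10.seconds)}")
    println(s"deferred:      ${Await.result(deferred, 10.seconds)}")
    system.terminate()
  }
}
```

If the description above holds, the first line should report the full count for every element, while the second should show the count growing as elements are emitted; the exact numbers may depend on the Pekko version.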
At least it took me many hours to find the true problem.