
Feature Request: Add Flow#concatAllDeferred operator.

Status: Open. He-Pin opened this issue 11 months ago · 5 comments

Motivation: This comes from https://github.com/apache/pekko/pull/1623 and https://github.com/apache/pekko/discussions/1566, which helped uncover some problems. However, given how the current interpreter and concatAllLazy are implemented, we cannot fix the problem there.

Refs: Reactor's Flux.concat(Iterable), https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concat-java.lang.Iterable-

So a new operator is needed.

Modification: I would like to add a new operator concatAllDeferred to support this usage.

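```scala
  // Proposed addition to FlowOps: the graphs in `those` are materialized only after
  // this stream completes, and then one at a time via flatMapConcat.
  def concatAllDeferred[U >: Out](those: Graph[SourceShape[U], _]*): Repr[U] =
    concatLazy(Source.lazySource(
      () => Source(those).flatMapConcat(ConstantFun.scalaIdentityFunction)))
```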

Result:

```scala
package org.apache.pekko.stream.scaladsl

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.util.ByteString

import scala.concurrent.Await

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  // Note: 'a' * 400000 is Char * Int, i.e. an Int, so each element is a 1-byte ByteString.
  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  // Append 30000 copies of `s` using the proposed operator.
  val r = Source.empty
    .concatAllDeferred(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

  // Alternative: flatten the repeated source directly with flatMapConcat.
  //  Source
  //    .repeat(s)
  //    .take(30000)
  //    .flatMapConcat(x => x)
  //    .runWith(Sink.ignore)
  //    .onComplete(println(_))

  // Alternative: concatAllLazy, with each source wrapped in Source.lazySource.
  //  Source.empty
  //    .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
  //    .runWith(Sink.ignore).onComplete(println(_))
}
```

This runs without problems.


He-Pin, Dec 31 '24 11:12

Is adding a new operator for this worth it, since it's almost the same as calling flatMapConcat directly?

For more context, this is only really a problem when using the GraphDSL and having to connect many of these kinds of flows into a many-port Concat stage.
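For illustration, here is a minimal sketch of the GraphDSL wiring described above, assuming the sources are already available as a Seq. The object and method names (ManyPortConcat, concatAllViaGraphDsl) are hypothetical. Because every source is added to one static graph, all of them are materialized together with that graph, even though Concat only consumes them one after another.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.SourceShape
import org.apache.pekko.stream.scaladsl.{ Concat, GraphDSL, Source }

object ManyPortConcat {
  // Hypothetical helper: feeds each source into its own input port of a single Concat stage.
  // All sources belong to this one static graph, so they are all materialized with it,
  // even though Concat consumes them strictly in order.
  def concatAllViaGraphDsl[T](sources: Seq[Source[T, _]]): Source[T, NotUsed] = {
    require(sources.size >= 2, "Concat needs at least two input ports")
    Source.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val concat = b.add(Concat[T](sources.size))
      sources.zipWithIndex.foreach { case (src, i) => src ~> concat.in(i) }
      SourceShape(concat.out)
    })
  }
}
```

With tens of thousands of sources, this is the shape in which up-front materialization of every input becomes noticeable.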

queimadus, Dec 31 '24 12:12

For ease of use I think that's OK, but at the very least we need to update the documentation about this; the two currently pending PRs are still valid. They also helped me find some problems in the interpreter.

He-Pin, Dec 31 '24 12:12

@raboof WDYT about this? I think this is the only way to handle @queimadus's use case; otherwise, we can add some documentation about it.

This method should be handy.

He-Pin, Jan 02 '25 14:01

Hmm, this seems like quite a corner case.

If you're using Streams like this, you'll have to be quite aware of those intricacies already. At that point perhaps it isn't too much to ask the user to combine concatLazy, lazySource and flatMapConcat (would flatten work here?). Adding concatAllDeferred might make things shorter, but I'm not sure it'd make things easier to understand. What would the scaladoc say? Can we give a clear description of the kinds of situations in which you'd use this function?
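As a sketch of the combination suggested above, the proposed operator can be approximated in user code with public operators only. This assumes a Pekko version that provides concatLazy and Source.lazySource; the names DeferredConcat, ConcatAllDeferredOps and concatAllDeferredCompat are hypothetical, and a plain identity lambda stands in for the internal ConstantFun.scalaIdentityFunction.

```scala
import org.apache.pekko.stream.{ Graph, SourceShape }
import org.apache.pekko.stream.scaladsl.Source

object DeferredConcat {
  // Hypothetical user-level equivalent of the proposed concatAllDeferred, for Sources only.
  implicit final class ConcatAllDeferredOps[Out, Mat](private val self: Source[Out, Mat]) extends AnyVal {
    def concatAllDeferredCompat[U >: Out](those: Graph[SourceShape[U], _]*): Source[U, Mat] = {
      // Copy into an immutable List, since Source.apply expects an immutable.Iterable.
      val graphs = those.toList
      // concatLazy pulls the lazySource only after `self` completes; lazySource then builds
      // the inner stream, and flatMapConcat materializes each graph only when it is reached.
      // Explicit type arguments sidestep inference on the existential materialized value.
      self.concatLazy(Source.lazySource(() => Source(graphs).flatMapConcat[U, Any](g => g)))
    }
  }
}
```

Usage would look like Source(1 to 2).concatAllDeferredCompat(Source(3 to 4), Source(5 to 6)) after importing DeferredConcat._; whether this is clearer than spelling out the three operators inline is exactly the question raised above.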

raboof, Jan 02 '25 16:01

E.g.: concatAllLazy materializes all graphs before they are pulled and keeps them in memory until the resulting graph completes. If you have a large number of graphs, that may not be what you want; try concatAllDeferred instead.
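To make this difference observable, here is a rough sketch that counts how many inner sources have been materialized by the time run() returns, while the first source (a Source.maybe) is still open. Counting via mapMaterializedValue, the object name MaterializationTiming and its parameters are assumptions for illustration only; the deferred branch uses the same concatLazy, lazySource and flatMapConcat composition as the proposal. It observes materialization timing only, not memory usage.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

object MaterializationTiming {
  // Hypothetical probe: how many of `n` inner sources were materialized by the time
  // run() returned, while the head source (Source.maybe) has not completed yet.
  def materializedBeforeHeadCompletes(deferred: Boolean, n: Int)(implicit system: ActorSystem): Int = {
    val counter = new AtomicInteger(0)
    // The mapMaterializedValue function runs when each inner source is materialized.
    val inners = List.tabulate(n)(i => Source.single(i).mapMaterializedValue(_ => counter.incrementAndGet()))
    val head = Source.maybe[Int]
    val combined =
      if (deferred)
        // Same composition as the proposed concatAllDeferred.
        head.concatLazy(Source.lazySource(() => Source(inners).flatMapConcat(s => s)))
      else
        // Existing operator: every inner source is part of the materialized graph.
        head.concatAllLazy(inners: _*)
    // run() returns once the graph has been materialized.
    val (headPromise, done) = combined.toMat(Sink.ignore)(Keep.both).run()
    val observed = counter.get()
    headPromise.trySuccess(None) // complete the head so the stream can finish
    Await.ready(done, 10.seconds)
    observed
  }
}
```

If the description above holds, the concatAllLazy variant should report all n inner sources as already materialized, while the deferred variant should report none.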

At least, it took me many hours to find the real problem.

He-Pin, Jan 02 '25 16:01