Document that takeWhile can be used as "complete this stream when X comes in"

Open jroper opened this issue 8 years ago • 8 comments

I've found myself needing this many times before, and have needed it on two separate occasions today. When a certain message passes through my flow, I want the flow to complete.

An example use case is a flow for a chat room (implemented using a merge sink): if a leave chat room message passes through the flow, you want to terminate that flow. The sender may not be able to do that. In my specific use case, a user has a single WebSocket connection that multiplexes messages for all the chat rooms they are currently in. The flow is demultiplexed by a broadcast source, filtered for each room, and then sent to a merge sink for each room. The sender of the leave message doesn't want to close the WebSocket after sending it, so it can't close the flow itself; the flow has to be closed by something downstream of the demux, which dynamically removes that flow from the broadcast.

jroper avatar Jun 30 '17 13:06 jroper

Example implementation:

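  import akka.NotUsed
  import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
  import akka.stream.scaladsl.Flow
  import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

  // Passes elements through until one matches `predicate`, then completes the stage
  // without emitting the matching element.
  private def completeWhen[T](predicate: T => Boolean): Flow[T, T, NotUsed] = Flow.fromGraph(new GraphStage[FlowShape[T, T]] {
    val in = Inlet[T]("CompleteWhen.in")
    val out = Outlet[T]("CompleteWhen.out")
    override def shape = FlowShape(in, out)
    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush() = {
          grab(in) match {
            case complete if predicate(complete) =>
              // Matching element: drop it and complete the stream
              completeStage()
            case other =>
              push(out, other)
          }
        }
      })
      setHandler(out, new OutHandler {
        // Forward downstream demand to upstream
        override def onPull() = pull(in)
      })
    }
  })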

jroper avatar Jun 30 '17 13:06 jroper

In principle, I +1 this. When working on actual apps, one quickly gets to the point where such an operation is needed, and I've felt it myself too.

Observation though: technically, completeWhen(test) is just takeWhile(t => !test(t)). However, I can totally see people missing that, so a completeWhen would be helpful.

That said, I'm torn between including it and not...

  • It is exactly takeWhile, just with a better name - so -1.
  • But I really saw myself and others struggle and be confused about how to do this, until we noticed it's doable with a take - so +1...
  • The maintenance cost of such an alias is rather low I guess, because we would just delegate to takeWhile. +0
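As a rough illustration of how small such an alias would be, here is a minimal sketch that delegates to takeWhile. The names `CompleteOps`, `completeWhen` and `completeAfter` are hypothetical and not part of the Akka API; the sketch assumes an Akka version whose takeWhile has the `inclusive` overload.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical helpers, not part of Akka: both simply delegate to takeWhile.
object CompleteOps {
  // Completes as soon as an element matches `predicate`; the matching element is dropped.
  def completeWhen[T](predicate: T => Boolean): Flow[T, T, NotUsed] =
    Flow[T].takeWhile((t: T) => !predicate(t))

  // Emits the first matching element, then completes.
  def completeAfter[T](predicate: T => Boolean): Flow[T, T, NotUsed] =
    Flow[T].takeWhile((t: T) => !predicate(t), inclusive = true)
}
```

Either helper could then be attached with `.via(CompleteOps.completeWhen[Msg](isLeave))`, where `Msg` and `isLeave` stand in for your own message type and predicate.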

Other opinions?

ktoso avatar Jun 30 '17 14:06 ktoso

I completely missed that. And I think there's now an inclusive version of takeWhile that could implement completeAfter, isn't there?

jroper avatar Jun 30 '17 14:06 jroper

The resolution to this issue may be the issue itself: now that the issue is here, Google and GitHub issue search will find it, which would have been enough for me to find it.

jroper avatar Jun 30 '17 14:06 jroper

Would it be enough to clarify with docs? Or perhaps the problem is only finding takeWhile, so adding docs there would not help.

patriknw avatar Jun 30 '17 14:06 patriknw

> The resolution to this issue may be the issue itself, now that the issue is here, Google and GitHub issues will find it, which would have been enough for me to find it.

I'm So Meta, Even This Acronym... :-)

I also checked other libraries and they don't expose an explicit "completeWhen"; they all rely on the takeWhile / takeUntil semantics. So I'd say it's rather a matter of getting used to the APIs (and it's not just us, I mean the general style of these APIs). We can add documentation about it, but I'm now convinced we should not add additional API.


Semantics reference for people finding this issue:

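Yes, to complete after the matching element has arrived and been emitted (0 [1] 2 3 => x 1 2 3; read right to left, [1] is the element matching test and x marks completion):

completeAfter(test) == takeUntil(test, inclusive = true), i.e. takeWhile(t => !test(t), inclusive = true) in Akka Streams

and to complete as soon as that element arrives, without emitting it (0 [1] 2 3 => x 2 3):

completeWhen(test)  == takeUntil(test, inclusive = false), i.e. takeWhile(t => !test(t)) in Akka Streams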
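For anyone who wants to try the two variants, the following is a minimal sketch using Akka Streams' own takeWhile. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream; the object name `TakeWhileDemo` and the sample elements are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative demo only; names and sample data are assumptions.
object TakeWhileDemo {
  // Returns (elements seen with exclusive completion, elements seen with inclusive completion).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("take-while-demo")
    try {
      // Stand-in for "the leave message": the element 1 triggers completion.
      val test: Int => Boolean = _ == 1
      val elements = List(3, 2, 1, 0) // 3 arrives first, 0 would arrive last

      // completeWhen semantics: stop before the matching element.
      val exclusive = Source(elements)
        .takeWhile((n: Int) => !test(n))
        .runWith(Sink.seq)

      // completeAfter semantics: emit the matching element, then stop.
      val inclusive = Source(elements)
        .takeWhile((n: Int) => !test(n), inclusive = true)
        .runWith(Sink.seq)

      (Await.result(exclusive, 3.seconds), Await.result(inclusive, 3.seconds))
    } finally system.terminate()
  }
}
```

With this input the exclusive variant should yield 3, 2 and the inclusive variant 3, 2, 1; the trailing 0 is never emitted in either case.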

ktoso avatar Jul 01 '17 08:07 ktoso

Sorry, I did not mean to close - let's add some docs around this, in reference documentation maybe, and then close this.

ktoso avatar Jul 01 '17 08:07 ktoso

Flux has takeUntil, takeWhile, skipUntil, and skipWhile.

He-Pin avatar Sep 04 '22 18:09 He-Pin

Just found this on Stack Overflow: https://stackoverflow.com/questions/63153839/how-to-stop-processing-further-elements-in-akka-streams

He-Pin avatar Sep 26 '22 13:09 He-Pin