Document that takeWhile can be used as "complete this stream when X comes in"
I've found myself needing this many times before, and have needed it on two separate occasions today. When a certain message passes through my flow, I want the flow to complete.
An example use case is a flow for a chat room (implemented using a merge sink), if a leave chat room message passes through the flow, you want to terminate that flow. The sender may not be able to do that, in my specific use case, a user has a single WebSocket connection that multiplexes messages for multiple chat rooms that they may currently be in. The flow is demultiplexed by a broadcast source, filtered for each room, and then sent to merge sinks for each room. The sender of the leave message doesn't want to close the WebSocket after sending the leave chat room message, so can't close the flow, it has to be closed by something in the flow post demux to dynamically remove that flow from the broadcast.
Example implementation:
private def completeWhen[T](predicate: T => Boolean): Flow[T, T, NotUsed] = Flow.fromGraph(new GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CompleteWhen.in")
val out = Outlet[T]("CompleteWhen.out")
override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = {
grab(in) match {
case complete if predicate(complete) =>
completeStage()
case other =>
push(out, other)
}
}
})
setHandler(out, new OutHandler {
override def onPull() = pull(in)
})
}
})
In principle, I +1 this. When working on actual apps one quickly gets to the point where such operation is needed and I've felt it myself too.
Observation though: Technically completeWhen is takeWhile(t => test(t)), however I totally see people missing that and a completeWhen would be helpful.
Having that said: I'm torn between including it and not...
- as it is exactly as
takeWhile, but with better names - so-1. - But I really saw myself and others be struggle and be confused about how to do this, until we noticed it's doable with a take - so
+1... - Maintenance cost of such alias is rather low I guess, because we would just delegate to
takeWhile.+0
Other opinions?
I completely missed that. And I think there's now an inclusive version of takeWhile to implement completeAfter isn't there?
The resolution to this issue may be the issue itself, now that the issue is here, Google and GitHub issues will find it, which would have been enough for me to find it.
would it be enough to clarify with docs, or perhaps the problem is only finding takeWhile so adding docs there would not help
The resolution to this issue may be the issue itself, now that the issue is here, Google and GitHub issues will find it, which would have been enough for me to find it.
I'm So Meta, Even This Acronym... :-)
I also checked other libraries and they don't expose an explicit "completeWhen", they all ride on the takeWhile / takeUntil semantics. Thus, I'd say it's rather a matter of getting used to the APIs (and it's not just us, I mean the general style of APIs), and we can add documentation about it but now I'm convinced to not add additional API.
Semantics reference for people finding this issue:
Yes, to complete after the final element has arrived (0 [1] 2 3 => x 1 2 3):
completeAfter(test) == takeUntil(test, inclusive = true)
and to complete once that element arrives (0 [1] 2 3 => x 2 3):
completeWhen(test) == takeUntil(test, inclusive = false)
Sorry, I did not mean to close - let's add some docs around this, in reference documentation maybe, and then close this.
Flux has takeUntil takeWhile skipUntil skipWhile
just found this on SOF https://stackoverflow.com/questions/63153839/how-to-stop-processing-further-elements-in-akka-streams