Add Flow#collectNonNull operator
This operator is used in Spring AI, and I think it's more useful than a map to Optional/Option followed by a filter/collect.
@raboof @mdedetrich @pjfanning wdyt? If you think this is OK, I will work on it this weekend.
Not sure that this makes sense as null as an element is not allowed in streams as per the reactive specification. If you send null in a Stream then it terminates immediately. In other words stream elements always have to be not null unless your desire is to explicitly terminate it.
That's why for Kotlin use of pekko streams you have to use Java Optional and not ?.
I don't find the name very clear - I had to google it and found https://kotlinlang.org/api/core/kotlin-stdlib/kotlin.collections/map-not-null.html (I didn't find it in Spring AI).
Couldn't you achieve this with a single collect, as that allows a partial function?
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#mapNotNull-java.util.function.Function- That's true. @raboof yes, but this is more for the Java API.
And the use case in Spring ai https://github.com/spring-projects/spring-ai/blob/main/spring-ai-client-chat/src/main/java/org/springframework/ai/chat/client/DefaultChatClient.java#L589
@mdedetrich This operator would avoid something like Source.map(Optional.ofNullable(...)).filter(Optional::isPresent).map(Optional::get)
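To make the comparison concrete, here is a minimal sketch of the two shapes using plain `java.util.stream` as a stand-in, since it runs without any Pekko dependency. The class name and the `lookup` function are hypothetical. One assumption to keep in mind: `java.util.stream` tolerates a `null` between `map` and `filter`, whereas Pekko's `map` rejects a `null` result, which is why the thread asks for a single fused operator rather than `map` followed by a null filter.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

final class OptionalVersusNullFilter {
    // Hypothetical lookup that returns null when there is no result.
    static String lookup(int id) {
        return id % 2 == 0 ? "even-" + id : null;
    }

    // The wrap / filter / unwrap chain quoted above: one Optional per element.
    static List<String> viaOptional(List<Integer> ids) {
        return ids.stream()
            .map(id -> Optional.ofNullable(lookup(id)))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    // The shape a mapNotNull-style operator would give: no wrapper objects.
    static List<String> viaNullFilter(List<Integer> ids) {
        return ids.stream()
            .map(OptionalVersusNullFilter::lookup)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
}
```

Both methods return the same elements; the difference is only the intermediate `Optional` per element.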
It's an interesting observation that https://pekko.apache.org/japi/pekko/snapshot/org/apache/pekko/stream/javadsl/Flow.html#collect(scala.PartialFunction) is present on the Java API, but uses scala.PartialFunction.
We do supply some helper methods to make it easier to create Scala partial functions from Java (https://pekko.apache.org/docs/pekko/current/stream/operators/Source-or-Flow/collect.html), but I see how this is a bit icky.
For 2.0.0, it might be interesting to revisit if we want to have a replacement for Java. "A regular Java function, and when it returns 'null' that means it is not defined for that input" might be an interesting option, if the reactive spec indeed specifies null can never be an 'element' value. Perhaps we should look at other places where we use partial functions in Java APIs to see if that would be an option.
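A rough sketch of what "a regular Java function that returns null where it is not defined" could look like, written against plain collections so it is self-contained. The names `NullAsUndefined`, `collectNonNull`, and `describe` are made up for illustration and are not an existing Pekko API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class NullAsUndefined {
    // Hypothetical helper: applies f to each input and keeps only the inputs
    // for which f is "defined", i.e. returns a non-null result.
    static <A, B> List<B> collectNonNull(List<A> inputs,
                                         Function<? super A, ? extends B> f) {
        List<B> out = new ArrayList<>();
        for (A a : inputs) {
            B b = f.apply(a);
            if (b != null) {
                out.add(b);
            }
        }
        return out;
    }

    // A "partial" function: defined for Integer and non-empty String only.
    static String describe(Object o) {
        if (o instanceof Integer) {
            return "int:" + o;
        }
        if (o instanceof String && !((String) o).isEmpty()) {
            return "str:" + o;
        }
        return null; // not defined for this input
    }
}
```

Here `null` never ends up in the output list; it only signals "no result for this input", which is the role a Scala `PartialFunction` plays through `isDefinedAt`.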
It's interesting that Flux also has something called 'collect', but that is more like a 'Sink'.
I like your idea. Maybe a collectNonNull where a null result is simply dropped, especially with Java 21.
@mdedetrich This operator will make avoid something like
Source.map(Optional.ofNullable(...)).filter(Optional::isPresent).map(Optional::get)
Yes but how is this even going to work? On a fundamental level, pekko streams can never have null as an element, see https://doc.akka.io/libraries/akka-core/current/stream/stream-flows-and-basics.html#illegal-stream-elements.
I don't think this is even possible
It's possible: the null will just be ignored by the operator, which triggers another pull(in) if the upstream is not completed and has not been pulled yet. This is just like the mapAsync operator: when you return a CompletionStage<T> whose value is null, the null is dropped and another pull(in) is triggered. You can check the scaladoc of mapAsync.
I think we can have a special operator that just drops the null value. Combined with Java 21's pattern matching, it works like a charm.
It's possible, the null will just be ignored for some operator, and trigger another pull(in) if the upstream is not completed and not been pulled yet. just as the mapAsync operator, where when you return a CompletionStage<T> but the value is null, then the null is just dropped and trigger another pull(in), you can check the scaladoc of mapAsync.
I think we can have a special operator, which just drop the null value, when working with Java 21's pattern matching, it works like a charm
The entire pekko-stream codebase is designed with the assumption that null is never going to be an element. This is not something that's easy to change, and I personally wouldn't accept such an operator until pekko streams in general works with null elements, which could be something for 2.0.0, but this is a massive change.
There are way too many edge cases and surprising behaviour otherwise, and I have personally spent days/weeks debugging streams due to a single stranded null element being sent somewhere. The way that null behaves also differs completely depending on which streaming operator.
Illegal means illegal, the entire codebase is based on the assumption that it should never exist.
And don't get me wrong, I would love pekko streams to work with null (it would make it much more ergonomic for Kotlin users who idiomatically don't use Java Optional), but making null work with pekko streams is a HUGE change.
The entire pekko-stream codebase is all designed with the assumption that null is never going to be any element
As far as I understand, the operator proposed in the issue would indeed not have null as an element, but use it as a marker to indicate the 'partial' function does not produce a result to be put on the stream for the current input.
The entire pekko-stream codebase is all designed with the assumption that null is never going to be any element
As far as I understand, the operator proposed in the issue would indeed not have null as an element, but use it as a marker to indicate the 'partial' function does not produce a result to be put on the stream for the current input.
That might be the case, but it would be an anomaly as in all other places with pekko streams, even if we are not dealing with an element in a stream, Optional.empty is used to represent an absent value and not null (i.e. partition/collect/filter like operators)
And I believe that the reason behind this is deliberate, such a design is really meant to hone in "don't use null" when working with pekko streams and having some exception methods that work with null in some way just confuses users.
The entire pekko-stream codebase is all designed with the assumption that null is never going to be any element
As far as I understand, the operator proposed in the issue would indeed not have null as an element, but use it as a marker to indicate the 'partial' function does not produce a result to be put on the stream for the current input.
That might be the case, but it would be an anomaly as in all other places with pekko streams, even if we are not dealing with an element in a stream, Optional.empty is used to represent an absent value and not null (i.e. partition/collect/filter like operators)
Partition and filter don't use Optional. I agree we should not have both the current 'collect' and new method that uses null instead.
And I believe that the reason behind this is deliberate, such a design is really meant to hone in "don't use null" when working with pekko streams and having some exception methods that work with null in some way just confuses users.
That's possible, but changing this in 2.x Java API should at least be on the table to discuss.
Partition and filter don't use Optional. I agree we should not have both the current 'collect' and new method that uses null instead.
I don't remember the exact methods offhand, but https://pekko.apache.org/docs/pekko/1.1/stream/operators/RetryFlow/withBackoff.html is an example of what I mean. I don't remember a single instance of null being used to represent an absent value in pekko streams; it's always Optional.empty.
And I believe that the reason behind this is deliberate, such a design is really meant to hone in "don't use null" when working with pekko streams and having some exception methods that work with null in some way just confuses users.
That's possible, but changing this in 2.x Java API should at least be on the table to discuss.
Yes, but it is a big change, and as such we should be quite deliberate and careful about whether we do this. Personally, and for the reasons I stated, I am already on the "no" side of the fence. Yes, it's annoying that you have to convert between null and Optional, but it really makes it clear that when in pekko/akka/reactive streams land, you just don't use null.
@mdedetrich This API does not let null pass through the stream; it filters the null value out. I think it's quite useful, clear, and high performance. Why should a user wrap the result value in an Optional and then unwrap it? That's just boring.
ZIO has something like this too.
Your argument is like saying that a steering wheel must be round, but some cars sell well even though their steering wheels are not round. Even Kotlin, which you love, has a mapNotNull.
holder.elem match {
  case Success(elem) =>
    if (elem != null) {
      push(out, elem)
      pullIfNeeded()
    } else {
      // elem is null
      pullIfNeeded()
      pushNextIfPossible()
    }
You can see that in mapAsync, when the result value is null, it is just dropped and upstream is pulled again. Is that wrong? This is the same thing, just more explicit, with mapNotNull/collectNotNull.
And if you think this is a bad name, we can make it a Java-only API. I think this name is pretty great, and so is collectNonNull.
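The "drop the null and pull again" behaviour can be mimicked outside of Pekko with a pull-based iterator. This is only a loose analogy to a GraphStage, not Pekko code, and `MapNotNullIterator` is a made-up name: each time downstream asks for an element, the wrapper keeps pulling upstream until the function yields a non-null result or upstream is exhausted.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

final class MapNotNullIterator<A, B> implements Iterator<B> {
    private final Iterator<A> upstream;
    private final Function<? super A, ? extends B> f;
    private B buffered; // next non-null result, or null if nothing is buffered

    MapNotNullIterator(Iterator<A> upstream, Function<? super A, ? extends B> f) {
        this.upstream = upstream;
        this.f = f;
    }

    @Override
    public boolean hasNext() {
        // A null result is dropped and the next upstream element is pulled.
        while (buffered == null && upstream.hasNext()) {
            buffered = f.apply(upstream.next());
        }
        return buffered != null;
    }

    @Override
    public B next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        B result = buffered;
        buffered = null;
        return result;
    }
}
```

The consumer of this iterator never observes a `null`; it only observes fewer elements than upstream produced.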
@mdedetrich This api is not making null passing through the stream, but filtering the null value out, I think it's quite useful, clear, and high performance. why should a user wrap the result value into an optional and then unwrap that, boring.
I explained why, it makes the API very inconsistent (again look at RetryFlow.withBackoff as an example) and it makes it very easy to create broken streams. It could give users the false impression that null is safe to use in all contexts of pekko streams when it is not, and that makes it very easy to do things like create stream of elements that contain null's and then pass it to your suggested Flow#mapNotNull operator (because that would be very natural thing to do) and then things go boom.
ZIO has something like this too.
ZIO doesn't have the limitation that pekko/akka/reactive streams does, they don't care if you use null elements in a Stream so in the context of ZIO there are no inconsistencies here
And If you think this is a bad name, we can make it a Java-only api I think this name is pretty great, so does collectNonNull
Name could be better (I vastly prefer collectNonNull), but the main issue is that the core concept is completely alien/foreign to all of akka/pekko streams right now.
Your idea is the same as the steering wheel must be round, but some cars sell well even though their steering wheels are not round, even you love Kotlin has a mapNotNull.
No, that's not what I am saying. I am saying that this suggestion makes it very easy for users to create incorrect/broken streams by accident, because it gives them a false impression of what's allowed. I really wish that akka/pekko/reactive streams didn't have this limitation; I have to deal with it all the time in Kotlin. But the reactive/java stream specification was made at a time when using null was considered an anti-pattern and Java's Optional was pushed as the idiomatic way to handle optional values. This was also at a time when Kotlin didn't exist, or barely existed.
holder.elem match {
  case Success(elem) =>
    if (elem != null) {
      push(out, elem)
      pullIfNeeded()
    } else {
      // elem is null
      pullIfNeeded()
      pushNextIfPossible()
    }
You can see, in mapAsync, when the result value is null, it just drop it and pull upstream again, does that wrong? here is the same, but more explicitly with mapNotNull/collectNotNull
If it does that, it's either illegal (see https://github.com/reactive-streams/reactive-streams-jvm#2.13, and hence not following the reactive/java streams specification) or it's an internal optimization detail that's not publicly visible to users.
The spec is quite clear: null elements are not allowed.
But this API will never pass null downstream; it just filters out the null result. Java Optional is an allocation, so why bother? We will never have null in the stream.
Void only has null as its instance. Think about Optional<Void> and CompletionStage<Void>: does it have an element? Yes, the element is null, a valid value of Void.
But this api will never pass the null to the downstream, just filter out the null result. Java Optional is an allocation, so why bother, we will never have null in the stream. The Void will only have a null as its instance, think about the Optional<Void> and CompletionStage<Void>, does it have an element? yes, the element is null, a valid value of the Void.
We are just going around in circles here, it would be better to make a github discussion to talk about the problem more generally and also get opinions from others.
The spec says the argument passed to onNext cannot be null. So when we filter out the null, the next call to downstream's onNext is still non-null. How does that break the spec?
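As a sketch of that argument, here is a small adapter written against the JDK's `java.util.concurrent.Flow` interfaces (which mirror the Reactive Streams ones). `MapNotNullSubscriber` is a hypothetical name, and the sketch leaves out error handling for a throwing function, cancellation edge cases, and thread safety. It only shows that downstream's `onNext` is never invoked with `null`, and that a dropped result is compensated with one extra upstream request so downstream demand is still honoured.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical adapter, not a Pekko or Reactive Streams API.
final class MapNotNullSubscriber<A, B> implements Flow.Subscriber<A> {
    private final Flow.Subscriber<? super B> downstream;
    private final Function<? super A, ? extends B> f;
    private Flow.Subscription upstream;

    MapNotNullSubscriber(Flow.Subscriber<? super B> downstream,
                         Function<? super A, ? extends B> f) {
        this.downstream = downstream;
        this.f = f;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        // Downstream demand is forwarded to upstream unchanged.
        downstream.onSubscribe(subscription);
    }

    @Override
    public void onNext(A item) {
        B mapped = f.apply(item);
        if (mapped != null) {
            downstream.onNext(mapped); // never called with null
        } else {
            upstream.request(1); // result dropped: replace the demand it used
        }
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}
```

Whether such an operator is desirable from an API-consistency point of view is a separate question from whether it violates the non-null `onNext` rule, and the sketch only speaks to the latter.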
the spec say the call to onNext can not be null, so when we filter out the null, the next call to downstream's onNext still not null, how it broken the spec?
That's not the point. Can we please move this to a discussion?
I don't know why this is a problem when both RxJava and Reactor have this, with very clear docs and SVG diagrams showing how it works.
Even Spring AI and other cool libraries are using it to solve real-world problems. If you still think it breaks the spec: they must have thought it through very well before adding it.
https://github.com/apache/pekko/pull/2414
@mdedetrich FYI
@mdedetrich I think the only problem with mapOption is it will cause some allocation.