pekko
Feature request: Add Flow#switchMap operator.
Motivation:
I find that Pekko Streams doesn't have a switchMap operator. It goes by this name in both fs2 and Flux, so I think we should use the same name and the same semantics.
Use case:
A stream of Posts, e.g. Twitter/X posts, where every Post has a stream of Comments.
[Post 1] ---> [Comment 1] ... [Comment 2] ... [Comment 3] ...
.
.
.
[Post 2] ---> [Comment 1] ... [Comment 2] ...
.
.
.
[Post 3] ---> [Comment 1] ... [Comment 2] ... [Comment 3] ... [Comment 4]..
.
.
.
\|/ User scrolling down.
Emits when: the current substream emits and downstream is available.
Backpressures when: the current substream backpressures.
Cancels when: downstream cancels.
Completes when: upstream completes and the current substream completes.
In some implementations the upstream is never backpressured, which I think is not safe. The upstream should be backpressured if there is no downstream demand.
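To make the "latest substream wins" part of these semantics concrete, here is a minimal toy sketch in plain Java. It is not Pekko code and not a proposed implementation: the names `SwitchMapSketch`, `Timed` and `Outer` and the integer timestamps are all hypothetical, and the model deliberately ignores backpressure and demand. It only shows which elements survive when a new Post replaces the current substream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy timeline model of switchMap. Not Pekko code; backpressure is not modelled. */
final class SwitchMapSketch {

    /** A substream element with the (hypothetical) time offset at which it is emitted. */
    static final class Timed {
        final int time;
        final String value;
        Timed(int time, String value) { this.time = time; this.value = value; }
    }

    /** An upstream element plus the substream it maps to; inner times are relative to `time`. */
    static final class Outer {
        final int time;
        final String name;
        final List<Timed> substream;
        Outer(int time, String name, List<Timed> substream) {
            this.time = time; this.name = name; this.substream = substream;
        }
    }

    /**
     * Emits each substream's elements until the next upstream element arrives;
     * at that point the current substream is cancelled and the new one takes over.
     * Assumes `outers` is sorted by time.
     */
    static List<String> switchMap(List<Outer> outers) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < outers.size(); i++) {
            Outer current = outers.get(i);
            // The last substream is never replaced, so it runs to completion.
            int cutoff = (i + 1 < outers.size()) ? outers.get(i + 1).time : Integer.MAX_VALUE;
            for (Timed inner : current.substream) {
                // Modelling choice: an element due exactly at the switch time is dropped.
                if (current.time + inner.time < cutoff) {
                    out.add(current.name + "/" + inner.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Outer> posts = Arrays.asList(
            new Outer(0, "Post 1", Arrays.asList(
                new Timed(1, "Comment 1"), new Timed(2, "Comment 2"), new Timed(5, "Comment 3"))),
            new Outer(4, "Post 2", Arrays.asList(
                new Timed(1, "Comment 1"), new Timed(9, "Comment 2"))),
            new Outer(10, "Post 3", Arrays.asList(
                new Timed(1, "Comment 1"), new Timed(2, "Comment 2"))));
        switchMap(posts).forEach(System.out::println);
    }
}
```

With these made-up timings, Post 1's Comment 3 and Post 2's Comment 2 are dropped because a newer Post arrived first, while Post 3's substream runs to completion. A flatMapConcat-style operator would instead emit every comment of Post 1 before moving on to Post 2.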
Result: We have a richer set of operators.
@pjfanning @mdedetrich what do you think about this one? I think it would be nice for live streaming.
Implementing this should not be that hard, but there doesn't seem to be much interest, so I will defer it to a later version. It's a kind of stream of streams.
I think we have enough on our plate right now with all of the new features. We can always add this in 1.1.0-M2 (if a new M2 is needed) or in 1.2.x.
I will be going back to my hometown soon. If anyone submits a PR for this, I think it can be added to 1.1.0-M2; until then, it's safer to leave it for a later version.
I was expecting us to start removing code in 1.2.x, not adding features.
I just updated my thoughts on this operator.
refs: https://github.com/apache/pekko/issues/1660
https://github.com/spring-projects/spring-ai/blob/3c539a37a38e009e11e2a8870a1708787831069b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java#L283-L288
Flux<ChatResponse> chatResponseFlux = response.switchMap(chatCompletionResponse -> {
It's used in spring-ai.