pekko icon indicating copy to clipboard operation
pekko copied to clipboard

Feature request: Add Flow#switchMap operator.

Open He-Pin opened this issue 1 year ago • 4 comments

Motivation: I find that Pekko stream doen't has the switchmap operator, it's called as this in both fs2 and flux, I think we need to take the same name and same semantic.

Usercase: A Stream of Post, eg twitter/X 's Post, and every Post has a Stream of Comments.

[Post 1] ---> [Comment 1] ... [Comment 2] ... [Comment 3] ...
.
.
.
[Post 2] ---> [Comment 1] ... [Comment 2] ... 
.
.
.
[Post 3] ---> [Comment 1] ... [Comment 2] ... [Comment 3] ... [Comment 4]..
.
.
.
\|/ User scrolling down.

Emits when: sub stream emits and downstream is available Backpressures when: sub stream backpressure. cancels when: down stream cancels completes when: upstream completes and the current substream completes.

In some implementation, the upstream is never backpressured, I think that's not safe. I think the upstream should be backpressured if there is no downstream demands.

Result: We have rich operators set.

He-Pin avatar Jan 17 '24 15:01 He-Pin

@pjfanning @mdedetrich what do you think about his one, I find it would be nice for live streaming.

He-Pin avatar Jan 19 '24 16:01 He-Pin

Implementation this shold not be that hard but seems no much interest, will defer to later version. it's a kind of stream of streams

He-Pin avatar Jan 24 '24 03:01 He-Pin

I think we have enough on our plate right now with all of the new features, we can always add this into 1.1.0-M2 (if a new M2 is needed) or in 1.2.x.

mdedetrich avatar Jan 24 '24 03:01 mdedetrich

I will back my hometown soon, if anyone submited a pr about this, then this can be added to 1.1.0-M2 I think, before that, it's safe for later version.

I was expecting we stating remove code in 1.2.x but not adding features.

I just update what I thought about this operator.

He-Pin avatar Jan 24 '24 03:01 He-Pin

refs: https://github.com/apache/pekko/issues/1660

He-Pin avatar Jan 03 '25 13:01 He-Pin

https://github.com/spring-projects/spring-ai/blob/3c539a37a38e009e11e2a8870a1708787831069b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java#L283-L288

Flux<ChatResponse> chatResponseFlux = response.switchMap(chatCompletionResponse -> {

It's been used in spring-ai

He-Pin avatar Jan 07 '25 07:01 He-Pin