kotlinx.coroutines
kotlinx.coroutines copied to clipboard
Parallel flow processing
It makes sense that Flow and all map operations are sequential by default, but it makes sense to allow some heavy operations to be performed in parallel on explicitly provided dispatcher.
I've implemented the feature based on existing flowOn implementation:
@FlowPreview
fun <T, R> Flow<T>.mapParallel(scope: CoroutineScope, bufferSize: Int = 16, transform: suspend (T) -> R) =
flow {
val currentContext = coroutineContext.minusKey(Job) // Jobs are ignored
coroutineScope {
val channel = produce(currentContext, capacity = bufferSize) {
collect { value ->
send(scope.async { transform(value) })
}
}
(channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
for (element in channel) {
emit(element.await())
}
//
// val producer = channel as Job
// if (producer.isCancelled) {
// producer.join()
// throw producer.getCancellationException()
// }
}
}
It seems to be working fine, but I can't use internal kotlinx,coroutines methods. I think that this use-case require a separate function in the library.
We do plan a set of separate functions for that and to close #172 with them. However, the key challenge here is to introduce a composable set of concurrency operators as opposed to specific operators like concurrentMap, which is hard to scale to other operators (concurrentFilter, concurrentFold, etc). Tentatively, it is going to look like this:
flow.concurrent(n) {
// inside here we can have a sequence of flow operations,
// each doing its own thing concurrently
// this: Flow<T> here similarly to `flowWith` operator
filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>>
Alternatively you can call concurrent without a lambda. This way you get Flow<Flow<T>> which you can map manually (so concurrent with lambda is just integrated with map):
// different way to write the same code as above
flow.concurrent(n).map {
// it: Flow<T> here
it.filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>>
Note, that the result is going to be Flow<Flow<R>>. Right now the only useful thing you can do on it is flattenMerge() to get Flow<R>. But what if you want to reduce/fold the result somehow -- that will require a sequence of invocations and will not parallelize properly. You can repeat reduction inside and outside, but that does not look pretty. A classical concurrent map-reduce pipeline would be written, for example, like this:
flow.concurrent(n) { // this: Flow<Input>
map(transform) // -> Flow<Mapped>
.reduce(reducer) // -> R
} // -> Flow<R>
.reduce(reducer) // we need to reduce again to get -> R
So that part of design for concurrent terminal operators is still TBD with map-reduce being the primary use-case.
The similar design problem we have with terminal concurrent collect:
flow.concurrent(n) {
collect { ... } // collect in `n` coroutines concurrently
}.collect {} // but must write an empty collect here to activate it all.
I see the idea, but parallel is not necessary concurrent. On the contrary, in my example, there is no concurrency. The key feature needed for parallel map is an ability to provide a place-holder ahead of time so when you actually call this place-holder, you trigger calculation, do not wait for result and move further. Now I understand that you use inner Flow in your examples as a placeholder. In my example it is eager Deferred.
Maybe we can modify your structure by replacing inner Flow by lazy Deferred? it would remove a lot of confusion and would allow more fine grained control of results. Then parallelMap will transform Flow<T> into Flow<Deferred<R>> and we can add extensions for that type or something like that. I will try to think about it later.
As for initial example, I work a lot with parallel data processing and map/reduce workflows. And basically the only operation that could be parallelized is map (safe for group-based reduce which could be represented as parallel map + non-parallel reduce). So it covers most of the use-cases.
@altavir Actually, it is the converse with the respect to the traditional distinction between concurrency and parallelism (it is quite a recent distinction, < 20 years old, but quite established by now).
Concurrent is not necessarily parallel. A mapParallel operator is concurrent, because different calls to transform happen concurrently with each other. Which means, for example, that if you update any kind of shared mutable state from inside of transform you'd run into a data race.
However, even if you take concurrent operator, it does not necessarily mean that it is going to actually run different calls to transform in parallel to each other. For example, if you run it on JS you'll get no parallelism whatsoever (since there is only a single thread), yet you can still have a lot of concurrency even on JS.
On JVM you can run your version mapParallel in a single-threaded dispatcher and while it provides concurrency between different calls to transform, there will no parallelism in a single-threaded dispatcher. You can get parallelism on JVM, though, by plugging a multi-threaded dispatcher into the context.
I won't argue with you about the terminology, especially remembering that significant fraction of my knowledge of it comes from your articles. Still, it does not change the fact that all we need for parallel processing is a way to start multiple computations ahead of time. I am trying to implement it via transforming a Flow into Flow of Deferred and batch starting those Deferred on collect. The code looks good from API point of view, but for some reason does not work right now. This function blocks on channel production stage and I am trying to figure out why.
The problem was in the conflict of lazy deferred with structured concurrency. I've fixed it by replacing deferred by my imitation. The working code is available here.
In theory, it should be possible to make concurrent collect without calling async in advance. One just have to create a number of placeholders (for example CompletableDeferred) and then fill and pop them to collector as soon as they come. I will think about it a little more. Maybe channel + switch or one channel for placeholders and the second for results.
Let's hold off a discussion on implementation strategy a bit. There are lots of ways to implement it. What are you trying to achieve in the end? You've started this thread with a proposal for mapParallel, but what is your end goal? What are you trying to build?
The particular case is a streaming mathematical processor with ability to parallel computations. In the version before the Flow I had Producers - generators, which could lazily create elements of some type (suspend receive) and Consumers working similar to actors (with suspend send). And I had Processors which implemented both interfaces. Also I had a mechanism to lock a Consumer on Producer forcing this specific Producer to send all its elements to Consumer (and no one else).
The typical application of such mechanism is the analysis of time series, when you have a very long block of numbers and you want to continuously run it through some chain of mathematical operations (like moving window, Fourier transform etc) and drop result somewhere without loading the whole block in the memory. Some of those operations could be parallelized. For example, one could split the incoming data into fixed chunks and then apply FFT to each chunk in parallel. Basically it is the same old map-reduce technology. And very similar to Java Stream processing (I would use Java Streams, but I want to implement it on MPP). Flow seems to be good replacement for my initial construct since next Flow always consumes previous one and the Flow does not start generating until final consumer is called. The internal sequential processing also probably allows to avoid context switching performance losses.
Actually, concurrentMap can be expressed via flatMapMerge without using any private API:
private fun <T, R> Flow<T>.concurrentMap(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, transform: suspend (T) -> R): Flow<R> {
return flatMapMerge(concurrencyLevel, bufferSize) { value ->
flow { emit(transform(value)) }
}.flowOn(dispatcher)
}
I think I'm debating the same need over on https://discuss.kotlinlang.org/t/must-go-fast-sequences-flows-bounded-channels/12494 - but with less knowledge of the various options than this thread, so I'm going to sit quietly and watch here instead. :)
Why I think my need is the same use case:
- Have a source. (In my case a video file that produces BufferedImages). The source is fast enough to not be the bottleneck if it is given a dedicated thread.
- Need to do a lot of math, and the math can be done in parallel. (distilling chunks of images down to a single frame)
- But too many in parallel "in flight" is bad (doesn't make sense to be grinding on more than I have cores, and I could run out of memory)
- The destination (encoding to a MP4) could slow things down, so again, don't have too many in-flight. (which brings up back-pressure)
- Even if we do some work in parallel, the order matters.
- Desire to have more steps as it gets more interesting.
And I have too many options and not enough knowledge if there is one that is "use this until we make the Kotlin canonical lib"
- Java Streams
- Kotlin Sequences
- Kotlin Flows
- Java Capacity-Bounded Channels (that block)
- Kotlin Capacity-Bounded Channels (that suspend)
- (any of the above) containing Deferred<BufferedImage>
@qwwdfsad Seems to be working. I looked into the source and it does the trick with two channels I was intending to do. I've changed signature a bit: fun <T, R> Flow<T>.map(concurrencyLevel: Int, dispatcher: CoroutineDispatcher = Dispatchers.Default, bufferSize: Int = concurrencyLevel, transform: suspend (T) -> R . Now it could be called instead of regular map replacing map by map(4).
I think it mostly solves my case. Few additional comments:
- I think that async/collect syntax in case we need multiple subsequent heavy operations.
- The
bufferSizepurpose is not clear from documentation. Back-pressure should start to work when the process is slow and limit the speed of incoming messages. In this case the speed is limited by suspension when incoming channel is full. This parameter does not seem to do anything. - I have not tested everything yet, but it would be nice to have subsequent operations to be run on the same thread to minimize context switching. My solution does not seem to work this way.
On Spring side, the use case is pretty common: you want to chain 2 remote webservices, the first will return a list of IDs and the second will allow to get the details associated with each IDs. For that you are typically using flatMap in Rx world and you will naturally convert it to map with Flow following the documentation which is IMO misleading:
Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows. Most likely, suspending operation in map operator will be sufficient and linear transformations are much easier to reason about.
So developers will naturally use a sequential map thinking it handles naturally parallel processing (concurrency behavior is not specified in map operator documentation) since it supports suspending functions, but it will process each element sequentially which makes little sense to me.
I agree with @gildor when he says map is sequential in Rx (or Sequence) because it doesn't support async operation there by definition. Since map in Flow support suspending functions, it is critical to be explicit about concurrency in its documentation, and to provide parallel map capabilities that does not force developers to use single Flow element + flatMap*.
I would personally be in favor to support parallel processing by default in map because the suspending function parameter is a clear signal that may happen to me, and I see much more use cases for this behavior than sequential processing with suspending functions where use cases are unclear to me. @elizarov said he is not, point taken. But please at least update the documentation of map, flatMap* and provide a parallel map operator.
I'm all for adding concurrent APIs, too, but in an explicit way. There's a lot to design, to think about, though. I've already presented one way to provide explicit concurrent API in this thread. The other would be something like this:
flow.concurrent().map { ... }.merge()
This is both explicit about concurrency and about what happens with all those concurrently mapped things -- they are all merged at the end (the other possible termination is to reduce all those concurrent flows, which nicely gives you the classical concurrent map/reduce paradigm in the same API).
Actually, I have one really radical idea and this thread is good place to share it. I'm thinking that we should deprecate flatMapMerge because it is implicitly concurrent. The whole concurrency parameter to flatMapMerge is a design smell. The replacement for flatMapMerge should be:
flow.concurrent().flatMap { ... }.merge()
// ^ optional concurrency limit goes here if needed
At the same time flatMapConcat should be deprecated, too. A simple flow.flatMap should perform a sequential flatMapConcat just like it happens on a plain sequential sequence.
I like the simplification that leads to more consistency with only map and flatMap while providing both sequential and concurrent behavior, but I am puzzled by the requirement to add 2 additional steps for operations as simple as map or flatmap. What types would concurrent(), map { } and flatMap { } return?
I like the idea (the radical one) in general. The flatMapMerge is really vague. It is hard to understand what it does and how its parameters affect the result. The variant with flow.concurrent().map { ... }.merge() is what I ended up doing, I just alternative map(dispatcher) and collect instead of concurrent().map{} and collect(concurrency) instead of merge().collect()` (I think I like your variant better).
The question is how the intermediate flow looks from the type system point of view. I do not like Flow<Flow>> construct because it is hard to understand what it does. I also do not like using flatMap inside concurrent block instead of map because flatMap assumes that we actually have flow of flows which needs to be flattened, not a list of single element flows which are used just because we want to represent lazy values by them.
@sdeleuze You need to convert your flow to "promises" at some point and then another point to actualize those promises. In between those points you have concurrency. So you always have "in concurrency" and "out of concurrency" gates. The question is how you will describe what happens between them and will you allow to open one gate, pass the flow somewhere and close it there.
W.r.t. to types, I think that concurrent() shall return a special ConcurrentFlows<T> type which is not a flow (because flows are sequential). Only a limited set of operators will be defined for it, ones that make sense for a concurrent setting (filter, map, flatMap among them). So, to get back to the sequential Flow you'll have to terminate it with merge, reduce of something like this.
It is somewhat conceptually similar to how collection.groupingBy { ... } in stdlib works (see here). It returns Grouping type that is just an intermediate "DSL type" that should be converted back to the collection type.
A "concurrent block" can avoid to define a dedicated type, ie:
fun <T, R> Flow<T>.concurrent(concurrency: Int = 16, block: CoroutineScope.(Flow<T>) -> R): Flow<R>
or something similar.
Moreover it provides a not concurrent sandbox to use a not pure functions.
I understand why you are tempted by this design, but it would be pretty heavy to use and I am quite puzzled by the need to use 3 operators to do something as simple as a concurrent map where I see much more use cases for operation with latency like network ones than for sequential one. The original design was beautiful because simple, that looks like less clean to me.
Why not keeping simple map and flatMap with concurrency: Int = 1 by default, using the sequential/optimized implementation when concurrency == 1 and make it clear in the documentation that this optional parameter should be customized if concurrency is needed?
There are several design rules to be taken into account before we make a decision:
- We want all kinds of concurrency to be as explicit as possible. Sequential code is way easier to understand and the principle of least surprise pushes us in the direction of consistently following the rule that "flow is sequential" .
- We want to provide the library that gives the "building blocks" that end-users will combine in various ways to solve the problems they face. This makes us prefer general-purpose solutions to specialized ones. Kotlin is not Java. We don't have to provide all those operators RxJava has. If somebody uses
concurrent().flatMap { ... }.merge()so often that they find it tedious to write, they can easily define their own extension with a short name that does it. - Most (99.9..%) of our users don't care about performance nor concurrency. The data sets they work with are simply too small to care. We don't want to encumber them with anything that is related to concurrency, so API surface dealing with concurrency should be separate from the general-purpose sequential APIs that most people need.
Having spelled this out, having an optionalconcurrency parameter with a default of 1 for some operators is an option we'll look at. Here is one particular use-case I have in mind. Assume that someone wants to do a concurrent map while preserving original order (I think @altavir had this use-case in mind). It is not what concurrent().map { ... }.merge() would be doing, since that operator chain reorders original flow just like flatMapMerge is doing now (merge emits a value as soon as it is computed). Writing this order-preserving map operator yourself is non-trivial and it might deserve an option parameter to map.
I think that
concurrent()shall return a specialConcurrentFlows<T>
My thought exactly. This way it could even inherit Flow and be collected in a sequential way, though, I am not sure it is a good idea, because it could mix behaviors.
It is not what
concurrent().map { ... }.merge()would be doing, since that operator chain reorders original flow just likeflatMapMergeis doing now
I do not think it will be a problem. flatMapMerge could change the order only in case you have more than one element in merged flows. For a simple map, you will always have exactly one element in each of your flow, or any other lazy placeholders, so the order will be preserved (I've implemented both concurrent/merge and parallelMap solutions and got correct order in both cases). You have to iterate a channel somewhere inside in order to reduce parallel processing back to sequential one, and this channel basically guarantees correct order. Of course I am thinking about my flowOn-based implementation. It is probably possible to do something different which will mix the order.
The order, of course is important.
@altavir indeed, the buffer size parameter is useless when you are emitting flow with a single element, I wanted to show the general idea.
You can tune it in the way you like, e.g. you can make the target flow grouped/chunked under the hood first (Flow<T> -> Flow<List<T>>). In that case, bufferSize parameter makes sense.
For example, you can do the following (not tested):
fun <T, R> Flow<T>.concurrent(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, concurrentBuilder: (Flow<T>) -> Flow<R>): Flow<R> {
val chunked: Flow<List<T>> = chunked(size = 42) // Implementation of chunked is left as an exercise for the reader
return chunked.flatMapMerge(concurrencyLevel, bufferSize) { value ->
val batch = flow {
chunked.collect { chunk -> chunk.forEach { emit(it) } }
}
concurrentBuilder(batch)
}.flowOn(dispatcher)
}
Now you can use it in the following way:
f.concurrent(Dispatchers.Default, 16, 64) { batch ->
batch
.filter { /* heavy lifting */ } // Will be executed concurrently
.map { /* heavy lifting */ } // Will be executed concurrently, no thread switching after filter
}.map { /* do regular stuff */ } // Will be executed sequentially for the whole flow
Buffer size controls backpressure of the consumer and concurrency controls the amount of in-flight batches. Note that even if we completely remove flatMapMerge, vague parameters will just move to the concurrent builder.
@sdeleuze
On Spring side, the use case is pretty common: you want to chain 2 remote webservices, the first will return a list of IDs and the second will allow to get the details associated with each IDs. For that you are typically using flatMap in Rx world and you will naturally convert it to map with Flow following the documentation which is IMO misleading: ...since it supports suspending functions, but it will process each element sequentially which makes little sense to me.
Could you please elaborate a bit?
For example, consider you have fun ids(): Flow<Long> and fun details(id: Long): Flow<Details>.
What we (may) expect a user to write:
ids().map { id -> details(id).single() } // (1) Or even details should be suspend fun in the first place
What one may want to write in RxJava
ids().flatMap { id -> details(id) } // (2)
// or
ids().concatMap { id -> details(id) } // (3)
(1) and (3) are basically the same. The (2) option depends on the concurrency parameter (or its default value) that controls how many actual details request can be executed concurrently.
If I understood you correctly, you want (2) to be the default option in this scenario. The question is why it should be the default? I see at least multiple downsides of that:
- It introduces an implicit concurrency that leads unprepared users to the land of various concurrency bugs
- Concurrent map may be faster for the one isolated query, but for the system under the load it will actually slow it down (similarly if you will append
parallel()to everyj.l.Streamoperation) - With a default
map(concurrency)we will end in a situation "where is my anotherOperator(concurrency)"
The follow-up question is how can we improve map documentation to be clear about its intentions? What exactly did you fin senseless? We definitely don't mind some help here as documenting the core ideas you were designing/discussing/developing for a while is a bit biased :)
@elizarov Please find bellow my feedback
We want all kinds of concurrency to be as explicit as possible. Sequential code is way easier to understand and the principle of least surprise pushes us in the direction of consistently following the rule that "flow is sequential" .
Kotlin coroutines are designed on that principle, and I agree that's a good idea to avoid too much surprise here, so my initial proposal to make map concurrent by default is likely not compatible with that design principle. What I try to express here is that the declarative nature of Flow API can provide an elegant way to achieve concurrency (explicitly), and it would be a missed opportunity to make it a 2nd class citizen.
We want to provide the library that gives the "building blocks" that end-users will combine in various ways to solve the problems they face. This makes us prefer general-purpose solutions to specialized ones. Kotlin is not Java. We don't have to provide all those operators RxJava has. If somebody uses concurrent().flatMap { ... }.merge() so often that they find it tedious to write, they can easily define their own extension with a short name that does it.
Be sure I try to provide feedback with Kotlin mindset in mind. Forcing developers to use 2 additional operators that will most of the time just surround a single one just to enable concurrency is IMO nor elegant or pragmatic. Of course anybody can define custom operators, but here I think we are talking about one of the most common use cases.
Most (99.9..%) of our users don't care about performance nor concurrency.
Based on my experience of working on server-side reactive stuff for a few years, I strongly disagree with that statement about concurrency when applied to Flow use cases. And I really think this has nothing to do with Spring or Java.
The data sets they work with are simply too small to care. We don't want to encumber them with anything that is related to concurrency, so API surface dealing with concurrency should be separate from the general-purpose sequential APIs that most people need.
The need for concurrency does not come from data set size, but from latency. I am going to try to elaborate more as asked by @qwwdfsad.
A company is developing micro-services (with Spring or Ktor) that needs to request external slow REST webservices that are sadly not cachable. Locally I only have a Flow<Long> of let's say 10 elements that I get via fun ids(): Flow<Long>. To get these details I have typically call suspend fun awaitDetails(id: Long): Details. Latency of the remote webservice is 1s (not uncommon in real life).
The first thing that developers will do that after reading that Flow awesome map operator supports suspending functions is ids().map { awaitDetails(it) }. Coroutines are cheap, flatMap* documentation push to use map, and map documentation says nothing about concurrency. It is not silly to think hat most people will think that this will execute in 1 sec, sadly it will take 10 exactly like if I would have used a blocking stack where creating new threads is avoided because costly.
After bad feedback saying that the application is slow, the developer updates its code and find that she/he has to wrap the suspending function artificially in a Flow which is pretty ugly:
ids().flatMapMerge { flow { awaitDetails(it) } }.
Let's see what are the options to improve this for coroutines 1.3.
ids().concurrent().map { awaitDetails(it) }.merge()is very verbose for such common needids().mapParallel { awaitDetails(it) }is better but is adding another operator that will not solve theflatMap*design issuesids().map(concurrency = 4) { awaitDetails(it) }seems so far the best option to me, it is elegant, pragmatic, leverages optional named parameter with default value, and provide a solution where we can be back on just usingmapandflatMapwhich would solve the big concern I raised about this very common use case and would provide a way to solve theflatMap*design issue. Also it pushes developer to be aware of the number of maximum concurrent requests which is IMO a good thing.
I wish to expose a little issue regarding the using of concurrency attribute for Flow's operators instead having a ConcurrentFlow or the above proposal concurrent sandbox.
Example:
I am implementing a REST web service usingi Spring, for each id I have to limit the concurrent invocation to 4.
So I have to write ids().map(concurrency = 4) { awaitDetails(it) }.
Now I have to filter id to check the user authentication, so I really tempted to write:
ids().filter(concurrency = 4) { checkId(it) } .map(concurrency = 4) { awaitDetails(it) }
but concurrenty now is 4 + 4, so I should write something like
ids().filter(concurrency = 2) { checkId(it) } .map(concurrency = 2) { awaitDetails(it) }
tunig these parameters is not easy, to add more operators to chain become tricky.
See other solution
ids().concurrent(concurrency = 4).filter { checkId(it) }.map { awaitDetails(it) }.merge()
This looks better, but we have to consider to implement all operators for the new type, not only a limited set.
So, if I have to insert an operator not available for ConcurrentFlow then I have to write:
ids()
.concurrent(concurrency = 4).filter { checkId(it) }.merge()
.someOperator()
.concurrent(concurrency = 4).map { awaitDetails(it) }.merge()
The same issue, unless ConcurrentFlow extends Flow.
Lastly the sandbox proposal:
ids()
.concurrent(concurrency = 4) { flow ->
flow
.filter { checkId(it) }
.someOperator()
.map { awaitDetails(it) }
}
Good point, I agree that tuning these parameters is not easy.
On one side, specifying this concurrency parameter (with proper documentation) makes that more explicit so users will have a better idea of how many concurrent request their could be.
On the other side, Coroutines are cheap, so maybe we don't really care of the exact value for most cases since that adds a potentially annoying cognitive load to choose the proper value. Maybe users should just choose the concurrency mode they want (using a dedicated operator or a boolean or enum optional parameter) ...
What about using concurrent() to set the concurrency mode for downstream operators in order to avoid this "2 level API", so in my example that would be ids().concurrent().map { awaitDetails(it) }. If we need to bring back the sequential mode, we could use concurrent(concurrency = 1). That seems to solve the following points:
- Explicit concurrency
- No issues due to choosing the right value for concurrency parameter
- No new type to introduce
- 2 operators instead of 3
Let us explore this a bit. Assume that we have concurrent() operator that switches into concurrency at some "default level" (without forcing people to think about the exact numbers, which is a good thing), then you do some additional operators. This design is composable and pretty concise, but raises a number of questions. Regardless of what the return type is and how it is named, the key question is whether the result of concurrent is a Flow or not. I'd argue it cannot be a flow, and here's why.
If the result of concurrent() is a Flow, it means you can apply any flow operator to it. For example, you can do concurrent().collect { ... }. But since its "concurrent" one would naturally expect that the code in the collect operator is called concurrently. There is actually a use-case for exactly that. Here it is.
Assume that I have a flow of records and I want to store all of them to DB:
flowOfRecords.collect { storeToDB(it) } // sequential
But that does storing to DB sequentially. What if storeToDB takes a second and I want that storing to be performed concurrently to minimize latency? Armed with concurrent() operator at hand I, as a naive user, expect that this code is the solution:
flowOfRecords.concurrent().collect { storeToDB(it) } // concurrent
That's a really clear way to express my intent to concurrently store those record to DB!
But now, if a result of concurrent() is a Flow I can write: fun tricky(): Flow<Record> = flowOrRecord.concurrent(), and some unsuspecting poor user of tricky() function does tricky().collect { doSomething(it) } expecting that doSomething is called sequentially, because it is a flow, but boom -- it goes concurrent onto him.
The design rationale that Java used when designing their
Stream.parallel()operators does not apply to the design ofFlow, because streams a hot and one-shot and you are not supposed to write user-defined functions that return a stream, but flows are cold and multi-shot and we do expect people to write their functions with aFlowreturn type.
So what are the possible solutions to this conundrum in the context of our exploration of concurrent() operator?
-
If the result of
concurrent()is aFlow, then it has to have a separate operator for "concurrent collect" and a regularcollecthas to stay sequential. You can easily extend this reasoning for all the other operators (map,filter, etc), but you'll and up with a distinctly named set of operators doing essentially the same thing, but with concurrency. I'd say it is a no-go as a design choice. -
This leads us to a natural conclusion that
concurrent()result cannot be aFlow. It has to be a separate type with its own set of operators. This way thoe concurrent operators can have the same name (collect,map,filter) and it would be Ok for them to behave differently.
But that means that we'll need operators to convert back to the sequential mode and it cannot be just concurrent(1), because that does not return a flow. It has to be a separate operator: sequential, merge, whatever.
That, in turn, means that if I want to do "concurrent map" and get a regular sequential flow as a result, then I have to write somewhat longer-looking: concurrent().map { ... }.merge(). I personally do not find that to be a show-stopper, though, but it might look shorter with "sandbox" proposal if we force it to merge the results back at the end into the single flow: concurrent { map { ... } }. The sandbox design proposal has its own open questions, thought.
So far I don't see a single satisfying proposal that meets all the use-cases, so we may end up implementing multiple different solutions for concurrency to meet different use-cases.
@altavir
The order, of course is important.
You are looking through a prism of your narrow use-case, since there are clearly many use-cases for which the order is not important (see, for example, an example of storing to DB above). First of all, let me note that none of implementations of concurrentMap presented so far preserve an order. Consider proposal by @qwwdfsad, for example:
private fun <T, R> Flow<T>.concurrentMap(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, transform: suspend (T) -> R): Flow<R> {
return flatMapMerge(concurrencyLevel, bufferSize) { value ->
flow { emit(transform(value)) }
}.flowOn(dispatcher)
}
What happens here is that it creates a lot of inner flows and they are started to be collected concurrently. Now, the first one to emit, gets merged first. You can play with an example I wrote here to convince yourself that it does indeed reorder elements. Try rewriting so that it does preserve an order, yet remains concurrent.
Now, order-presenting "concurrentMap" is indeed a use-case. One of many. This use-case is an intricate one, though, because it is hard to design in a composable way. I can envision on order-preserving "concurrentMap" implementation as a separate operator, but it does not easily lend itself for a decomposition into simpler operators to combine it with "concurrentFilter" and others. It is one of the puzzles we have in the design of concurrent operators for flows.
Just my two cents. In the very likely event that we get a concurrentMap, which will probably spawn coroutines to run the next bufferSize elements (with a cap of concurrencyLevel) and maintain the original iteration order.
I propose it should be called pipelinedMap, since it's sequential but pipelined.
concurrentMap { it } vs concurrent { map { it } } already reeks of confusion (At least to me).