
[Flow] combineTransformLatest()

Open hrach opened this issue 6 years ago • 7 comments

I'd like to propose introducing a new combineTransformLatest operator, which behaves like combineTransform but cancels the running transformer when a new value arrives.

Example: loading places onto a map. When the user moves the map, the previous fetch is cancelled; the loaded data also depends on a filter and other Flows.

I've implemented it like this:
class Symbol(val symbol: String) {
	override fun toString(): String = symbol

	@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
	inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
}

val NULL = Symbol("NULL")

inline fun <reified T, R> combineTransformLatest(
	vararg flows: Flow<T>,
	@BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
	return channelFlow {
		coroutineScope {
			val size = flows.size
			val channels = flows.map { flow ->
				this@coroutineScope.produce<Any> {
					flow.collect { value ->
						this@produce.send(value ?: NULL)
					}
				}
			}
			var job: Job? = null
			val latestValues = arrayOfNulls<Any?>(size)
			val isClosed = Array(size) { false }

			while (!isClosed.all { it }) {
				select<Unit> {
					for (i in 0 until size) {
						if (isClosed[i]) continue
						@Suppress("DEPRECATION")
						channels[i].onReceiveOrNull { receivedValue ->
							if (receivedValue == null) {
								isClosed[i] = true
							} else {
								latestValues[i] = receivedValue
								if (latestValues.all { it !== null }) {
									// cancel the previous transform and wait for it to finish
									job?.apply {
										cancel(CancellationException())
										join()
									}
									job = launch {
										val arguments = arrayOfNulls<T>(size)
										for (index in 0 until size) {
											arguments[index] = NULL.unbox(latestValues[index])
										}
										flow<R> {
											@Suppress("UNCHECKED_CAST")
											this@flow.transform(arguments as Array<T>)
										}.collect {
											this@channelFlow.send(it)
										}
									}
								}
							}
						}
					}
				}
			}
		}
	}
}

I'm not totally sure whether using channelFlow as the wrapper is correct; it's probably possible to do without it, but only with access to internals.

I'm OK with having just this signature: varargs plus a transformer, which, unlike combineLatest(), allows emitting nothing or emitting multiple times.

hrach avatar Aug 28 '19 11:08 hrach

Can you please explain a use case for such an operation? Example code that uses this operation would be extremely helpful, too.

elizarov avatar Sep 03 '19 14:09 elizarov

  • I don't have many examples of multiple emissions (with the exception of some Resource<T>, where you emit Resource.Loading and then Resource.Success, #1224).
  • I sometimes don't emit at all when some model condition is not met, so the transformer is needed here.
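
To make the two points above concrete, here is a minimal sketch built only from the existing combine and transformLatest operators. The Resource class, placesFor, and the delay standing in for a fetch are hypothetical names made up for illustration; they are not part of kotlinx.coroutines or of my real code.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper, named after the Resource<T> mentioned above.
sealed class Resource<out T> {
    object Loading : Resource<Nothing>()
    data class Success<T>(val data: T) : Resource<T>()
}

// Hypothetical function: combines two inputs, then either emits nothing
// or emits Loading followed by Success.
@OptIn(ExperimentalCoroutinesApi::class)
fun placesFor(query: Flow<String>, enabled: Flow<Boolean>): Flow<Resource<List<String>>> =
    combine(query, enabled) { q, on -> q to on }
        .transformLatest { (q, on) ->
            if (!on) return@transformLatest              // condition not met: emit nothing
            emit(Resource.Loading)                       // first emission
            delay(50)                                    // stands in for a real fetch
            emit(Resource.Success(listOf("$q-1", "$q-2"))) // second emission
        }

fun main() = runBlocking {
    // Two emissions when enabled, none when disabled.
    println(placesFor(flowOf("cafe"), flowOf(true)).toList().size)
    println(placesFor(flowOf("cafe"), flowOf(false)).toList().size)
}
```

A plain combine with a mapping lambda cannot express either case, since it produces exactly one value per combination.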

Generally, I'm connecting user event flows (moving the map) with some background sync jobs for user data (the user's favoriteIds, ...). Then I fetch the data needed for the view model and finally expose the flow as LiveData.

The example below needs combine's "map" and "latest"; my other use cases need "transform" and would benefit from "latest".

private val mapPlacesFlow: Flow<FeatureCollection> by lazy {
	combineTransformLatest<Any?, FeatureCollection>(
		mapTilesFlow,
		mapFilterFlow,
		mapActivePlaceIdFlow
			.filter { placeId ->
				// if place is already loaded & spread, then we do not run fetch again
				mapPlaces.value?.features()?.find { it.getStringProperty("placeId") == placeId } == null
			},
		session.trip.asFlow(),
		session.favoriteIds.asFlow(),
		session.userPlaceIds.asFlow()
	) { args ->
		@Suppress("UNCHECKED_CAST")
		loadPlaces(
			args[0] as MapState,
			args[1] as Filter,
			args[2] as String?,
			args[3] as Trip?,
			args[4] as Set<String>,
			args[5] as Session.UserPlaces
		)
	}
		.flowOn(Dispatchers.Default)
		.shareIn(viewModelScope)
}
val mapPlaces: LiveData<FeatureCollection> by lazy {
	mapPlacesFlow
		.asLiveData(viewModelScope)
}

private suspend fun FlowCollector<FeatureCollection>.loadPlaces(
	mapState: MapState,
	filter: Filter,
	selectedPlaceId: String?,
	trip: Trip?,
	favoriteIds: Set<String>,
	userPlaceIds: Session.UserPlaces
): Unit = coroutineScope {
	// ...
	emit(FeatureCollection...)
}

Hope this also helps with the design of other APIs.

hrach avatar Sep 03 '19 14:09 hrach

Here is another use case:

val manualReload = ConflatedBroadcastChannel<Unit>().also { it.offer(Unit) }
val accessTokenFlow = ...

val stateFlow = accessTokenFlow.combineLatestTransform(manualReload.asFlow()) { accessToken, _ ->
  if (accessToken == null) {
    emit(State.LoggedOut)
  } else {
    emit(State.Loading)
    observeUser(accessToken).doStuff()
  }
}

In this simple example the reload could be pulled into the inner flow, but the use case remains the same. Any time you want to transform into an infinite stream, this would be very useful.

ansman avatar Sep 04 '20 22:09 ansman

We also need this.

Our use case is an Instagram-like story that auto-progresses. There are two flows: one Flow<Data> and one flow that represents the current page index, backed by a MutableStateFlow<Int>. Based on the amount of text inside a story, the transform function emits a view state with a progress in 0F..1F. When the user presses the left side of the story, the current page index is decremented and the story goes back to the previous page. Therefore I need to transform only the latest value.

It would be nice if this was just part of the library.
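
Roughly, the behaviour described above could be sketched like this with the existing operators. ViewState, storyState, the step count, and the "10 ms per character" timing are all hypothetical choices for illustration, and pages are simplified to a list of strings instead of the real Data type.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical view state: which page is shown and how far it has progressed.
data class ViewState(val page: Int, val progress: Float)

@OptIn(ExperimentalCoroutinesApi::class)
fun storyState(pages: Flow<List<String>>, pageIndex: Flow<Int>): Flow<ViewState> =
    combine(pages, pageIndex) { p, i -> p to i }
        .transformLatest { (p, i) ->
            val steps = 4
            // Assumption: longer text means slower progress (10 ms per character per step).
            val stepMillis = p[i].length * 10L
            for (step in 0..steps) {
                emit(ViewState(i, step / steps.toFloat()))
                if (step < steps) delay(stepMillis)
            }
        }

fun main() = runBlocking {
    val pages = flowOf(listOf("short", "a much longer page of text"))
    // The story starts on page 1; the user taps "back" after 100 ms.
    val index = flow { emit(1); delay(100); emit(0) }
    storyState(pages, index).collect { println(it) }
}
```

When the index changes, the progress loop for the old page is cancelled during its delay and a new loop starts from 0 for the new page.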

PaulWoitaschek avatar Sep 26 '20 15:09 PaulWoitaschek

I managed to roll my own combineTransformLatest which is based on the existing operators :tada:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.transformLatest

inline fun <reified T, R> combineTransformLatest(
  vararg flows: Flow<T>,
  @BuilderInference noinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Flow<R> {
  return combine(*flows) { it }
    .transformLatest(transform)
}

fun <T1, T2, R> combineTransformLatest(
  flow: Flow<T1>,
  flow2: Flow<T2>,
  @BuilderInference transform: suspend FlowCollector<R>.(T1, T2) -> Unit
): Flow<R> {
  return combineTransformLatest(flow, flow2) { args ->
    @Suppress("UNCHECKED_CAST")
    transform(
      args[0] as T1,
      args[1] as T2
    )
  }
}

PaulWoitaschek avatar Sep 27 '20 07:09 PaulWoitaschek

In fact, combineTransform should be called combineTransformConcat, because any function that suspends inside the transform collector also suspends the combining, a bit like the flatMapConcat operator.
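
A small sketch of the difference, assuming a slow transform (the 400 ms delay) and a second input value arriving after 100 ms. The function names concatLike and latestOnly are made up for this comparison.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// combineTransform: a suspended transform is never cancelled, so the next
// combination waits until the previous transform has finished.
fun concatLike(numbers: Flow<Int>, letters: Flow<String>): Flow<String> =
    combineTransform(numbers, letters) { n, l ->
        delay(400)
        emit("$n$l")
    }

// combine + transformLatest: a new combination cancels the running transform.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(numbers: Flow<Int>, letters: Flow<String>): Flow<String> =
    combine(numbers, letters) { n, l -> n to l }
        .transformLatest { (n, l) ->
            delay(400)
            emit("$n$l")
        }

fun main() = runBlocking {
    val numbers = flow { emit(1); delay(100); emit(2) }
    val letters = flowOf("a")
    println(concatLike(numbers, letters).toList()) // [1a, 2a]
    println(latestOnly(numbers, letters).toList()) // [2a]
}
```

With combineTransform the stale "1a" is still produced, while the "latest" variant drops it because its transform is cancelled mid-delay.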

Shusek avatar Dec 24 '21 13:12 Shusek

combineTransformLatest is especially handy if you want to run a polling loop, because the running loop is cancelled as soon as the input changes:

fun pollingPageContentFlow(
    isAuthorizedFlow: Flow<Boolean>,
    urlFlow: Flow<String>,
) = combineTransformLatest(isAuthorizedFlow, urlFlow) { isAuthorized, url ->
    while (currentCoroutineContext().isActive) {
        emit(if(isAuthorized) "Page Content $url" else "Not authorized")
        delay(2000)
    }
}

Ic-ks avatar Aug 25 '23 09:08 Ic-ks