Add workaround for reconnection issue to documentation

Open lsafer-meemer opened this issue 11 months ago • 2 comments

Is your feature request related to a problem? Please describe.
The problem is that client reconnection is not addressed.

Describe the solution you'd like
This is just a workaround, but here we go. It primarily uses flows and might be a solution that could be integrated into the library.

@Mr3zee, I am very sorry if you see this as unfit for an issue, but I thought it might help someone.

Some utilities:

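// Shares the flow in `scope` and exposes it as a StateFlow whose initial value
// is the first emitted item. Note: runBlocking blocks the calling thread until
// that first item arrives, so calling this on the main/UI thread can freeze it.
fun <T> Flow<T>.firstShareStateInBlocking(scope: CoroutineScope): StateFlow<T> {
    val started: SharingStarted = SharingStarted.Eagerly
    return shareIn(scope, started, replay = 1).let {
        it.stateIn(scope, started, runBlocking { it.first() })
    }
}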

// this is helpful if using compose
fun <T : R, R> Flow<T>.collectAsStateIn(
    coroutineScope: CoroutineScope,
    initial: R,
): State<R> {
    val state = mutableStateOf(initial)
    onEach { state.value = it }.launchIn(coroutineScope)
    return state
}
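
If the call site is allowed to suspend, the blocking step in `firstShareStateInBlocking` can be avoided: kotlinx.coroutines ships a suspending `stateIn(scope)` overload that waits for the first value without blocking a thread. A minimal sketch of that variant; the name `firstShareStateIn` is hypothetical and not part of any library:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn

// Hypothetical helper: suspends (instead of blocking) until the upstream
// emits its first value, then keeps sharing the latest value in `scope`.
suspend fun <T> Flow<T>.firstShareStateIn(scope: CoroutineScope): StateFlow<T> =
    stateIn(scope)
```

The trade-off is that the caller must itself be a suspend function or run inside a coroutine, which is why the blocking version may still be more convenient for property initializers.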

Creating a client state that reacts to session changes and reconnection requests:

@OptIn(ExperimentalCoroutinesApi::class)
fun createRPCClientState(
    options: RpcClientOptions,
    httpClient: HttpClient,
    dataStore: DataStore<Preferences>,
    reConnectRequestFlow: Flow<Unit>,
    coroutineScope: CoroutineScope,
): StateFlow<RPCClient> {
    return dataStore.data
        // Read the stored session token and react only when it changes.
        .map { it[PREF_RPC_SESSION] }
        .distinctUntilChanged()
        // Re-emit the current session on every reconnection request.
        .flatMapLatest { newSession ->
            merge(
                flowOf(newSession),
                reConnectRequestFlow
                    .map { newSession },
            )
        }
        // Open a new client, authenticating with the session if there is one.
        .mapLatest { newSession ->
            httpClient.rpc(options.baseurl) {
                newSession?.let { bearerAuth(it) }
            }
        }
        // Close the previous client's WebSocket once its replacement arrives.
        .runningReduce { oldClient, newClient ->
            oldClient.webSocketSession.close()
            newClient
        }
        .firstShareStateInBlocking(coroutineScope)
}
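
To see the shape of this pipeline without a server, here is a small sketch that swaps the real RPC client for a stand-in. `FakeClient` and `fakeClientFlow` are hypothetical names used only for illustration, and plain `map` replaces `mapLatest` to keep the example simple:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.runningReduce

// Hypothetical stand-in for an RPC client; it only records whether it was closed.
class FakeClient(val session: String?) {
    var closed = false
        private set

    fun close() {
        closed = true
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun fakeClientFlow(
    sessions: Flow<String?>,
    reconnectRequests: Flow<Unit>,
): Flow<FakeClient> =
    sessions
        // Ignore repeated emissions of the same session.
        .distinctUntilChanged()
        // Emit the session once, then again for every reconnect request.
        .flatMapLatest { session ->
            merge(flowOf(session), reconnectRequests.map { session })
        }
        // "Connect": every emission produces a fresh client.
        .map { session -> FakeClient(session) }
        // Close the previous client whenever a new one replaces it.
        .runningReduce { old, new ->
            old.close()
            new
        }
```

One thing this makes visible: `runningReduce` only closes a client when its successor arrives, so the most recent client stays open until something else closes it (for example, cancelling the scope that owns the connection).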

And to use the client (in compose, but can be used elsewhere):

val remoteServiceState = rpcClientState
    .map { it.withService<MyRemoteService>() }
    .firstShareStateInBlocking(viewModelScope) // <-- or other scope

@OptIn(ExperimentalCoroutinesApi::class)
val someFlowValue by remoteServiceState
    .flatMapLatest { it.someServiceFlow }
    .onEach { newFlowValue ->  /**/  } // optional - to peek the value when changed
    .collectAsStateIn(viewModelScope, initial = null)  // <-- or other scope
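
The snippets above do not show where `reConnectRequestFlow` comes from. One possible source is a `MutableSharedFlow<Unit>` that UI code or an error handler can poke. A minimal sketch, assuming fire-and-forget semantics are acceptable; `ReconnectTrigger` and its members are hypothetical names:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow

// Hypothetical helper: a non-suspending trigger for reconnection requests.
class ReconnectTrigger {
    // One buffered slot with DROP_OLDEST, so tryEmit never fails or suspends.
    private val _requests = MutableSharedFlow<Unit>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    // Pass this as `reConnectRequestFlow`.
    val requests: Flow<Unit> = _requests.asSharedFlow()

    // Safe to call from any thread, e.g. from a "retry" button handler.
    fun requestReconnect(): Boolean = _requests.tryEmit(Unit)
}
```

Because the flow has no replay, a request made while nothing is collecting `requests` is simply dropped, which seems reasonable here: there is nothing to reconnect if the client pipeline is not running.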

lsafer-meemer avatar Nov 29 '24 16:11 lsafer-meemer

I am not that good at explaining, so I just pasted the code hoping it would explain itself. Sorry 😥

lsafer-meemer avatar Nov 29 '24 16:11 lsafer-meemer

Nice solution! I tried to make it so that if the flow closes, the RPCClient and Service also close.

The solution I came up with definitely isn't as nice 😅

I'm using Arrow for the Either; sorry for not adding more docs.

    private val serviceFlow = flow {
        val retry = RetryLogic()
        while (true) {
            coroutineScope {
                val rpcClient = catch {
                    httpClient.rpc(urlString = serviceUrl)
                }

                val result: Either<ForgeClientError, Service> = rpcClient.map {
                    it.withService<Service>()
                }.mapLeft {
                    it.toError()
                }
                emit(result)

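                result.onRight { service ->
                    retry.reset()
                    // Runs when this scope is cancelled (e.g. the collector goes away)
                    // or when rpcJob is cancelled below: tears down client and service.
                    val rpcJob = launchCleanUp {
                        rpcClient.getOrNull()?.cancel()
                        service.cancel()
                    }
                    // Suspend until the service's job ends, treated here as a lost connection.
                    service.coroutineContext.job.join()
                    emit(ForgeClientError.NoConnection.left())
                    rpcJob.cancelAndJoin()
                }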
            }
            retry.delay()
        }
    }

    val forgeClientFlow = authService.authStateFlow.map { authState: Either<Throwable, User?> ->
        authState.fold(
            ifLeft = { it.toError().left() },
            ifRight = { it?.right() ?: ForgeClientError.AuthMissing.left() }
        )
    }.distinctUntilChanged()
        .flatMapRight { _ ->
            serviceFlow
        }.shareIn(
            scope,
            started = SharingStarted.WhileSubscribed(10000, 0),
            replay = 1,
        )

fun CoroutineScope.launchCleanUp(body: () -> Unit) = launch {
    try {
        awaitCancellation()
    } finally {
        body()
    }
}
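
`RetryLogic` is not shown in the snippet above. A minimal sketch of what such a helper could look like, assuming capped exponential backoff; only `reset()` and `delay()` are taken from the calls above, while the constructor parameters, defaults and `nextDelayMillis()` are hypothetical:

```kotlin
import kotlin.math.pow

// Hypothetical RetryLogic: capped exponential backoff between attempts.
class RetryLogic(
    private val initialDelayMillis: Long = 500,
    private val maxDelayMillis: Long = 30_000,
    private val factor: Double = 2.0,
) {
    private var attempt = 0

    // Forget previous failures, e.g. after a successful connection.
    fun reset() {
        attempt = 0
    }

    // Returns the delay for the current attempt and advances the counter.
    fun nextDelayMillis(): Long {
        val raw = initialDelayMillis * factor.pow(attempt)
        attempt++
        return raw.coerceAtMost(maxDelayMillis.toDouble()).toLong()
    }

    // Suspends for the next backoff interval.
    suspend fun delay() {
        kotlinx.coroutines.delay(nextDelayMillis())
    }
}
```

With the defaults assumed here, the loop would wait 500 ms after the first failure, then 1 s, 2 s and so on up to 30 s, and go back to 500 ms after any successful connection.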

afTrolle avatar Feb 04 '25 22:02 afTrolle