kotlinx-rpc
kotlinx-rpc copied to clipboard
Add workaround for reconnection issue to documentation
Is your feature request related to a problem? Please describe. It is a problem that client reconnection is not addressed
Describe the solution you'd like This is just a workaround, but here we go. It primarly uses flows and might be a solution to be integrated with the library.
@Mr3zee if you see this not fit as an issue I am very sorry. but I thought it might help someone
Some utilities:
fun <T> Flow<T>.firstShareStateInBlocking(scope: CoroutineScope): StateFlow<T> {
val started: SharingStarted = SharingStarted.Eagerly
return shareIn(scope, started, replay = 1).let {
it.stateIn(scope, started, runBlocking { it.first() })
}
}
// this is helpful if using compose
fun <T : R, R> Flow<T>.collectAsStateIn(
coroutineScope: CoroutineScope,
initial: R,
): State<R> {
val state = mutableStateOf(initial)
onEach { state.value = it }.launchIn(coroutineScope)
return state
}
Creating a client state that reacts to session changes and reconnection requests
@OptIn(ExperimentalCoroutinesApi::class)
fun createRPCClientState(
options: RpcClientOptions,
httpClient: HttpClient,
dataStore: DataStore<Preferences>,
reConnectRequestFlow: Flow<Unit>,
coroutineScope: CoroutineScope,
): StateFlow<RPCClient> {
return dataStore.data
.map { it[PREF_RPC_SESSION] }
.distinctUntilChanged()
.flatMapLatest { newSession ->
merge(
flowOf(newSession),
reConnectRequestFlow
.map { newSession },
)
}
.mapLatest { newSession ->
httpClient.rpc(options.baseurl) {
newSession?.let { bearerAuth(it) }
}
}
.runningReduce { oldClient, newClient ->
oldClient.webSocketSession.close()
newClient
}
.firstShareStateInBlocking(coroutineScope)
}
And to use the client (in compose, but can be used elsewhere):
val remoteServiceState = rpcClientState
.map { it.withService<MyRemoteService>() }
.firstShareStateInBlocking(viewModelScope) // <-- or other scope
@OptIn(ExperimentalCoroutinesApi::class)
val someFlowValue by remoteServiceState
.flatMapLatest { it.someServiceFlow }
.onEach { newFlowValue -> /**/ } // optional - to peek the value when changed
.collectAsStateIn(viewModelScope, initial = null) // <-- or other scope
I am not that good in explaining, thus I just pasted the code hoping it could explain itself. sorry 😥
Nice Solution! I tried to have so that if the flow closes then the RPCClient & Service also closes.
The solution I came up with definitely isn't as nice 😅
I'm using Arrow for the Either, sorry for not adding more docs.
private val serviceFlow = flow {
val retry = RetryLogic()
while (true) {
coroutineScope {
val rpcClient = catch {
httpClient.rpc(urlString = serviceUrl)
}
val result: Either<ForgeClientError, Service> = rpcClient.map {
it.withService<Service>()
}.mapLeft {
it.toError()
}
emit(result)
result.onRight { service ->
retry.reset()
val rpcJob = launchCleanUp {
rpcClient.getOrNull()?.cancel()
service.cancel()
}
service.coroutineContext.job.join()
emit(ForgeClientError.NoConnection.left())
rpcJob.cancelAndJoin()
}
}
retry.delay()
}
}
val forgeClientFlow = authService.authStateFlow.map { authState: Either<Throwable, User?> ->
authState.fold(
ifLeft = { it.toError().left() },
ifRight = { it?.right() ?: ForgeClientError.AuthMissing.left() }
)
}.distinctUntilChanged()
.flatMapRight { _ ->
serviceFlow
}.shareIn(
scope,
started = SharingStarted.WhileSubscribed(10000, 0),
replay = 1,
)
fun CoroutineScope.launchCleanUp(body: () -> Unit) = launch {
try {
awaitCancellation()
} finally {
body()
}
}