retrofit
Async RxJava3 call adapter doesn't produce an error event when the request is canceled by a timeout
Test case/sample: https://gist.github.com/alapshin/a60540a1a128c0af042b7fe427b9de88
Description
- The OkHttp client is configured with an arbitrary call timeout
- Retrofit's RxJava3 call adapter is created via RxJava3CallAdapterFactory.create()
Result: When the call times out, the Rx stream doesn't receive any event and remains active.
Expected result: When the call times out, the Rx stream receives an error event and terminates.
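Below is a minimal Kotlin sketch of the configuration in the description. It assumes OkHttp 4.x, Retrofit 2.9 with the adapter-rxjava3 artifact, and RxJava 3 on the classpath. SlowService, its "slow" endpoint, the base URL and the one-second timeout are placeholders and are not taken from the linked gist. The useScheduler flag switches to the createWithScheduler(Schedulers.io()) variant described under Additional info.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit
import okhttp3.OkHttpClient
import okhttp3.ResponseBody
import retrofit2.Retrofit
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory
import retrofit2.http.GET

// Hypothetical service: "slow" stands for any endpoint that takes longer than the call timeout.
interface SlowService {
    @GET("slow")
    fun slow(): Observable<ResponseBody>
}

fun createService(useScheduler: Boolean): SlowService {
    val client = OkHttpClient.Builder()
        .callTimeout(1, TimeUnit.SECONDS) // arbitrary call timeout (placeholder value)
        .build()

    val adapterFactory = if (useScheduler) {
        // Variant from "Additional info": reported to deliver the timeout to onError.
        RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io())
    } else {
        // Default variant: reported to emit no event when the call times out.
        RxJava3CallAdapterFactory.create()
    }

    return Retrofit.Builder()
        .baseUrl("https://example.com/") // placeholder base URL
        .client(client)
        .addCallAdapterFactory(adapterFactory)
        .build()
        .create(SlowService::class.java)
}
```

Subscribing to createService(useScheduler = false).slow() against an endpoint slower than the timeout should reproduce the hang. With useScheduler = true, the InterruptedIOException: timeout shown below should arrive in onError instead. As far as we can tell from Retrofit 2.9.0's release notes, RxJava3CallAdapterFactory.create() is asynchronous by default, and createSynchronous() restores synchronous execution. By contrast, RxJava2CallAdapterFactory.create() is synchronous.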
Additional info
If the RxJava3 call adapter is instead created via RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()), cancellation of the call by the timeout produces an exception that is propagated to the stream:
java.io.InterruptedIOException: timeout
at okhttp3.internal.connection.RealCall.timeoutExit(RealCall.kt:398)
at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:360)
at okhttp3.internal.connection.RealCall.noMoreExchanges$okhttp(RealCall.kt:325)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:209)
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
at retrofit2.adapter.rxjava3.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
at retrofit2.adapter.rxjava3.BodyObservable.subscribeActual(BodyObservable.java:35)
at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
at io.reactivex.rxjava3.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.rxjava3.core.Scheduler$DisposeTask.run(Scheduler.java:644)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Canceled
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:72)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
... 16 more
There is also a somewhat related discussion at https://github.com/square/retrofit/issues/3453. The comment https://github.com/square/retrofit/issues/3453#issuecomment-682034904 points to a possible cause of this difference in behavior.
It seems the issue happens when a thread switch occurs while onFailure is being executed, between the disposed check and the call to observer.onError. To demonstrate the case deterministically, I have created a sample snippet:
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val countDownLatch = CountDownLatch(1)
    val disposable = SampleObservable()
        .subscribeOn(Schedulers.io())
        .subscribe(
            {
                println("Emitted")
                countDownLatch.countDown()
            },
            {
                println("Errored")
                countDownLatch.countDown()
            }
        )

    // Dispose after 3 s: after the disposed check, but before the error is delivered at 5 s.
    Single.just(Unit)
        .delay(3000, TimeUnit.MILLISECONDS, Schedulers.io())
        .subscribe({ disposable.dispose() }, {})

    // Once disposed, neither callback above runs, so wait with a timeout instead of forever.
    countDownLatch.await(10, TimeUnit.SECONDS)
}

class SampleObservable : Observable<Unit>() {
    override fun subscribeActual(observer: Observer<in Unit>?) {
        observer?.onSubscribe(SampleCallback(observer))
    }

    private inner class SampleCallback(private val observer: Observer<in Unit>) : Disposable {
        @Volatile
        private var isDisposed: Boolean = false

        init {
            sendError(IllegalStateException("Sample Exception"))
        }

        override fun dispose() {
            isDisposed = true
        }

        override fun isDisposed(): Boolean = isDisposed

        private fun sendError(exception: Exception) {
            // Check: isDisposed is read now, on the subscribing thread...
            if (!isDisposed) {
                val executorService = Executors.newSingleThreadScheduledExecutor()
                // ...act: the error is delivered 5 s later on another thread, after dispose().
                executorService.schedule(
                    {
                        try {
                            observer.onError(exception)
                        } catch (inner: Throwable) {
                            Exceptions.throwIfFatal(inner)
                            RxJavaPlugins.onError(CompositeException(exception, inner))
                        }
                    },
                    5,
                    TimeUnit.SECONDS
                )
                // Delayed tasks still run after shutdown(); this only lets the JVM exit afterwards.
                executorService.shutdown()
            }
        }
    }
}
The fix would be to synchronize reads and writes of isDisposed with the call to observer.
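The following stdlib-only Kotlin sketch shows the check-then-act window demonstrated by the snippet, along with the synchronization suggested above. RecordingObserver, RacySender, GuardedSender and runScenario are hypothetical names, and plain strings stand in for exceptions. The "other thread" is a single-thread executor that runs the delivery only after dispose(), which makes the ordering deterministic. This is a sketch of the idea, not a patch for Retrofit.

```kotlin
import java.util.concurrent.Executors

// Hypothetical stand-in for a downstream Observer that records what reaches it.
class RecordingObserver {
    @Volatile
    var disposed = false
    val received = mutableListOf<String>()
    // Errors that arrive after disposal; RxJava would forward these to RxJavaPlugins.onError.
    val deliveredAfterDispose = mutableListOf<String>()

    fun onError(message: String) {
        if (disposed) {
            deliveredAfterDispose += message
        } else {
            received += message
        }
    }
}

// Mirrors the sample: the flag is checked now, but the error is delivered later elsewhere.
class RacySender(private val observer: RecordingObserver) {
    @Volatile
    private var disposed = false

    fun dispose() {
        disposed = true
        observer.disposed = true
    }

    fun sendError(message: String, schedule: (Runnable) -> Unit) {
        if (!disposed) {
            schedule(Runnable { observer.onError(message) })
        }
    }
}

// Suggested fix: dispose() and "check, then deliver" share one lock, and the check
// runs on the delivering thread immediately before the callback.
class GuardedSender(private val observer: RecordingObserver) {
    private val lock = Any()
    private var disposed = false // guarded by lock

    fun dispose() {
        synchronized(lock) {
            disposed = true
            observer.disposed = true
        }
    }

    fun sendError(message: String, schedule: (Runnable) -> Unit) {
        schedule(Runnable { deliverIfNotDisposed(message) })
    }

    private fun deliverIfNotDisposed(message: String) {
        synchronized(lock) {
            if (disposed) return
            observer.onError(message)
        }
    }
}

// Same ordering as the sample: schedule the error, dispose, then deliver on another thread.
fun runScenario(guarded: Boolean): RecordingObserver {
    val observer = RecordingObserver()
    val pending = mutableListOf<Runnable>()
    val schedule: (Runnable) -> Unit = { pending += it }

    if (guarded) {
        val sender = GuardedSender(observer)
        sender.sendError("Sample Exception", schedule)
        sender.dispose()
    } else {
        val sender = RacySender(observer)
        sender.sendError("Sample Exception", schedule)
        sender.dispose()
    }

    val executor = Executors.newSingleThreadExecutor()
    try {
        pending.forEach { executor.submit(it).get() } // runs after dispose() already happened
    } finally {
        executor.shutdown()
    }
    return observer
}
```

runScenario(guarded = false) ends with the error in deliveredAfterDispose, while runScenario(guarded = true) drops it.

Holding a lock while invoking a callback is a trade-off, because it can deadlock if the callback waits on another thread that needs the same lock. The lock also only closes the window on the source side: in RxJava a downstream observer may already treat itself as disposed. For that reason, RxJava's documentation suggests a global RxJavaPlugins.setErrorHandler for errors that arrive late.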
This is the cause of one of the top trending crashes in our product. Is a fix already in the pipeline? Please advise on any alternatives if possible. Thanks.
Is there any negative side effect of just using RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()) instead of the default create()? It seems important that timeouts are sent downstream instead of hanging.
EDIT: This is also a significant regression from the RxJava2 call adapter, isn't it?
Apparently this is happening due to this line: https://github.com/square/retrofit/blob/b4eed3f82a969d0eb3c171449df04dc2b2824bbc/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java#L85
In OkHttp, when the timeout fires, it cancels the call. As a result, the following check in CallEnqueueObservable.java
if (call.isCanceled()) return;
returns early, and the exception never reaches the observer.
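The following stdlib-only Kotlin sketch models the early return described above. FakeCall, CancelCheckingCallback, DisposeCheckingCallback, simulateTimeout and simulateDispose are hypothetical names. The sketch assumes, as stated above, that the timeout cancels the call before onFailure runs. The "dispose flag" variant is an assumption for illustration, not Retrofit's code. It roughly resembles the synchronous path (CallExecuteObservable in the stack trace), which appears to check the disposable's own flag rather than call.isCanceled().

```kotlin
import java.io.IOException
import java.io.InterruptedIOException

// Hypothetical stand-in for a Call: only the cancellation flag matters here.
class FakeCall {
    @Volatile
    var isCanceled = false
        private set

    fun cancel() {
        isCanceled = true
    }
}

// Models `if (call.isCanceled()) return;` in onFailure. isCanceled is true after the
// subscriber disposes, but also after the call timeout fires.
class CancelCheckingCallback(private val onError: (Throwable) -> Unit) {
    fun onFailure(call: FakeCall, t: Throwable) {
        if (call.isCanceled) return
        onError(t)
    }
}

// Alternative (assumption): track disposal separately and suppress the error only
// when the subscriber itself disposed.
class DisposeCheckingCallback(private val call: FakeCall, private val onError: (Throwable) -> Unit) {
    @Volatile
    private var disposed = false

    fun dispose() {
        disposed = true
        call.cancel()
    }

    fun onFailure(t: Throwable) {
        if (disposed) return
        onError(t)
    }
}

// The call timeout cancels the call, then the failure callback runs with the timeout error.
fun simulateTimeout(useDisposedFlag: Boolean): List<Throwable> {
    val errors = mutableListOf<Throwable>()
    val call = FakeCall()
    val timeout = InterruptedIOException("timeout")
    call.cancel() // canceled by the timeout, not by the subscriber
    if (useDisposedFlag) {
        val callback = DisposeCheckingCallback(call) { errors += it }
        callback.onFailure(timeout)
    } else {
        val callback = CancelCheckingCallback { errors += it }
        callback.onFailure(call, timeout)
    }
    return errors
}

// The subscriber disposes; the resulting "Canceled" failure should stay silent.
fun simulateDispose(): List<Throwable> {
    val errors = mutableListOf<Throwable>()
    val callback = DisposeCheckingCallback(FakeCall()) { errors += it }
    callback.dispose()
    callback.onFailure(IOException("Canceled"))
    return errors
}
```

With the isCanceled() check, the timeout error is swallowed, which matches the hang in the report. With a separate disposed flag, the timeout is delivered, while a genuine dispose still stays silent.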