RxSwift
RxSwift copied to clipboard
Cooperative task cancellation bug
Short description of the issue:
Today I started a discussion in the Swift forums regarding cooperative task cancellation and AsyncSequence
types. Digging through the implementations of types like Async[Throwing]Stream
I discovered that those were not cooperative at all and will straight cut off the buffer from the upstream during cancellation. With the current non-cooperative behavior there's no issue in RxSwift's values
implementation. However if the Async[Throwing]Stream
were cooperative then the implementation would get stuck and never terminate the stream via onDispose
.
Here's a custom wrapper AsyncThrowingCooperativeStream
type, which simply avoids the direct cancellation forwarding to the inner AsyncThrowingStream._Storage.cancel()
method, which would cause the termination of the buffer by calling AsyncThrowingStream._Storage.finish()
. It gives the custom logic such as a wrapped Task
or an Observable
to properly receive the dispose message and forward its decision through the onDispose
back to the captured continuation.
public struct AsyncThrowingCooperativeStream<Element, Failure> where Failure: Error {
let continuation: AsyncThrowingStream<Element, Failure>.Continuation
let stream: AsyncThrowingStream<Element, Failure>
public init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit:
AsyncThrowingStream<Element, Failure>.Continuation.BufferingPolicy = .unbounded,
_ build: (AsyncThrowingStream<Element, Failure>.Continuation) -> Void
) where Failure == any Error {
var streamContinuation: AsyncThrowingStream<Element, Failure>.Continuation! = nil
let stream = AsyncThrowingStream<Element, Failure>(
elementType,
bufferingPolicy: limit
) { continuation in
build(continuation)
streamContinuation = continuation
}
self.continuation = streamContinuation
self.stream = stream
}
}
extension AsyncThrowingCooperativeStream: AsyncSequence {
public struct Iterator: AsyncIteratorProtocol {
let continuation: AsyncThrowingStream<Element, Failure>.Continuation
// NOTE: This is `@unchecked Sendable` because `AsyncThrowingStream._Context` just captures
// `AsyncThrowingStream._Storage` which itself is `@unchecked Sendable`, so we're safe here.
struct _Box: @unchecked Sendable {
let iterator: AsyncThrowingStream<Element, Failure>.Iterator
}
let box: _Box
public mutating func next() async throws -> Element? {
let box = self.box
return try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
// Detach the `next` method from the current parent task.
Task.detached {
var mutableIterator = box.iterator
do {
let element = try await mutableIterator.next()
continuation.resume(returning: element)
} catch {
continuation.resume(throwing: error)
}
}
}
} onCancel: { [continuation] in
// Forward the cancellation manually to the termination handler, then remove it so that
// during a subsequent `next` call we do not signal another cancellation.
let handler = continuation.onTermination
continuation.onTermination = nil
handler?(.cancelled)
}
}
}
public func makeAsyncIterator() -> Iterator {
Iterator(continuation: continuation, box: Iterator._Box(iterator: stream.makeAsyncIterator()))
}
}
extension AsyncThrowingCooperativeStream: Sendable where Element: Sendable {}
Here's the wrap similar to the implementation in the RxSwift module:
public extension ObservableConvertibleType {
// This is a copy from RxSwift's repo which mimics `Publisher.values`
var values: AsyncThrowingCooperativeStream<Element, Error> {
AsyncThrowingCooperativeStream<Element, Error> { continuation in
let disposable = asObservable().subscribe(
onNext: { value in
continuation.yield(value)
},
onError: { error in
continuation.finish(throwing: error)
},
onCompleted: {
continuation.finish(throwing: nil)
},
onDisposed: {
continuation.finish(throwing: CancellationError()) // THE FIX ✅
}
)
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}
As you can see, onDispose
finishes the cooperative stream continuation by throwing a CancellationError
. An infinite running Observable
which gets deposed without an error will essentially call onDispose
, but it will never reach correctly the continuation, at least not a cooperative one.
Note: continuation.finish
will change the internal state in such way that a subsequent call to it would result into a no-op. That said, if an Observable
completes via onCompleted
first, a call to onDisposed
will not actually throw CancellationError
as terminal
would already equal finished
and onTermination
closure would already be nil
-ed.
Expected outcome:
Right now, it's not a bug, but if AsyncThrowingStream
ever gets changed to be cooperative on task cancellation, the values
implementation will get stuck.
What actually happens:
Nothing right now. The AsyncThrowingStream
buffer gets immediately terminated on cancellation and anything coming the Observable
subscription will be completely ignored. You can use the above AsyncThrowingCooperativeStream
to actually reproduce the potential future issue.
RxSwift/RxCocoa/RxBlocking/RxTest version/commit
Compared with RxSwift 6.5.0
Platform/Environment
Any.
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
- [x] easy, 100% repro
Xcode version:
Version 14.1 (14B47b)
Installation method:
- [x] Swift Package
I have multiple versions of Xcode installed:
- [x] no
Level of RxSwift knowledge:
- [x] I have a significant code base