RxSwift
Fixes issues of AsyncStream wrapper of Observables and related tests.
I'm using `observable.values` to bridge `Observable`s into `AsyncStream`s in my app, but I found some issues in the related code and tests. This PR aims to address the following issues:
1. Tests wrapping `Task` return before firing the assertions.
    func testAwaitsValuesAndFinishes() {
        Task {
            let observable = Observable
                .from(1...10)
            var values = [Int]()
            do {
                for try await value in observable.values {
                    values.append(value)
                }
                XCTAssertEqual(values, Array(1...10))
            } catch {
                XCTFail("Expected to not emit failure")
            }
        }
    }
`Task.init` schedules a concurrent job and returns immediately. A test that wraps its body in a `Task` is therefore likely to return before the assertions inside the operation closure have had a chance to run.
This PR modifies these tests to use Asynchronous Tests as recommended by Apple.
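As a minimal sketch of the asynchronous-test style (assuming a test target that links XCTest and RxSwift, Xcode 13 or later, and a deployment target on which `observable.values` is available; the class name `AsyncValuesTests` is illustrative and not necessarily what this PR uses), the test above could be rewritten roughly as follows. Marking the method `async throws` lets XCTest await the body, so the assertion runs before the test is reported as finished:

```swift
import XCTest
import RxSwift

final class AsyncValuesTests: XCTestCase {
    // XCTest awaits an `async` test method, so no `Task` wrapper is needed.
    func testAwaitsValuesAndFinishes() async throws {
        let observable = Observable.from(1...10)
        var values = [Int]()

        // A thrown error fails the test automatically because of `throws`.
        for try await value in observable.values {
            values.append(value)
        }

        XCTAssertEqual(values, Array(1...10))
    }
}
```

With this shape, the explicit `do`/`catch` and `XCTFail` from the original test are no longer required, since an unexpected error propagates out of the test method.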
Thread Sanitizer on framework test targets also seems to be problematic: it reports false positives on Swift Concurrency code, especially when actors are used.
For example, the following obviously correct tests can trigger false positives with Thread Sanitizer turned on (target platform: macOS; Xcode: 13.2.1/13.3.1):
    import XCTest

    class TestFrameworkTests: XCTestCase {
        @MainActor func testExample() async {
            // Do nothing here...
        }
    }

    import XCTest

    actor MyActor {
        var values = [Int]()

        func append(_ value: Int) {
            values.append(value)
        }
    }

    class TestFrameworkTests: XCTestCase {
        func testExample() async {
            let myActor = MyActor()
            await withTaskGroup(of: Void.self) { group in
                group.addTask {
                    for i in 0..<10 {
                        await myActor.append(i)
                    }
                }
            }
            let value = await myActor.values
            XCTAssertEqual(value, [Int](0..<10))
        }
    }
This PR turns off the thread sanitizer to ensure the tests pass. It's a compromise, and I'm wondering whether it would be better to move the Swift Concurrency related code to separate targets with TSan off.
2. Subscriptions inside `AsyncStream`s block the cooperative queue.
`AsyncStream`s don't establish a `Task` environment by default. Long-running work can block the cooperative queue, so `continuation.yield(_:)` has no chance to switch to the awaiting thread. Async results are queued until the entire observable is disposed.
    import Foundation
    import RxSwift

    let observable = Observable<Int>.create { observer in
        let disposable = Disposables.create()
        for i in 0..<5 {
            sleep(1)
            observer.onNext(i)
        }
        observer.onCompleted()
        return disposable
    }
    .debug()

    let task = Task {
        do {
            for try await val in observable.values {
                print(val)
            }
        } catch {
            print(error)
        }
    }

    RunLoop.main.run()
Output

    2022-05-04 15:01:27.736: main.swift:17 (TestRxSwift) -> subscribed
    2022-05-04 15:01:28.746: main.swift:17 (TestRxSwift) -> Event next(0)
    2022-05-04 15:01:29.752: main.swift:17 (TestRxSwift) -> Event next(1)
    2022-05-04 15:01:30.758: main.swift:17 (TestRxSwift) -> Event next(2)
    2022-05-04 15:01:31.763: main.swift:17 (TestRxSwift) -> Event next(3)
    2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> Event next(4)
    2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> Event completed
    2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> isDisposed
    0
    1
    2
    3
    4
![Screen Shot 2022-05-04 at 15 06 42](https://user-images.githubusercontent.com/8158163/166642918-27f4b854-d25a-4e03-8c3f-080e6396ae63.png)
It's reasonable for `AsyncStream`s to start a new `Task` inside its continuation to prevent blocking the cooperative queue. This PR wraps subscriptions inside `AsyncStream`s with a `Task`.
Output

    2022-05-04 15:24:20.040: main.swift:17 (TestRxSwift) -> subscribed
    2022-05-04 15:24:21.050: main.swift:17 (TestRxSwift) -> Event next(0)
    0
    2022-05-04 15:24:22.053: main.swift:17 (TestRxSwift) -> Event next(1)
    1
    2022-05-04 15:24:23.058: main.swift:17 (TestRxSwift) -> Event next(2)
    2
    2022-05-04 15:24:24.061: main.swift:17 (TestRxSwift) -> Event next(3)
    3
    2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> Event next(4)
    2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> Event completed
    2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> isDisposed
    4
3. Reentrancy of `disposable.dispose()` and `continuation.onTermination` on subscription disposal.
AsyncStream calls `continuation.onTermination` both on completion and on cancellation. The current subscription in `Observable.values` might call `disposable.dispose()` and `continuation.onTermination` twice.
This PR removes `onDisposed` on the subscriptions inside `AsyncStream`s and adds extra checks for `AsyncStream.Continuation.Termination` to only dispose the disposable on cancellation. This ensures `disposable.dispose()` and `continuation.onTermination` are only invoked once.
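The termination check described above can be sketched without RxSwift. This is not the PR's actual diff: `CountingDisposable` is a hypothetical stand-in for an Rx `Disposable`, and `disposeCountAfterNormalFinish` is an illustrative helper. The idea is that the `onTermination` handler inspects the `Termination` value and disposes only in the `.cancelled` case:

```swift
import Foundation

/// Hypothetical stand-in for an Rx `Disposable`; it only counts `dispose()` calls.
final class CountingDisposable: @unchecked Sendable {
    private let lock = NSLock()
    private var count = 0

    var disposeCount: Int {
        lock.lock()
        defer { lock.unlock() }
        return count
    }

    func dispose() {
        lock.lock()
        count += 1
        lock.unlock()
    }
}

/// Finishes a stream normally and reports how often the stand-in was disposed.
func disposeCountAfterNormalFinish() -> Int {
    let disposable = CountingDisposable()
    var saved: AsyncStream<Int>.Continuation?

    let stream = AsyncStream<Int> { continuation in
        continuation.onTermination = { @Sendable termination in
            // Dispose only when the consumer cancels; `.finished` means the
            // producer ended the stream itself via `finish()`.
            if case .cancelled = termination {
                disposable.dispose()
            }
        }
        saved = continuation
    }

    // Keep the stream alive while finishing, so its lifetime cannot
    // influence which termination reason is reported.
    return withExtendedLifetime(stream) { () -> Int in
        saved?.yield(1)
        saved?.finish()
        return disposable.disposeCount
    }
}
```

In this sketch the function returns 0, because the only termination is `.finished`. Cancelling the consuming task instead would take the `.cancelled` branch, which is where the dispose happens.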
Thanks for the wonderful work on improving this! I wanted to go back to fix some things in this area but didn't get to it
I'll try reviewing it over the weekend
Can you find an official reference for that second point?
It seems to be against everything officially written about AsyncStream (also seems very unintuitive that it would be blocking by default)
From my understanding, `Task`-related APIs should be the only way to establish a concurrent environment in Swift Concurrency.
The documentation of AsyncStream seems somewhat obscure, but the doc of `AsyncStream.init(_:bufferingPolicy:_:)` gives the example of an AsyncStream that wraps a `Task.detached` call.
The AsyncStream source reveals that the `build` closure is actually called immediately.
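A small sketch using only the standard library (no RxSwift) can confirm that ordering; the function name `buildClosureOrder` is made up for illustration. Because the `build` parameter is a non-escaping closure that the initializer invokes before it returns, the entry logged inside the closure lands between the other two:

```swift
/// Records when the AsyncStream build closure runs relative to the initializer call.
func buildClosureOrder() -> [String] {
    var log = [String]()

    log.append("before init")
    _ = AsyncStream<Int> { continuation in
        // Runs synchronously, on the caller's thread, during init.
        log.append("inside build")
        continuation.finish()
    }
    log.append("after init")

    return log
}

// buildClosureOrder() returns ["before init", "inside build", "after init"]
```

This is why blocking work placed directly in the closure, such as the `sleep(1)` loop in the example from the PR description, holds up whichever context creates the stream.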
From my understanding `observable.values` works exactly like `publisher.values` in Combine - it blocks the current context unless you wrap it in a `Task`. Seems to me like expected behavior, unless I'm missing something?
Your understanding should be correct. But what's different is that, unlike `observer.onNext(_:)` in Rx or `subscriber.receive(_:)` in Combine, `continuation.yield(_:)` in Swift Concurrency does not deliver results synchronously but buffers them until the cooperative queue is idle to accept new values.
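The buffering can be observed with a plain `AsyncStream`, independent of RxSwift. In this minimal sketch (the function name `drainBufferedValues` is hypothetical, and the default `.unbounded` buffering policy is assumed), three values are yielded and the stream is finished before any consumer exists; the values are delivered only once something awaits them:

```swift
/// Yields three values before anyone iterates, then drains the stream.
func drainBufferedValues() async -> [Int] {
    var saved: AsyncStream<Int>.Continuation?
    // The build closure runs during init, so `saved` is set once init returns.
    let stream = AsyncStream<Int> { continuation in
        saved = continuation
    }
    guard let continuation = saved else { return [] }

    // No consumer exists yet: each `yield` enqueues the value and returns.
    for i in 0..<3 {
        continuation.yield(i)
    }
    continuation.finish()

    // The buffered values are handed over only when a consumer awaits them.
    var received = [Int]()
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Awaiting `drainBufferedValues()` yields `[0, 1, 2]`, which shows that producing and consuming are decoupled in a way that `onNext` and `receive(_:)` are not.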
@GetToSet sure - but I think that's acceptable behavior. It's what a user would expect if they compare it with the Combine implementation, IMO.
It should be acceptable behavior (identical to how Combine behaves).
Actually, there's no official implementation that bridges a Combine `Publisher` to `AsyncStream`, and there are some open-source implementations like this post or this, which we could take as a reference.
@GetToSet I played with this a bit.
When you do `combinePublisher.values` it would also block the current thread if you don't carefully wrap it inside a `Task`. I don't think it's something Combine (or RxSwift) should do. The developer should be careful enough to use the AsyncStream in the right context.
Thanks @freak4pc for reviewing. After some second thoughts, I agree with your point and I'll try to exclude the `Task` wrapper from this PR.
@freak4pc I've updated my PR and now it fixes the first and third points, leaving the blocking behavior (second point) untouched.
IRT No. 1 - I'm a bit worried about turning off TSan in tests here, but I've read a lot about issues related to concurrency, specifically. Perhaps it is worthwhile to split those tests into a new test target? Not sure it's worth the effort. WDYT?
And IRT No. 3, there was some work done on `Single` in #2427 - should this same fix be applied to it?
For the first point, I've tried this locally, but I still think it's more of an Xcode bug than a reason to split those tests into a new target. For the second, I hadn't seen that PR before, so let me review it first.
I've made the following changes to this PR:
- For async sequences that throw, manually throw a `CancellationError` on cancel (similar to `PrimitiveSequence.value`, #2427).
- Add tests to cover the changes.
This PR was made a year ago and I disabled the thread sanitizer as a workaround for XCTest false positives. It seems the tests run well with the latest SDKs (tested with Xcode 14.3.0, on both macOS and iOS). We could probably re-enable TSan for the test schemes. @freak4pc
Mind pulling up a separate PR for this quickly? Otherwise will take care later today