RxSwift icon indicating copy to clipboard operation
RxSwift copied to clipboard

Fixes issues of AsyncStream wrapper of Observables and related tests.

Open GetToSet opened this issue 2 years ago • 13 comments

I'm using observable.values to bridge Observables into AsyncStreams in my app, but I found some issues in the related code and tests. This PR aims to adjust the following issues:

1. Tests wrapping Task return before firing the assertions.

func testAwaitsValuesAndFinishes() {
    Task {
		let observable = Observable
        	.from(1...10)
        var values = [Int]()
        do {
            for try await value in observable.values {
                values.append(value)
            }
            XCTAssertEqual(values, Array(1...10))
        } catch {
            XCTFail("Expected to not emit failure")
        }
    }
}

Task.init schedules a concurrent job and returns immediately. Tests wrapping Task are likely to have no chance to fire the assertions inside the operation closures before the functions return.

This PR modifies these tests to use Asynchronous Tests as recommended by Apple.

Thread sanitizer on framework test targets seems to be problematic and reports false positives on Swift Concurrency code, especially when actors being used.

For example, the following obviously correct tests can trigger false positives with thread sanitizer turned on: (Target platform: macOS, Xcode: 13.2.1/13.3.1)

import XCTest

class TestFrameworkTests: XCTestCase {

  @MainActor func testEample() async {
    // Do nothing here...
  }

}
import XCTest

actor MyActor {
  var values = [Int]()
  func append(_ value: Int) {
    values.append(value)
  }
}

class TestFrameworkTests: XCTestCase {

  func testEample() async {
    let myActor = MyActor()
    await withTaskGroup(of: Void.self) { group in
      group.addTask {
        for i in 0..<10 {
          await myActor.append(i)
        }
      }
    }
    let value = await myActor.values
    XCTAssertEqual(value, [Int](0..<10))
  }

}

This PR turns off the thread sanitizer to ensure tests pass. It's a compromised solution and I'm wondering if it's better to move Swift Concurrency related code to separate targets with the TSan off.

2. Subscriptions inside AsyncStreams block the cooperative queue.

AsyncStreams doesn't by default establish a Task environment. Long-running work can block the cooperative queue and
continuation.yield(_:) has no chance to switch to awaiting thread. Async results have to be queued until the entire observable is disposed.

import Foundation
import RxSwift

let observable = Observable<Int>.create { observer in
  let disposable = Disposables.create()
  for i in 0..<5 {
    sleep(1)
    observer.onNext(i)
  }
  observer.onCompleted()
  return disposable
}
.debug()

let task = Task {
  do {
    for try await val in observable.values {
      print(val)
    }
  } catch {
    print(error)
  }
}

RunLoop.main.run()
Output
2022-05-04 15:01:27.736: main.swift:17 (TestRxSwift) -> subscribed
2022-05-04 15:01:28.746: main.swift:17 (TestRxSwift) -> Event next(0)
2022-05-04 15:01:29.752: main.swift:17 (TestRxSwift) -> Event next(1)
2022-05-04 15:01:30.758: main.swift:17 (TestRxSwift) -> Event next(2)
2022-05-04 15:01:31.763: main.swift:17 (TestRxSwift) -> Event next(3)
2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> Event next(4)
2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> Event completed
2022-05-04 15:01:32.767: main.swift:17 (TestRxSwift) -> isDisposed
0
1
2
3
4
Screen Shot 2022-05-04 at 15 06 42

It's reasonable for AsyncStreams to start a new Task inside its continuation to prevent blocking the cooperative queue. This PR wraps subscriptions inside AsyncStreams with a Task.

Output
2022-05-04 15:24:20.040: main.swift:17 (TestRxSwift) -> subscribed
2022-05-04 15:24:21.050: main.swift:17 (TestRxSwift) -> Event next(0)
0
2022-05-04 15:24:22.053: main.swift:17 (TestRxSwift) -> Event next(1)
1
2022-05-04 15:24:23.058: main.swift:17 (TestRxSwift) -> Event next(2)
2
2022-05-04 15:24:24.061: main.swift:17 (TestRxSwift) -> Event next(3)
3
2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> Event next(4)
2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> Event completed
2022-05-04 15:24:25.067: main.swift:17 (TestRxSwift) -> isDisposed
4

3. Reentrancy of disposable.dispose() and continuation.onTermination on subscription disposal.

AsyncStream calls continuation.onTermination both on completion and cancellation. The current subscription in Observable.values might call disposable.dispose() and continuation.onTermination twice.

This PR removes onDisposed on the subscriptions inside AsyncStreams and add extra checks for AsyncStream.Continuation.Termination to only dispose the disposable on cancellation. This ensures disposable.dispose() and continuation.onTermination are only invoked once.

GetToSet avatar May 04 '22 09:05 GetToSet

Thanks for the wonderful work on improving this! I wanted to go back to fix some things in this area but didn't get to it

I'll try reviewing it over the weekend

freak4pc avatar May 04 '22 13:05 freak4pc

Can you find an official reference for that second point?

It seems to be against everything officially written about AsyncStream (also seems very unintuitive that it would be blocking by default)

freak4pc avatar May 04 '22 13:05 freak4pc

Can you find an official reference for that second point?

It seems to be against everything officially written about AsyncStream (also seems very unintuitive that it would be blocking by default)

From my understanding Task related APIs should be the only way to establish a concurrent environment in Swift Concurrency.

The documentation of AsyncStream seems somewhat obscure, but the doc of AsyncStream.init(_:bufferingPolicy:_:) gives the example of an asyncStream that wraps a Task.detached call.

AsyncStream source reveals that the build closure is actually called immediately.

GetToSet avatar May 04 '22 15:05 GetToSet

Can you find an official reference for that second point? It seems to be against everything officially written about AsyncStream (also seems very unintuitive that it would be blocking by default)

From my understanding Task related APIs should be the only way to establish a concurrent environment in Swift Concurrency.

The documentation of AsyncStream seems somewhat obscure, but the doc of AsyncStream.init(_:bufferingPolicy:_:) gives the example of an asyncStream that wraps a Task.detached call.

AsyncStream source reveals that the build closure is actually called immediately.

From my understanding observable.values works exactly like publisher.values in Combine - it blocks the current context unless you wrap it in a Task. Seems to me like expected behavior, unless im missing something?

freak4pc avatar May 12 '22 05:05 freak4pc

rom my understanding observable.values works exactly like publisher.values in Combine - it blocks the current context unless you wrap it in a Task. Seems to me like expected behavior, unless im missing something?

Your understanding should be correct. But what's different is that unlike observer.onNext(_:) in Rx or subscriber.receive(_:) in Combine, continuation.resume in Swift Concurrency does not deliver results synchronously but buffer them until the cooperative queue is idle to accept new values.

GetToSet avatar May 12 '22 06:05 GetToSet

@GetToSet sure - but I think that's an acceptable behavior. It's what a user would expect if they try compare with the Combine implementation. IMO

freak4pc avatar May 12 '22 06:05 freak4pc

@GetToSet sure - but I think that's an acceptable behavior. It's what a user would expect if they try compare with the Combine implementation. IMO

It should be the acceptable behavior (identical to what Combine behaves).

Actually there's no official implementation to bridge Combine Publisher to AsyncStream and there're some open-source implementations like this post or this, which we could might take a reference from.

GetToSet avatar May 12 '22 06:05 GetToSet

@GetToSet I played with this a bit.

When you do combinePublisher.values it would also block the current thread if you don't carefully wrap it inside a Task. I don't think it's something Combine (or RxSwift) should do. The developer should be careful enough to use the AsyncStream in the right context.

freak4pc avatar May 24 '22 04:05 freak4pc

yncStream calls continuation.onTermination both on completion and cancellation. The current subscription in Observable.values might call disposable.dispose() and continuation.onTermination twice.

This PR removes onDisposed on the subscriptions inside AsyncStreams and add extra checks for AsyncStream.Continuation.Termination to only dispose the disposable on cancellation. This ensures disposable.dispose() and continuation.onTermination are only invoked once.

Thanks @freak4pc for reviewing. After some second thoughts, I agree with your point and I'll try to exclude the Task wrapper from this PR.

GetToSet avatar May 28 '22 17:05 GetToSet

@freak4pc I've updated my PR and now it fixes the first and third point, leaving the blocking behavior(second point) untouched.

GetToSet avatar Jun 04 '22 12:06 GetToSet

IRT No.1 - I'm a bit worried about turning of TSan In tests here but I've read a lot about issues regarding to concurrency, specifically. Perhaps it is worthwhile to split those tests to a new test target? Not sure it's worth the effort. WDYT?

And IRT No. 3, there was some work done on Single in #2427 - should this same fix be applied to it?

freak4pc avatar Jun 04 '22 13:06 freak4pc

IRT No.1 - I'm a bit worried about turning of TSan In tests here but I've read a lot about issues regarding to concurrency, specifically. Perhaps it is worthwhile to split those tests to a new test target? Not sure it's worth the effort. WDYT?

And IRT No. 3, there was some work done on Single in #2427 - should this same fix be applied to it?

For the first point, I've tried this locally, but still I think it's more like an Xcode bug than we should split those tests to a new target. For the second, I've not seen this PR before, and let me have a review first.

GetToSet avatar Jun 04 '22 14:06 GetToSet

I've made following changes to this PR:

  1. For async sequences that throws, manually throw a CancellationError on cancel (similar to PrimitiveSequence.value, #2427).
  2. Add tests to cover the changes.

GetToSet avatar Jun 14 '22 17:06 GetToSet

This PR was made a year ago and I disabled the thread senitizer as a workaround for XCTest false positives. It seems tests run well with latest SDKs (tested with Xcode 14.3.0, both macOS and iOS platform). We could probably re-enable the TSan for tests schemes. @freak4pc

GetToSet avatar May 11 '23 15:05 GetToSet

This PR was made a year ago and I disabled the thread senitizer as a workaround for XCTest false positives. It seems tests run well with latest SDKs (tested with Xcode 14.3.0, both macOS and iOS platform). We could probably re-enable the TSan for tests schemes. @freak4pc

Mind pulling up a separate PR for this quickly? Otherwise will take care later today

freak4pc avatar May 11 '23 15:05 freak4pc