Support await / AsyncSequence

Open NachoSoto opened this issue 4 years ago • 13 comments

I haven’t tried the new betas yet, but I imagine the prevalence of @MainActor in iOS 15 will bring issues to current uses of ReactiveSwift. Specifically, I imagine you won’t be able to just do this:

producer
  .observe(on: UIScheduler())
  .start { [label] in label.text = $0  }

Feature request:

for await value in producer.start() {
  self.label.text = value
}

Unfortunately, using for try await would mean we lose the error type information, so I propose that this API produce a sequence of Result<Value, Error>, or a sequence of Value if Error is Never.
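To make the intended call site concrete, here is a minimal sketch that uses only the standard library. The `Label` class is a hypothetical stand-in for a UIKit label, and a plain `AsyncStream<String>` stands in for whatever the proposed `producer.start()` would return when Error is Never; neither name comes from ReactiveSwift.

```swift
// Hypothetical stand-in for UILabel: main-actor isolated, like UIKit views.
@MainActor
final class Label {
    var text: String = ""
}

// Consumes the stream on the main actor, so no explicit
// observe(on: UIScheduler()) hop is needed before touching the label.
@MainActor
func bind(_ values: AsyncStream<String>, to label: Label) async {
    for await value in values {
        label.text = value
    }
}

// Small driver: feeds two values and returns what the label ends up showing.
@MainActor
func runBindingDemo() async -> String {
    let label = Label()
    let stream = AsyncStream(String.self) { continuation in
        continuation.yield("a")
        continuation.yield("b")
        continuation.finish()
    }
    await bind(stream, to: label)
    return label.text
}
```

The loop body runs on the main actor because the enclosing function is isolated to it, which is the property the feature request is after.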

I’ll probably work on this throughout the week, but other thoughts are appreciated!

References:

  • https://github.com/apple/swift-evolution/blob/main/proposals/0298-asyncsequence.md
  • https://github.com/apple/swift-evolution/blob/main/proposals/0314-async-stream.md

Other ideas:

  • Being able to create SignalProducers using async:
SignalProducer<Int, MyError> { observer, disposable in
  observer.send(value: await f1())

  guard !disposable.isDisposed else { return }

  do {
    observer.send(value: try await f2())
  } catch {
    observer.send(error: error)
  }
}
  • Or simply just one async function:
let producer = SignalProducer<Int, Error>(asyncFunction: f)
  • New collect overloads:
let values: [Int] = await producer.collect()
let result: Result<[Int], MyError> = await producer.collect()

NachoSoto avatar Jun 07 '21 20:06 NachoSoto

It should be modelled as a conversion to an AsyncSequence-conforming type, presumably named asSequence() for fluency.

It could also be SignalProducer(Convertible) inheriting AsyncSequence, though I am not sure how the operator name clash will pan out.

The nuance is likely in how to handle a fast producer paired with a slow consumer. To keep the deliver-exactly-once default of today’s RAS, we probably need to use unlimited buffering with the (supposedly available) AsyncStream.

(That is assuming we throw away all the blocking options)
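As a rough illustration of the buffering trade-off, the sketch below uses only the standard library's `AsyncStream` and its `bufferingPolicy` parameter. The helper names `makeStream` and `collect` are made up for this example. The producer yields every value before the consumer starts iterating, which is the extreme case of a fast producer and a slow consumer.

```swift
// Yields 1...5 synchronously, before any consumer has started iterating.
func makeStream(
    _ policy: AsyncStream<Int>.Continuation.BufferingPolicy
) -> AsyncStream<Int> {
    AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Drains a stream into an array so the delivered values can be inspected.
func collect(_ stream: AsyncStream<Int>) async -> [Int] {
    var values: [Int] = []
    for await value in stream {
        values.append(value)
    }
    return values
}

func runBufferingDemo() async -> (unbounded: [Int], newestOnly: [Int]) {
    // .unbounded keeps every element until it is consumed.
    let unbounded = await collect(makeStream(.unbounded))
    // .bufferingNewest(1) keeps only the latest element; older ones are dropped.
    let newestOnly = await collect(makeStream(.bufferingNewest(1)))
    return (unbounded, newestOnly)
}
```

With `.unbounded` the consumer sees all five values; with `.bufferingNewest(1)` it sees only the last one, so anything other than unbounded buffering would change the delivery guarantee.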

andersio avatar Jun 08 '21 08:06 andersio

Another thing worth watching is whether Swift Concurrency will end up backward deployed.

Async-await is a natural way to express backpressure: the callee can defer returning control by holding onto the continuation. I say this having had hands-on experience with it in Kotlin Coroutines. So that is honestly how I would see proper backpressure being introduced into RAS, versus the Combine/Reactive Streams model (which works but is sometimes brittle).

(This is especially relevant since the proposed AsyncSequence APIs did not grow into a full-fledged FRP toolbox, which means there is still value in using community libraries like RAS.)

Either way, for now, we should bet on simple interop.
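The backpressure idea mentioned above can be sketched with plain async functions, without any ReactiveSwift types. Everything here (`EventLog`, `produce`, `runBackpressureDemo`) is a hypothetical name for illustration: the producer awaits the consumer on every value, so it cannot run ahead of it.

```swift
// Records the order of events; an actor keeps access to the log safe.
actor EventLog {
    private(set) var entries: [String] = []
    func record(_ entry: String) { entries.append(entry) }
}

// The producer hands each value to `consume` and is suspended until it returns.
// That suspension is the backpressure: a slow consumer slows the producer down.
func produce(
    count: Int,
    log: EventLog,
    into consume: (Int) async -> Void
) async {
    for value in 1...count {
        await log.record("produced \(value)")
        await consume(value)
    }
}

func runBackpressureDemo() async -> [String] {
    let log = EventLog()
    await produce(count: 2, log: log) { value in
        // Simulate a slow consumer (10 ms per value).
        try? await Task.sleep(nanoseconds: 10_000_000)
        await log.record("consumed \(value)")
    }
    return await log.entries
}
```

The log always interleaves as produced 1, consumed 1, produced 2, consumed 2, because the second value is not produced until the first has been consumed.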

andersio avatar Jun 11 '21 18:06 andersio

It turns out that the async/await mechanism is not backward deployed :( It requires a new runtime. It's a pity.

danya61 avatar Jul 04 '21 14:07 danya61

Edit: fixed implementation for RC:

I quickly prototyped this on Beta 3 and it works:

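@available(macOS 12.0, *)
@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 8.0, *)
@available(macCatalystApplicationExtension 15.0, *)
extension SignalProducer {
    /// Starts the producer and exposes its events as an AsyncStream.
    public func start() -> AsyncStream<Result<Value, Error>> {
        return AsyncStream(Result<Value, Error>.self) { continuation in
            let disposable = self.start { event in
                switch event {
                case let .value(value):
                    continuation.yield(.success(value))
                case let .failed(error):
                    // A failure terminates the producer, so finish the stream too;
                    // otherwise the consuming loop would never exit.
                    continuation.yield(.failure(error))
                    continuation.finish()
                case .completed, .interrupted:
                    continuation.finish()
                }
            }

            // Dispose of the producer when the stream finishes or its consumer is cancelled.
            continuation.onTermination = { @Sendable _ in disposable.dispose() }
        }
    }
}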

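@available(macOS 12.0, *)
@available(iOS 15.0, *)
@available(tvOS 15.0, *)
@available(watchOS 8.0, *)
@available(macCatalystApplicationExtension 15.0, *)
func f() async {
	let p = SignalProducer<Int, Never>([1, 2, 3])

	// AsyncStream does not throw, so a plain `for await` is enough.
	// Each x is a Result<Int, Never>.
	for await x in p.start() {
		print(x)
	}
}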

Unfortunately we don't have a way to test these things right now until https://github.com/Quick/Quick/issues/1084.

NachoSoto avatar Jul 20 '21 18:07 NachoSoto

It turns out that the async/await mechanism is not backward deployed :( It requires a new runtime. It's a pity.

Luckily that's no longer true! :)

NachoSoto avatar Aug 31 '21 14:08 NachoSoto

@NachoSoto who knows how long this process will take...

danya61 avatar Sep 24 '21 07:09 danya61

What process?

NachoSoto avatar Sep 24 '21 15:09 NachoSoto

@NachoSoto I guess @danya61 is referring to this PR which I think is the last piece in back porting concurrency to older OSes. (looks like it will be iOS 13, macOS 10.15).

mluisbrown avatar Sep 24 '21 15:09 mluisbrown

What process?

Back-deploy concurrency

danya61 avatar Sep 29 '21 17:09 danya61

Maybe https://github.com/ReactiveX/RxSwift/releases/tag/6.5.0 can serve as an example or template?

apps4everyone avatar Jan 14 '22 12:01 apps4everyone

What about using AsyncThrowingStream to handle the errors without using Result?

jonasman avatar Mar 30 '22 13:03 jonasman

We would lose the Error type information.
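A small standard-library-only sketch of that difference, under the assumption of a made-up `MyError` type and hypothetical helper names: `AsyncThrowingStream` surfaces failures as an untyped `Error` that has to be cast back, while a stream of `Result` keeps the failure type in the element.

```swift
enum MyError: Error, Equatable {
    case boom
}

// Failure is erased to `any Error` in the stream's type.
func throwingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream(Int.self) { continuation in
        continuation.yield(1)
        continuation.finish(throwing: MyError.boom)
    }
}

// The failure type stays visible in the element type.
func resultStream() -> AsyncStream<Result<Int, MyError>> {
    AsyncStream(Result<Int, MyError>.self) { continuation in
        continuation.yield(.success(1))
        continuation.yield(.failure(.boom))
        continuation.finish()
    }
}

func describeThrowing() async -> String {
    do {
        for try await _ in throwingStream() {}
        return "finished"
    } catch let error as MyError {
        // The compiler only knows `any Error`; the cast recovers the type.
        return "typed only after a cast: \(error)"
    } catch {
        return "unknown error"
    }
}

func describeResult() async -> String {
    for await result in resultStream() {
        switch result {
        case .success:
            continue
        case let .failure(error):
            // `error` is statically a MyError here, no cast required.
            return "statically typed: \(error)"
        }
    }
    return "finished"
}
```

The second `catch` in `describeThrowing` is required by the compiler precisely because the stream's failure type is not known to be `MyError`.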

NachoSoto avatar Mar 30 '22 14:03 NachoSoto

Hi!

@andersio Let me first say: thank you very much for your great work! I learned the reactive paradigm mostly by using ReactiveSwift and enjoyed it a lot.

But in our team we have concerns about mixing async/await with ReactiveSwift. According to this WWDC talk, it's unsafe to use locking primitives like semaphores in the context of async continuations. I can see usage of DispatchSemaphore, PthreadLock, etc. in the ReactiveSwift sources. Do you consider this a possible issue?

Thanks in advance.

Br, Darko

DarkoDamjanovic avatar Jun 30 '22 08:06 DarkoDamjanovic