swift-async-algorithms icon indicating copy to clipboard operation
swift-async-algorithms copied to clipboard

Subject equivalent?

Open ursusursus opened this issue 2 years ago • 18 comments

We need a way to imperatively pipe events into a AsyncSequence same asi Subjects from Combine did, or Kotlin's MutableState/SharedFlow

AsynchChannel feels like a low level primitive to be used for state tracking the way CurrentValueSubject was

ursusursus avatar Jul 10 '22 18:07 ursusursus

This would be a nice addition as it's a tool vastly used in Rx as well (BehaviorSubjects, PublishSubjects, BehaviorRelays, PublishRelays etc...) I guess for the time being we could build one using continuations with something in this flavour.

class BehaviorRelay<V: Sendable> {
    
    init(_ initialValue: V) {
        self.currentValue = initialValue
        self.continuations = []
    }
    
    var currentValue: V
    var continuations: [AsyncStream<V>.Continuation]
    
    let lock = NSLock()
    
    func accept(newValue: V) {
        lock.lock()
        defer { lock.unlock() }
        currentValue = newValue
        continuations.forEach { continuation in
            continuation.yield(newValue)
        }
    }
    
    func makeStream() -> AsyncStream<V> {
        AsyncStream { continuation in
            lock.lock()
            defer { lock.unlock() }
            continuation.yield(currentValue)
            continuations.append(continuation)
        }
    }
}

bioche avatar Jul 14 '22 22:07 bioche

If the Relay itself would be a asyncsequence, would be great

ursusursus avatar Jul 14 '22 22:07 ursusursus

Any news on integrating a subject equivalent in the lib at this time ? @phausler maybe ^^

bioche avatar Oct 09 '22 14:10 bioche

AsyncChannel serves this purpose to some extent (it is not a current value but a pass through subject style behavior with back pressure)

phausler avatar Oct 09 '22 21:10 phausler

The thing we are still missing is a multicast or share algorithm. Right now the focus, as @phausler pointed out in some other issues, is to the get the set of algos that are in here in a stable state and through evolution.

FranzBusch avatar Oct 10 '22 09:10 FranzBusch

I put up a draft PR https://github.com/apple/swift-async-algorithms/pull/208 and am hoping to fill out all the placeholders with a more polished version of the approach I'm using here where I am wrapping an AsyncStream and hosting it in a Task to ensure sequencing (relative to caller) https://github.com/rustle/TaskHostedAsyncSequence/tree/main/Sources/TaskHostedAsyncSequence. Would love to collaborate with any of y'all on this.

rustle avatar Oct 10 '22 17:10 rustle

Pushed up a working (so far) AsyncSubject. I'll work on AsyncThrowingSubject next. Then I'll flesh out tests for both.

rustle avatar Oct 11 '22 15:10 rustle

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

twittemb avatar Oct 11 '22 15:10 twittemb

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

Cool. I've seen a few versions floating around and will definitely check out yours.

rustle avatar Oct 11 '22 17:10 rustle

I would warmly recommend considering adopting the semantics of ReactiveSwift's Property/MutableProperty here, which I found vastly superior to anything I've seen in Rx or Combine.

Basically, a Property has a value, which in the case of MutableProperty can also be explicitly set. However, Property itself offers all the stream operators like map and combineLatest and so on, and those return a new Property.

This means that you can do things like:

let a = MutableProperty(0)
let b = MutableProperty(0)
let c: Property<Int> = Property.combineLatest(a, b).map { $0 + $1 }

print("\(c.value)") // Prints 0.

a.value = 1
b.value = 2

print("\(c.value)") // Prints 3.

While also allowing you to stream the values of all of a, b and c as they update. This allows you to mix and match reactive and imperative programming as you prefer, which is massively convenient in lots of situations.

DagAgren avatar Nov 26 '22 16:11 DagAgren

I've been experimenting with this AsyncSubject from ConcurrencyPlus, which essentially is a convenience wrapper for this annoyance:

var continuation: AsyncStream<Int>.Continuation!
let stream = AsyncStream<Int> {
    continuation = $0
}

When using it I've noticed a difference in behaviour in combineLatest compared to Combine's and currently debugging where the problem lies. The problem is if combining 2 streams and multiple values are sent to the first stream, when the second stream receives its first value the for await loops for all the previous first stream values instead of just once for the latest pair.

Edit: its because the linked AsyncSubject uses AsyncStream that buffers, even setting .bufferingNewest(0) or .bufferingOldest(0) didn't fix it. AsyncChannel also buffers. I think I need something equivalent to Combine's PassthroughSubject. Edit: I was wrong, I don't, using AsyncStream requires designing pipelines backwards compared to Combine.

malhal avatar Dec 16 '22 15:12 malhal

Here is my attempt at an AsyncCurrentValueSubject loosely based on code found in this repository:

public final class AsyncCurrentValueSubject<Element: Sendable>: AsyncSequence, Sendable {
    // MARK: - AsyncSequence
    
    public struct Iterator: AsyncIteratorProtocol, @unchecked Sendable {
        private let id: UInt64
        private let subject: AsyncCurrentValueSubject<Element>
        private var finished: Bool = false
        
        fileprivate init(id: UInt64, subject: AsyncCurrentValueSubject<Element>) {
            self.id = id
            self.subject = subject
        }
        
        public mutating func next() async -> Element? {
            if finished {
                return nil
            }
            
            guard let element = await subject.next(id: id) else {
                finished = true
                return nil
            }
            
            return element
        }
    }
    
    public func makeAsyncIterator() -> Iterator {
        return Iterator(id: generateId(), subject: self)
    }
    
    
    // MARK: - Public interface
    
    public init(_ element: Element) {
        self.state = .init(.iterating(element: element, updated: [], suspended: [:], cancelled: []))
    }
    
    public func send(_ element: Element) {
        for continuation in stateMachineSend(element: element) {
            continuation.resume(returning: element)
        }
    }
    
    public func finish() {
        for continuation in stateMachineFinish() {
            continuation.resume(returning: nil)
        }
    }
    
    
    // MARK: - Implementation details
    
    private let ids = ManagedCriticalState<UInt64>(0)
    
    private func generateId() -> UInt64 {
        return ids.withCriticalRegion { nextId in
            defer { nextId &+= 1 }
            return nextId
        }
    }
    
    fileprivate enum State {
        case finished
        case iterating(element: Element, updated: Set<UInt64>, suspended: [UInt64 : UnsafeContinuation<Element?, Never>], cancelled: Set<UInt64>)
    }
    
    private let state: ManagedCriticalState<State>
    
    private func next(id: UInt64) async -> Element? {
        let (shouldReturn, element) = stateMachineNextImmediate(id: id)
        
        if shouldReturn {
            return element
        }
        
        return await withTaskCancellationHandler {
            await withUnsafeContinuation { continuation in
                let (continuation, element) = stateMachineNextSuspended(id: id, continuation: continuation)
                continuation?.resume(returning: element)
            }
        } onCancel: {
            cancel(id: id)
        }
    }
    
    private func cancel(id: UInt64) {
        let continuation = stateMachineCancel(id: id)
        continuation?.resume(returning: nil)
    }
    
    private func stateMachineSend(element: Element) -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []
                
            case .iterating(_, _, var suspended, let cancelled):
                let suspendedIds = Set(suspended.keys)
                let suspendedContinuations = Array(suspended.values)
                suspended.removeAll()
                
                state = .iterating(element: element, updated: suspendedIds, suspended: suspended, cancelled: cancelled)
                
                return suspendedContinuations
            }
        }
    }
    
    private func stateMachineFinish() -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []
                
            case .iterating(_, _, let suspended, _):
                let suspendedContinuations = Array(suspended.values)
                
                state = .finished
                
                return suspendedContinuations
            }
        }
    }
    
    private func stateMachineNextImmediate(id: UInt64) -> (shouldReturn: Bool, element: Element?) {
        return state.withCriticalRegion { state -> (Bool, Element?) in
            switch state {
            case .finished:
                return (true, nil)
                
            case .iterating(let element, var updated, let suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)
                
                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (true, nil)
                }
                else if updated.contains(id) {
                    return (false, nil)
                }
                else {
                    updated.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (true, element)
                }
            }
        }
    }
    
    private func stateMachineNextSuspended(id: UInt64, continuation: UnsafeContinuation<Element?, Never>) -> (UnsafeContinuation<Element?, Never>?, Element?) {
        return state.withCriticalRegion { state -> (UnsafeContinuation<Element?, Never>?, Element?) in
            switch state {
            case .finished:
                return (continuation, nil)
                
            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)
                
                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (continuation, nil)
                }
                else if let _ = updated.remove(id) {
                    suspended[id] = continuation
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (nil, nil)
                }
                else {
                    updated.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return (continuation, element)
                }
            }
        }
    }
    
    private func stateMachineCancel(id: UInt64) -> UnsafeContinuation<Element?, Never>? {
        return state.withCriticalRegion { state -> UnsafeContinuation<Element?, Never>? in
            switch state {
            case .finished:
                // finished before cancelled
                return nil
                
            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(!cancelled.contains(id))
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)

                if let _ = updated.remove(id) {
                    cancelled.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return nil
                }
                else if let continuation = suspended.removeValue(forKey: id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return continuation
                }
                else {
                    cancelled.insert(id)
                    
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
                    
                    return nil
                }
            }
        }
    }
}

justin-foreflight avatar Feb 23 '23 19:02 justin-foreflight

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

ingun37 avatar Dec 12 '23 01:12 ingun37

After using it for a while I've learned it's more like the opposite of Combine. Rather than receive publishers from different places and combine them, instead you just write the code procedurally like normal.

malhal avatar Dec 12 '23 14:12 malhal

AsyncSequence the primary building block of this package is using a pull based eventing approach whereas Combine uses a push based model. The other difference here is that AsyncSequence are a language level feature and have first class support via for await in. On the larger topic of Subject like types, there is definitely room for more algorithms in this library to make it possible to model multicast AsyncSequence. In fact, there have been a few different test in various PRs already though the pull based model makes this slightly more interesting to implement and there are a few open implementation questions.

FranzBusch avatar Dec 12 '23 15:12 FranzBusch

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

phausler avatar Dec 12 '23 18:12 phausler

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

I was talking about the mechanics how elements are delivered. You are right that demand is pull based in the end.

FranzBusch avatar Dec 12 '23 18:12 FranzBusch

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

Apple decided to break all of the nice combine features with their @Observable macro.

So now if you want to design a viewModel and pipe values, you need to use the old process of inheriting classes with @ObservableObject.

The problem is that, using the @Observable macro, you cannot do data bindings. Which makes life a complex hell when working with state to view bindings from external sources.

**WORKS **

class AClass: ObservableObject {
...
@Published var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

DOES NOT WORK

@Observable class AClass {
...
var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

Are there plans to get this Subject equivalent to work?

realityworks avatar Jul 03 '24 03:07 realityworks