swift-async-algorithms
swift-async-algorithms copied to clipboard
Subject equivalent?
We need a way to imperatively pipe events into a AsyncSequence same asi Subjects from Combine did, or Kotlin's MutableState/SharedFlow
AsynchChannel feels like a low level primitive to be used for state tracking the way CurrentValueSubject was
This would be a nice addition as it's a tool vastly used in Rx as well (BehaviorSubjects, PublishSubjects, BehaviorRelays, PublishRelays etc...) I guess for the time being we could build one using continuations with something in this flavour.
class BehaviorRelay<V: Sendable> {
init(_ initialValue: V) {
self.currentValue = initialValue
self.continuations = []
}
var currentValue: V
var continuations: [AsyncStream<V>.Continuation]
let lock = NSLock()
func accept(newValue: V) {
lock.lock()
defer { lock.unlock() }
currentValue = newValue
continuations.forEach { continuation in
continuation.yield(newValue)
}
}
func makeStream() -> AsyncStream<V> {
AsyncStream { continuation in
lock.lock()
defer { lock.unlock() }
continuation.yield(currentValue)
continuations.append(continuation)
}
}
}
If the Relay itself would be a asyncsequence, would be great
Any news on integrating a subject equivalent in the lib at this time ? @phausler maybe ^^
AsyncChannel serves this purpose to some extent (it is not a current value but a pass through subject style behavior with back pressure)
The thing we are still missing is a multicast
or share
algorithm. Right now the focus, as @phausler pointed out in some other issues, is to the get the set of algos that are in here in a stable state and through evolution.
I put up a draft PR https://github.com/apple/swift-async-algorithms/pull/208 and am hoping to fill out all the placeholders with a more polished version of the approach I'm using here where I am wrapping an AsyncStream and hosting it in a Task to ensure sequencing (relative to caller) https://github.com/rustle/TaskHostedAsyncSequence/tree/main/Sources/TaskHostedAsyncSequence. Would love to collaborate with any of y'all on this.
Pushed up a working (so far) AsyncSubject. I'll work on AsyncThrowingSubject next. Then I'll flesh out tests for both.
Hi @rustle
I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854
The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).
Hi @rustle
I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854
The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).
Cool. I've seen a few versions floating around and will definitely check out yours.
I would warmly recommend considering adopting the semantics of ReactiveSwift's Property
/MutableProperty
here, which I found vastly superior to anything I've seen in Rx or Combine.
Basically, a Property
has a value
, which in the case of MutableProperty
can also be explicitly set. However, Property
itself offers all the stream operators like map
and combineLatest
and so on, and those return a new Property
.
This means that you can do things like:
let a = MutableProperty(0)
let b = MutableProperty(0)
let c: Property<Int> = Property.combineLatest(a, b).map { $0 + $1 }
print("\(c.value)") // Prints 0.
a.value = 1
b.value = 2
print("\(c.value)") // Prints 3.
While also allowing you to stream the values of all of a, b and c as they update. This allows you to mix and match reactive and imperative programming as you prefer, which is massively convenient in lots of situations.
I've been experimenting with this AsyncSubject from ConcurrencyPlus, which essentially is a convenience wrapper for this annoyance:
var continuation: AsyncStream<Int>.Continuation!
let stream = AsyncStream<Int> {
continuation = $0
}
When using it I've noticed a difference in behaviour in combineLatest compared to Combine's and currently debugging where the problem lies. The problem is if combining 2 streams and multiple values are sent to the first stream, when the second stream receives its first value the for await loops for all the previous first stream values instead of just once for the latest pair.
Edit: its because the linked AsyncSubject uses AsyncStream that buffers, even setting .bufferingNewest(0)
or .bufferingOldest(0)
didn't fix it. AsyncChannel also buffers. I think I need something equivalent to Combine's PassthroughSubject
. Edit: I was wrong, I don't, using AsyncStream requires designing pipelines backwards compared to Combine.
Here is my attempt at an AsyncCurrentValueSubject loosely based on code found in this repository:
public final class AsyncCurrentValueSubject<Element: Sendable>: AsyncSequence, Sendable {
// MARK: - AsyncSequence
public struct Iterator: AsyncIteratorProtocol, @unchecked Sendable {
private let id: UInt64
private let subject: AsyncCurrentValueSubject<Element>
private var finished: Bool = false
fileprivate init(id: UInt64, subject: AsyncCurrentValueSubject<Element>) {
self.id = id
self.subject = subject
}
public mutating func next() async -> Element? {
if finished {
return nil
}
guard let element = await subject.next(id: id) else {
finished = true
return nil
}
return element
}
}
public func makeAsyncIterator() -> Iterator {
return Iterator(id: generateId(), subject: self)
}
// MARK: - Public interface
public init(_ element: Element) {
self.state = .init(.iterating(element: element, updated: [], suspended: [:], cancelled: []))
}
public func send(_ element: Element) {
for continuation in stateMachineSend(element: element) {
continuation.resume(returning: element)
}
}
public func finish() {
for continuation in stateMachineFinish() {
continuation.resume(returning: nil)
}
}
// MARK: - Implementation details
private let ids = ManagedCriticalState<UInt64>(0)
private func generateId() -> UInt64 {
return ids.withCriticalRegion { nextId in
defer { nextId &+= 1 }
return nextId
}
}
fileprivate enum State {
case finished
case iterating(element: Element, updated: Set<UInt64>, suspended: [UInt64 : UnsafeContinuation<Element?, Never>], cancelled: Set<UInt64>)
}
private let state: ManagedCriticalState<State>
private func next(id: UInt64) async -> Element? {
let (shouldReturn, element) = stateMachineNextImmediate(id: id)
if shouldReturn {
return element
}
return await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
let (continuation, element) = stateMachineNextSuspended(id: id, continuation: continuation)
continuation?.resume(returning: element)
}
} onCancel: {
cancel(id: id)
}
}
private func cancel(id: UInt64) {
let continuation = stateMachineCancel(id: id)
continuation?.resume(returning: nil)
}
private func stateMachineSend(element: Element) -> [UnsafeContinuation<Element?, Never>] {
return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
switch state {
case .finished:
return []
case .iterating(_, _, var suspended, let cancelled):
let suspendedIds = Set(suspended.keys)
let suspendedContinuations = Array(suspended.values)
suspended.removeAll()
state = .iterating(element: element, updated: suspendedIds, suspended: suspended, cancelled: cancelled)
return suspendedContinuations
}
}
}
private func stateMachineFinish() -> [UnsafeContinuation<Element?, Never>] {
return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
switch state {
case .finished:
return []
case .iterating(_, _, let suspended, _):
let suspendedContinuations = Array(suspended.values)
state = .finished
return suspendedContinuations
}
}
}
private func stateMachineNextImmediate(id: UInt64) -> (shouldReturn: Bool, element: Element?) {
return state.withCriticalRegion { state -> (Bool, Element?) in
switch state {
case .finished:
return (true, nil)
case .iterating(let element, var updated, let suspended, var cancelled):
precondition(suspended[id] == nil)
let suspendedIds = Set(suspended.keys)
precondition(updated.intersection(suspendedIds).isEmpty)
precondition(updated.intersection(cancelled).isEmpty)
precondition(suspendedIds.intersection(cancelled).isEmpty)
if let _ = cancelled.remove(id) {
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return (true, nil)
}
else if updated.contains(id) {
return (false, nil)
}
else {
updated.insert(id)
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return (true, element)
}
}
}
}
private func stateMachineNextSuspended(id: UInt64, continuation: UnsafeContinuation<Element?, Never>) -> (UnsafeContinuation<Element?, Never>?, Element?) {
return state.withCriticalRegion { state -> (UnsafeContinuation<Element?, Never>?, Element?) in
switch state {
case .finished:
return (continuation, nil)
case .iterating(let element, var updated, var suspended, var cancelled):
precondition(suspended[id] == nil)
let suspendedIds = Set(suspended.keys)
precondition(updated.intersection(suspendedIds).isEmpty)
precondition(updated.intersection(cancelled).isEmpty)
precondition(suspendedIds.intersection(cancelled).isEmpty)
if let _ = cancelled.remove(id) {
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return (continuation, nil)
}
else if let _ = updated.remove(id) {
suspended[id] = continuation
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return (nil, nil)
}
else {
updated.insert(id)
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return (continuation, element)
}
}
}
}
private func stateMachineCancel(id: UInt64) -> UnsafeContinuation<Element?, Never>? {
return state.withCriticalRegion { state -> UnsafeContinuation<Element?, Never>? in
switch state {
case .finished:
// finished before cancelled
return nil
case .iterating(let element, var updated, var suspended, var cancelled):
precondition(!cancelled.contains(id))
let suspendedIds = Set(suspended.keys)
precondition(updated.intersection(suspendedIds).isEmpty)
precondition(updated.intersection(cancelled).isEmpty)
precondition(suspendedIds.intersection(cancelled).isEmpty)
if let _ = updated.remove(id) {
cancelled.insert(id)
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return nil
}
else if let continuation = suspended.removeValue(forKey: id) {
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return continuation
}
else {
cancelled.insert(id)
state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)
return nil
}
}
}
}
}
I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?
After using it for a while I've learned it's more like the opposite of Combine. Rather than receive publishers from different places and combine them, instead you just write the code procedurally like normal.
AsyncSequence
the primary building block of this package is using a pull based eventing approach whereas Combine
uses a push based model. The other difference here is that AsyncSequence
are a language level feature and have first class support via for await in
. On the larger topic of Subject
like types, there is definitely room for more algorithms in this library to make it possible to model multicast AsyncSequence
. In fact, there have been a few different test in various PRs already though the pull based model makes this slightly more interesting to implement and there are a few open implementation questions.
Combine uses a push based model.
No; this is not true - Combine uses a demand based model; everything is pull based.
Combine uses a push based model.
No; this is not true - Combine uses a demand based model; everything is pull based.
I was talking about the mechanics how elements are delivered. You are right that demand is pull based in the end.
I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?
Apple decided to break all of the nice combine features with their @Observable macro.
So now if you want to design a viewModel and pipe values, you need to use the old process of inheriting classes with @ObservableObject.
The problem is that, using the @Observable macro, you cannot do data bindings. Which makes life a complex hell when working with state to view bindings from external sources.
**WORKS **
class AClass: ObservableObject {
...
@Published var aVariable: Bool
...
aPublisher
.assign(to: &$aVariable)
}
DOES NOT WORK
@Observable class AClass {
...
var aVariable: Bool
...
aPublisher
.assign(to: &$aVariable)
}
Are there plans to get this Subject equivalent to work?