swift-async-algorithms

Multi-consumption of an `AsyncSequence`

Open ABridoux opened this issue 2 years ago • 18 comments

Hi, I have been reading the documentation for this repository and it left me wondering: is there a plan to introduce a feature that allows several tasks to consume the same AsyncSequence?

If this is not the right place to offer ideas, please let me know. I don't think this would fit in a proposal.

Context

I recently had to stream a file using URLSession, and two pieces of code were interested in the streamed values. Rather than starting the stream twice, which would be inefficient, I wanted to let several tasks consume the same stream. The data that had already been streamed would be sent to each new consumer when it starts consuming.

Basic implementation

I tried to implement such a solution (in a gist). It applies a reduce function to the values already emitted and emits the result when a new task starts consuming the sequence. It's far from perfect, but I think it might help convey the idea.

Implementation

struct ReducedReplayAsyncStream<Element> {

    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private var originalStream: AsyncStream<Element>

    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                var isFirst = false
                if await !storage.didStart {
                    await storage.setDidStart(true)
                    isFirst = true
                    startConsumingOriginalStream()
                }

                if !isFirst {
                    await continuation.yield(storage.stored)
                }
                await storage.appendContinuation(continuation)
            }
        }
    }

    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.continuations.forEach { $0.finish() }
        }
    }
}

extension ReducedReplayAsyncStream {

    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce

        var didStart = false
        var stored: Element
        var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations.forEach { $0.yield(value) }
        }

        func setDidStart(_ value: Bool) {
            didStart = value
        }

        func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) {
            continuations.append(continuation)
        }
    }
}

extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    func makeAsyncIterator() -> AsyncIterator {
        let stream = makeStream()
        return stream.makeAsyncIterator()
    }
}
Usage

var subscriptions: Set<AnyCancellable> = []
var continuation: AsyncStream<Int>.Continuation!

let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult = partialResult + nextResult },
    build: { continuation = $0 }
)

var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        if counter == 10 {
            continuation.finish()
        }
        continuation.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)

Task {
    for await value in replayStream {
        print("[A]", value)
    }
}

Task {
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}

Some considerations about efficiency can be found in the gist.

ABridoux avatar Mar 26 '22 10:03 ABridoux

I think this should be a built in feature of async sequences.

When I first started using them, I assumed this would work (coming from Rx and Combine) but was greeted with errors.

You can have multiple for-each loops over a regular sequence, so I think it makes sense semantically.

Maybe a SE proposal for this? Rather than it being part of a separate package.

BrentMifsud avatar Mar 26 '22 17:03 BrentMifsud

Here's (gist) my not very Swifty attempt. The main thing I was going for was that cancelling all the other streams would cancel the original.

justin-foreflight avatar Mar 29 '22 14:03 justin-foreflight

I think solutions in this territory are interesting. The one issue is that, more often than not, the buffer may be indefinite/unbounded.

I think the uses are perhaps the important point here. I don't think folks need to replay all values in very distant parts of code execution. Instead, it feels like there are code locations a short execution distance from each other that want to use the values at the same time.

So:

func doThings<Input: AsyncSequence>(_ input: Input) {
  let (sideA, sideB) = input.split() 
  let a = sideA.compactMap { 
    switch $0 {
      case .a(let payload): return payload
      default: return nil
    }
  }
  let b = sideB.compactMap {
    switch $0 {
      case .b(let payload): return payload
      default: return nil
    }
  }
  Task {
     await doThingWithA(a)
   }
  Task {
     await doThingWithB(b)
   }
}

phausler avatar Mar 29 '22 16:03 phausler

I believe you are right, I cannot think of a use case where all values should be replayed. The feature I wanted to talk about was rather to "jump in the wagon" and start receiving the remaining values of the stream without impacting other consumers. The reduce function would be used for that: to get the "synthesis" of all the previous values before receiving the remaining ones. I think this could be useful when it's not possible to make a new sequence. For instance when it's too expensive like a stream of a large file. Though I agree that this might be too specific to fit in this library.
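To make the "synthesis" idea concrete, here is a minimal synchronous sketch of what a late consumer would observe under this scheme: first the reduction of everything emitted so far, then the remaining values. The function name `reducedReplay` and its parameters are hypothetical and only model the semantics over an array. It is not the gist's implementation.

```swift
/// Models what a consumer joining after `joinIndex` elements would observe:
/// one "synthesis" element (the reduction of the values it missed),
/// followed by the values emitted afterwards.
/// Hypothetical helper, for illustration only.
func reducedReplay<Element>(
    of values: [Element],
    joiningAfter joinIndex: Int,
    initialResult: Element,
    reduce: (_ partialResult: inout Element, _ nextResult: Element) -> Void
) -> [Element] {
    var synthesis = initialResult
    // Fold the values the late consumer missed into a single element.
    for value in values.prefix(joinIndex) {
        reduce(&synthesis, value)
    }
    // The late consumer sees the synthesis first, then the remaining values.
    return [synthesis] + values.dropFirst(joinIndex)
}

// A consumer joining after the values 0, 1 and 2 first receives their sum.
let observed = reducedReplay(
    of: Array(0...5),
    joiningAfter: 3,
    initialResult: 0,
    reduce: { $0 += $1 }
)
print(observed) // [3, 3, 4, 5]
```

With a sum as the reduce function, the first element the late consumer sees is the running total, which is the same shape as the `[B]` output in the usage example from the original post.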

ABridoux avatar Apr 01 '22 21:04 ABridoux

A stream from a large file should be pretty cheap; AsyncBufferedByteIterator is quite efficient, and that type of strategy should be used to stream bytes.

If one consumer is slower than another, then there needs to be some sort of buffering or back pressure to accommodate the slowest consumer. Otherwise there will be dropped values. The problem is knowing the number of consumers ahead of time. If you can know that, then you can make back pressure work.
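The trade-off between buffering and dropped values can be seen with the standard library's `AsyncStream` buffering policies. This is only a stand-in used for illustration (an assumption on my part, not how this package implements back pressure), and `lateConsumerSees` is a hypothetical helper name. The consumer here starts reading only after the producer has finished, which is the extreme case of a slow consumer.

```swift
/// Produces the values 1...5 up front, then lets a consumer read them.
/// Returns whatever the consumer actually receives under the given policy.
/// Hypothetical helper, for illustration only.
func lateConsumerSees(
    _ policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        // The producer never waits for the consumer: there is no back pressure.
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}

print(await lateConsumerSees(.unbounded))          // [1, 2, 3, 4, 5]
print(await lateConsumerSees(.bufferingNewest(2))) // [4, 5]
print(await lateConsumerSees(.bufferingOldest(2))) // [1, 2]
```

An unbounded buffer keeps everything at the cost of memory, while a bounded one silently drops values. Back pressure would instead make the producer wait, which is why the number of consumers matters.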

phausler avatar Apr 01 '22 21:04 phausler

@phausler I see that the multi-consumption was already discussed in a thread about AsyncSequence. Especially I wanted to share your words from a response here before asking my question:

Distributors: Sending a value to an AsyncSequence where the send awaits the consumption.

Sharing the iteration in a safe manner across multiple consumers where each consumer gets a distinct value

Sharing the values from iteration among a known number of consumers

That's exactly what I meant in the first place (sorry if that was unclear). A way for multiple consumers to receive values emitted by a single sequence. Very much like a publisher-subscribers relationship.

Is there a plan to add such a feature? When I experimented with AsyncChannel, it was not possible to consume the sequence in two different places:

func testAsyncChannel() {
    let channel = AsyncChannel<Int>()

    Task {
        await channel.send(1)
        await channel.send(2)
    }

    Task {
        for await value in channel {
            print("A", value)
        }
    }

    Task {
        for await value in channel {
            print("B", value)
        }
    }
}
// prints:
// A 1
// B 2

I have some use cases where it would be useful to have this logic available, for instance a re-implementation of @Published. Maybe I am biased by my usage of Combine, but I think it could be really useful to have such a property wrapper that emits modifications of a value through an AsyncSequence. The only solution I can think of currently would be to create a new AsyncStream each time a consumer is interested and to store the associated continuations in order to forward the values emitted by the original stream.
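A minimal sketch of that last idea, using only the standard library: each consumer gets its own `AsyncStream`, and the stored continuations are used to forward every value of the original sequence. The names `Broadcaster`, `forward(_:to:)` and `demoFanOut` are hypothetical. Note that each per-consumer stream buffers without bound here, so this sketch offers no back pressure.

```swift
/// Hands out one AsyncStream per consumer and forwards each value to all of them.
/// Hypothetical type, for illustration only.
actor Broadcaster<Element: Sendable> {
    private var continuations: [Int: AsyncStream<Element>.Continuation] = [:]
    private var nextID = 0
    private var isFinished = false

    /// Registers a new consumer and returns the stream it should iterate.
    func subscribe() -> AsyncStream<Element> {
        var continuation: AsyncStream<Element>.Continuation!
        // The build closure runs synchronously, so `continuation` is set on return.
        let stream = AsyncStream<Element> { continuation = $0 }
        if isFinished {
            continuation.finish()
            return stream
        }
        let id = nextID
        nextID += 1
        continuations[id] = continuation
        // Forget the consumer once its stream terminates (finished or cancelled).
        continuation.onTermination = { [weak self] _ in
            Task { await self?.remove(id) }
        }
        return stream
    }

    /// Forwards a value to every registered consumer.
    func send(_ value: Element) {
        for continuation in continuations.values {
            continuation.yield(value)
        }
    }

    /// Finishes every consumer stream; later subscribers get an empty stream.
    func finish() {
        isFinished = true
        for continuation in continuations.values {
            continuation.finish()
        }
        continuations.removeAll()
    }

    private func remove(_ id: Int) {
        continuations[id] = nil
    }
}

/// Pumps every element of `base` into the broadcaster, then finishes it.
/// Errors are swallowed in this sketch; a fuller version would propagate them.
func forward<S: AsyncSequence>(
    _ base: S,
    to broadcaster: Broadcaster<S.Element>
) async where S.Element: Sendable {
    do {
        for try await value in base {
            await broadcaster.send(value)
        }
    } catch {}
    await broadcaster.finish()
}

/// Two consumers subscribed before the source starts both see every value.
func demoFanOut() async -> (a: [Int], b: [Int]) {
    let broadcaster = Broadcaster<Int>()
    let streamA = await broadcaster.subscribe()
    let streamB = await broadcaster.subscribe()
    let source = AsyncStream<Int> { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }
    await forward(source, to: broadcaster)

    var a: [Int] = []
    var b: [Int] = []
    for await value in streamA { a.append(value) }
    for await value in streamB { b.append(value) }
    return (a, b)
}
```

Unlike the AsyncChannel experiment above, every consumer receives every value. A consumer that subscribes late simply misses the values sent before it registered.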

ABridoux avatar Apr 17 '22 20:04 ABridoux

There are some unanswered questions with regard to sharing behaviors. For example: is an "operator" style API the best route? Combine takes the approach of multicast+autoconnect; does that make sense? Or does some other composed behavior, such as split, make more sense? How can we make the API more approachable so that it is clear when you need it? The list goes on.

Personally I think the multicast+autoconnect is a bit complex of a concept, and instead breaking it up into concepts of sharing iteration or sharing values seems to me more approachable if you are not experienced with functional reactive programming.

I also contend that splitting into a known number of replications can allow us to make some really nice behavioral aspects with regards to cancellation.

The general role that share plays to me seems quite useful and needed, just not the forefront of need.

The property wrapper thing brings up a ton of other questions, many of which are better answered by the groups at Apple working on SwiftUI. In truth, even though @Published may live in Combine, it is really more of a SwiftUI type in practice.

phausler avatar Apr 18 '22 14:04 phausler

I understand now that the AsyncSequence approach is quite different from Combine publishers. All your points make sense. I am left wondering whether the best solution for now is to create a new sequence each time a consumer is interested. This might be expensive in scenarios where creating a new sequence requires some heavy work.

Regarding the @Published to be honest I use it in many other places than with SwiftUI. Especially, I find it useful for testing purposes to easily observe if a value has changed or a function was called with the proper input. But it might not be the right approach with AsyncSequence indeed (I'd rather use something like a custom @AsyncNext property wrapper to observe the next value).

Thank you a lot for taking the time to answer my questions. If that's ok I'll close the issue in a few days if no comments are added.

ABridoux avatar Apr 19 '22 10:04 ABridoux

Perhaps the parts that we can help identify here are: what pieces do we need to build such a thing? And what are its restrictions?

First and foremost: any such property wrapper would have a couple of characteristics. It would effectively always be a trigger on didSet because it is awaited. The setter currently cannot be async, so we need to address back pressure (e.g. what happens if no one is currently awaiting a value?). To the point of the last question: should it even be settable beyond the first value? What happens if more than one interested party is awaiting? What does the projection of the property do? Is it just an AsyncSequence?

Out of those questions; what parts can we build independently to make constructing that thing easy/safe? Just because it incubates in this project does not mean that it must always stay here - perhaps something of useful importance to the rest of the language might be able to transition into the swift concurrency library.

phausler avatar Apr 19 '22 16:04 phausler

One idea that I had a bit ago is to do something like this:

@propertyWrapper
public struct ThisDeservesABetterName<Element: Sendable> { // AsyncChannel requires a Sendable element
  public private(set) var wrappedValue: Element
  let channel = AsyncChannel<Element>()
  
  public init(wrappedValue: Element) {
    self.wrappedValue = wrappedValue
  }
  
  public struct Projection: AsyncSequence {
    public struct Iterator: AsyncIteratorProtocol {
      var iterator: AsyncChannel<Element>.Iterator
      
      public mutating func next() async -> Element? {
        return await iterator.next()
      }
    }
    
    var value: Element
    let channel: AsyncChannel<Element>
    
    public mutating func setValue(_ value: Element) async {
      self.value = value
      await channel.send(value)
    }
    
    public func makeAsyncIterator() -> Iterator {
      Iterator(iterator: channel.makeAsyncIterator())
    }
  }
  
  public var projectedValue: Projection {
    get {
      Projection(value: wrappedValue, channel: channel)
    }
    set {
      wrappedValue = newValue.value
    }
  }
}

phausler avatar Apr 19 '22 17:04 phausler

I think there are some expectations that it may not meet.

It should use back pressure not buffering: e.g. the next item should await all consumption of previous next calls to be done.

It should rethrow failures if and only if the base throws.

phausler avatar May 01 '22 21:05 phausler

Would love to have a built-in type to do this. We use Combine for some services that are shared throughout the app as well as view model outputs. I'm hoping to replace these with AsyncSequences in some way so we can enter into the Swift Concurrency world at the view level instead of views only seeing a Combine interface. The ability to have multiple consumers of a single AsyncSequence and access to the last sent value is a must-have.

nickruddeni avatar Jun 08 '22 12:06 nickruddeni

I agree that this is an important feature to iterate on (pun most assuredly intended). However, the battle that we must face is that share in Combine feels a bit abject and mysterious sometimes. Some folks find it hard to understand why/when it is needed.

If we can somehow leverage things in the language that help to that end it would go a long way to making it more of a concrete item. Even though personally I find a share or split operator concept for AsyncSequence meaningful and useful, other folks are not yet convinced.

phausler avatar Jun 08 '22 15:06 phausler

Was trying to implement one of these behaviors myself (one sequence's elements consumed once each by multiple concurrent consumers) in this thread and got sent here to make a feature request, so

+1

KeithBauerANZ avatar Apr 20 '23 22:04 KeithBauerANZ

Any updates on this? We need the broadcasting behaviour in our app and currently rely on Asynchrone's SharedAsyncSequence.

Would be nice to see an "official solution".

bspinner avatar Sep 26 '23 14:09 bspinner

+1 on this

mantuness avatar Dec 14 '23 14:12 mantuness

+1

nonameplum avatar Feb 23 '24 10:02 nonameplum

Just make a channel per client, have the server loop the main sequence and send to all client channels.

malhal avatar May 09 '24 21:05 malhal