AsyncExtensions icon indicating copy to clipboard operation
AsyncExtensions copied to clipboard

Youre missing async sequence builder like kotlin's

Open ursusursus opened this issue 2 years ago • 4 comments

I'm a kotlin developer familiar with kotlin coroutines and Flow, which is direct mapping to async/await + AsyncSequence.

The most powerful thing that kotlin flow has is the builder, which allows you to emit values arbitrarily and use all the async stuff since the closure is async, like so (I'll write it in async/await syntax so you understand better)

flow { emitter in
   await emitter.emit(1)
   await Task.sleep(1_000)
   await emitter.emit(10)
   await Task.sleep(1_000)
   for 0...10 {
      await emitter.emit(100)
   }
}

when the closure returns, the flow terminates, if you throw inside the closure, then error is propagated as usual

It allow you to then create arbitrary custom operators, or simply way to pipe async function into AsyncSequence like

asyncSequence {
   await someFunction()
}
.flatMapLatest { ... }
.collect { ... }

this is very needed

ursusursus avatar Jul 10 '22 01:07 ursusursus

Swift has built-in function

AsyncThrowingStream { continuation in        

    Task {
       continuation.yield(data) }  
       await ...
       continuation.finish(throwing: nil)
       continuation.finish(throwing: error)
 }

hoc081098 avatar Jul 11 '22 19:07 hoc081098

Hm, but is that not leaky? The task will continue after callsite task gets canceled, no?

ursusursus avatar Jul 11 '22 20:07 ursusursus

How does this look?

extension AsyncThrowingStream {
    public struct Emitter {
        let continuation: Continuation

        public func emit(_ value: Element) {
            continuation.yield(value)
        }
    }

    public init(body: @escaping (Emitter) async throws -> ()) where Failure == Error {
        self.init { continuation in
            let task = Task {
                do {
                    let emitter = Emitter(continuation: continuation)
                    try await body(emitter)
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            continuation.onTermination = { @Sendable termination in
                switch termination {
                case .cancelled:
                    task.cancel()
                default:
                    break
                }
            }
        }
    }
}
let source = AsyncThrowingStream<String, Error> { emitter in
    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("A")

    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("B")
}

ursusursus avatar Jul 11 '22 22:07 ursusursus

I'd like something similar too. The implementation above looks good to me.

natario1 avatar Sep 14 '22 10:09 natario1