Asynchrone
Asynchrone copied to clipboard
CombineLatest does not combine all values
Hello!
First: thanks for your library.
Coming from Combine, we've stumbled upon some unexpected behaviour when using CombineLatest
:
StreamA's output is only processed if StreamB also provides further output.
Or if streams finish.
Expected behaviour: Each output of both streams gets combined with each other as soon as output from either stream arrives.
Code for issue reproduction
import Asynchrone
var continuationA: AsyncStream<Int>.Continuation! = nil
var continuationB: AsyncStream<Int>.Continuation! = nil
let streamA = AsyncStream(/*bufferingPolicy: .bufferingNewest(1)*/) { continuation in
continuationA = continuation
}
let streamB = AsyncStream(/*bufferingPolicy: .bufferingNewest(1)*/) { continuation in
continuationB = continuation
}
let x = Task.detached(priority: .background) {
print("START")
for try await value in streamA.combineLatest(streamB) {
print(value)
}
print("STOP")
}
Task.detached(priority: .userInitiated) {
try! await Task.sleep(for: .seconds(1))
print("A => 10")
await continuationA.yield(10)
print("B => 1")
await continuationB.yield(1)
print("A => 20")
await continuationA.yield(20)
print("A => 30")
await continuationA.yield(30)
print("A => 40")
await continuationA.yield(40)
print("B => 2")
await continuationB.yield(2)
print("B => 3")
await continuationB.yield(3)
print("A => 50")
await continuationA.yield(50)
print("B => 4")
await continuationB.yield(4)
print("A => finish")
await continuationA.finish()
print("B => finish")
await continuationB.finish()
}
try! await x.value
Program's output
START
A => 10
B => 1
A => 20
(10, 1)
A => 30
A => 40
B => 2
B => 3
(20, 2)
A => 50
(30, 3)
B => 4
A => finish
(40, 4)
B => finish
(50, 4)
STOP
Expected output
START
A => 10
B => 1
A => 20
(10, 1)
(20, 1)
A => 30
(30, 1)
A => 40
(40, 1)
B => 2
(40, 2)
B => 3
(40, 3)
A => 50
(50, 3)
B => 4
(50, 4)
A => finish
B => finish
STOP