CombineLatest does not combine all values

Open • bspinner opened this issue 1 year ago • 0 comments

Hello!

First: thanks for your library.

Coming from Combine, we've stumbled upon some unexpected behaviour when using CombineLatest: output from StreamA is only processed once StreamB also emits a further value, or once the streams finish.

Expected behaviour: Once both streams have emitted at least one value, every new value from either stream is immediately combined with the latest value from the other stream, and the resulting pair is emitted.

Code for issue reproduction

import Asynchrone

var continuationA: AsyncStream<Int>.Continuation! = nil
var continuationB: AsyncStream<Int>.Continuation! = nil
let streamA = AsyncStream(/*bufferingPolicy: .bufferingNewest(1)*/) { continuation in
    continuationA = continuation
}

let streamB = AsyncStream(/*bufferingPolicy: .bufferingNewest(1)*/) { continuation in
    continuationB = continuation
}

let x = Task.detached(priority: .background) {
    print("START")
    for try await value in streamA.combineLatest(streamB) {
        print(value)
    }
    print("STOP")
}

Task.detached(priority: .userInitiated) {
    try! await Task.sleep(for: .seconds(1))
    
    print("A => 10")
    await continuationA.yield(10)
    
    print("B => 1")
    await continuationB.yield(1)
    
    print("A => 20")
    await continuationA.yield(20)
    print("A => 30")
    await continuationA.yield(30)
    print("A => 40")
    await continuationA.yield(40)
    
    print("B => 2")
    await continuationB.yield(2)
    
    print("B => 3")
    await continuationB.yield(3)

    print("A => 50")
    await continuationA.yield(50)
    
    print("B => 4")
    await continuationB.yield(4)
    
    print("A => finish")
    await continuationA.finish()
    
    print("B => finish")
    await continuationB.finish()
}

try! await x.value

Program's output

START
A => 10
B => 1
A => 20
(10, 1)
A => 30
A => 40
B => 2
B => 3
(20, 2)
A => 50
(30, 3)
B => 4
A => finish
(40, 4)
B => finish
(50, 4)
STOP

Expected output

START
A => 10
B => 1
A => 20
(10, 1)
(20, 1)
A => 30
(30, 1)
A => 40
(40, 1)
B => 2
(40, 2)
B => 3
(40, 3)
A => 50
(50, 3)
B => 4
(50, 4)
A => finish
B => finish
STOP
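
To make the expected semantics explicit, here is a minimal sketch of the rule we assumed, based on how Combine behaves. It is a plain synchronous model, not Asynchrone's API or implementation: `Event` and `combineLatestModel` are hypothetical names introduced only for this illustration. It replays the values from the reproduction in the order they are yielded and produces the pairs listed in the expected output.

```swift
// Hypothetical model of the expected combineLatest semantics.
// Not part of Asynchrone; names are for illustration only.
enum Event {
    case a(Int) // a value yielded by streamA
    case b(Int) // a value yielded by streamB
}

/// Returns the pairs we would expect for `events`, assuming Combine-style
/// semantics: emit on every new value once both sides have a latest value.
func combineLatestModel(_ events: [Event]) -> [(Int, Int)] {
    var latestA: Int?
    var latestB: Int?
    var output: [(Int, Int)] = []

    for event in events {
        // Remember the most recent value of whichever side emitted.
        switch event {
        case .a(let value): latestA = value
        case .b(let value): latestB = value
        }
        // Emit only after both sides have produced at least one value.
        if let a = latestA, let b = latestB {
            output.append((a, b))
        }
    }
    return output
}

// Same order of yields as in the reproduction code.
let events: [Event] = [
    .a(10), .b(1), .a(20), .a(30), .a(40), .b(2), .b(3), .a(50), .b(4),
]
let pairs = combineLatestModel(events)
for pair in pairs {
    print(pair)
}
```

With this rule, each new value from A is paired with the current value of B right away, instead of waiting for B to emit again.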

bspinner • Sep 26 '23 14:09