AsyncEx

AsyncCollection Take calls not resolved in order

Open alexl0gan opened this issue 6 years ago • 6 comments

I have the following potential scenario where calls to Take are made before there is data in the collection.

var collection = new AsyncCollection<string>();

collection.Add("a");
var a = collection.TakeAsync();
var b = collection.TakeAsync();
var c = collection.TakeAsync();
collection.Add("b");
var d = collection.TakeAsync();
collection.Add("c");
collection.Add("d");
collection.Add("e");
var e = collection.TakeAsync();
Assert.AreEqual("a", a.Result);
Assert.AreEqual("b", b.Result);
Assert.AreEqual("c", c.Result);
Assert.AreEqual("d", d.Result);
Assert.AreEqual("e", e.Result);

Should I even expect this to work? The above test fails and the output seems quite random.

 a - a
 b - c
 c - e
 d - b
 e - d

 a - a
 b - e
 c - d
 d - b
 e - c

 a - a
 b - c
 c - d
 d - b
 e - e

How does AsyncCollection decide which calls to Take to resolve?

alexl0gan, Feb 22 '19 14:02

I'll take a look at this.

AsyncCollection<T> uses a ConcurrentQueue<T> by default, which should be FIFO. The tasks are kept in an async wait queue, which is based on a deque and should be FIFO. That said, there may be edge cases where the FIFO falls over.

StephenCleary, Feb 22 '19 14:02

Ah, of course. When many items are added simultaneously, they can cause a race for those tasks to retrieve the items. So when adding n items within a short time frame, the first n tasks will be released and may take the "wrong" item.

It may be possible to fix this by using an explicit queue. I'll have to dig further.

StephenCleary, Feb 25 '19 01:02

This is turning out to be fairly complex. In the meantime, you can use AsyncProducerConsumerQueue<T>.

StephenCleary, Feb 25 '19 04:02
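
A minimal sketch of the suggested workaround, replaying the interleaving from the original report against AsyncProducerConsumerQueue<T>. It assumes the EnqueueAsync and DequeueAsync methods from the Nito.AsyncEx package. The wrapper class and method names are hypothetical, and the sketch makes no claim about ordering beyond what is reported in this thread.

```csharp
using System.Threading.Tasks;
using Nito.AsyncEx;

public static class ProducerConsumerQueueSketch
{
    // Hypothetical helper: same sequence of adds and takes as the original report,
    // with Add/TakeAsync swapped for EnqueueAsync/DequeueAsync.
    public static async Task<string[]> RunAsync()
    {
        var queue = new AsyncProducerConsumerQueue<string>();

        await queue.EnqueueAsync("a");
        var a = queue.DequeueAsync();
        var b = queue.DequeueAsync();
        var c = queue.DequeueAsync();
        await queue.EnqueueAsync("b");
        var d = queue.DequeueAsync();
        await queue.EnqueueAsync("c");
        await queue.EnqueueAsync("d");
        await queue.EnqueueAsync("e");
        var e = queue.DequeueAsync();

        // Results are returned in the order the dequeue calls were issued (a..e),
        // so the caller can compare them against the expected items.
        return await Task.WhenAll(a, b, c, d, e);
    }
}
```

Comparing the returned array against { "a", "b", "c", "d", "e" } gives the same check as the assertions in the original test.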

The basic approach doesn't work to preserve order. The fact that the multiple TakeAsync calls are concurrent makes the results unpredictable.

For comparison, this test also fails, concurrently calling Take on BlockingCollection<T>:

[Fact]
public async Task BlockingCollectionCompletesInOrder()
{
    var collection = new BlockingCollection<string>();

    collection.Add("a");
    // Spin until each task has left the scheduler queue. Comparing with "<" also
    // exits if the task has already finished, which "!= Running" would miss.
    var a = Task.Run(() => collection.Take());
    while (a.Status < TaskStatus.Running) ;
    var b = Task.Run(() => collection.Take());
    while (b.Status < TaskStatus.Running) ;
    var c = Task.Run(() => collection.Take());
    while (c.Status < TaskStatus.Running) ;
    var d = Task.Run(() => collection.Take());
    while (d.Status < TaskStatus.Running) ;
    var e = Task.Run(() => collection.Take());
    while (e.Status < TaskStatus.Running) ;
    collection.Add("b");
    collection.Add("c");
    collection.Add("d");
    collection.Add("e");
    Assert.Equal("a", await a);
    Assert.Equal("b", await b);
    Assert.Equal("c", await c);
    Assert.Equal("d", await d);
    Assert.Equal("e", await e);
}

So I think it's fair that AsyncCollection<T> has the same behavior.

What is the actual problem you're trying to solve? It sounds like you have multiple consumers but that they are somehow dependent on which items they receive?

StephenCleary, Feb 25 '19 14:02

With the code in the "one op at a time" branch, this test passes:

[Fact]
public async Task CompleteInOrder()
{
    var collection = new AsyncCollection<string>();
    var queue = new AsyncInvokeQueue<string>(collection.TakeAsync);

    collection.Add("a");
    var a = queue.InvokeAsync();
    var b = queue.InvokeAsync();
    var c = queue.InvokeAsync();
    collection.Add("b");
    var d = queue.InvokeAsync();
    collection.Add("c");
    collection.Add("d");
    collection.Add("e");
    var e = queue.InvokeAsync();
    Assert.Equal("a", await a);
    Assert.Equal("b", await b);
    Assert.Equal("c", await c);
    Assert.Equal("d", await d);
    Assert.Equal("e", await e);
}

I'm still on the fence about whether this behavior should be internal to AsyncCollection<T>. Since these are concurrent calls, the out-of-order responses are consistent with BlockingCollection<T>. On the other hand, the behavior is noticeably different from other "async queues" like AsyncProducerConsumerQueue<T>. But really that's just due to an implementation detail: APCQ happens to use its waiting queue as its negative-data storage (with new items never entering its data structure), while AC has to run its new items through its data structure. #random-stream-of-consciousness

So I'm leaning towards keeping AC semantics as-is, but also adding an AsyncInvokeQueue so that ordering can be preserved if desired. Still haven't quite decided, though; still ruminating.

StephenCleary, Feb 28 '19 16:02
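
The "one op at a time" idea can be sketched with only the base class library. This is an illustration of the technique, not the code from that branch: the class name SerializedInvoker<T> is hypothetical, and the sketch assumes that ordering is preserved simply by never having more than one call to the wrapped function in flight.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of AsyncEx): each invocation waits for the
// previous one to finish before calling the wrapped function, so results are
// handed out in the order InvokeAsync was called.
public sealed class SerializedInvoker<T>
{
    private readonly Func<Task<T>> _func;
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public SerializedInvoker(Func<Task<T>> func)
    {
        _func = func ?? throw new ArgumentNullException(nameof(func));
    }

    public Task<T> InvokeAsync()
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        Task previous;
        lock (_gate)
        {
            // Link this call behind the current tail; the lock only guards the swap.
            previous = _tail;
            _tail = tcs.Task;
        }
        _ = RunAsync(previous, tcs);
        return tcs.Task;
    }

    private async Task RunAsync(Task previous, TaskCompletionSource<T> tcs)
    {
        // A failed earlier call must not block later ones, so its error is ignored here.
        try { await previous.ConfigureAwait(false); }
        catch { }

        try { tcs.SetResult(await _func().ConfigureAwait(false)); }
        catch (Exception ex) { tcs.SetException(ex); }
    }
}
```

Wrapping collection.TakeAsync in such a helper, the way the test above wraps it in AsyncInvokeQueue<string>, means only one take is ever outstanding, so each call receives the current head of the collection. The trade-off is that callers queue behind one another even when items are already available.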

Thanks for looking at this, Stephen.

This example was based on similar functionality we have in corresponding Python/Java libraries.

I followed your suggestion and switched to an AsyncProducerConsumerQueue<T>, which is working fine.

alexl0gan, Mar 01 '19 16:03