AsyncEx
AsyncCollection Take calls not resolved in order
I have the following potential scenario where calls to TakeAsync are made before there is data in the collection.

```csharp
using Nito.AsyncEx;

var collection = new AsyncCollection<string>();
collection.Add("a");
var a = collection.TakeAsync();   // "a" is already available
var b = collection.TakeAsync();   // nothing left: b and c have to wait
var c = collection.TakeAsync();
collection.Add("b");
var d = collection.TakeAsync();
collection.Add("c");
collection.Add("d");
collection.Add("e");
var e = collection.TakeAsync();

Assert.AreEqual("a", a.Result);
Assert.AreEqual("b", b.Result);
Assert.AreEqual("c", c.Result);
Assert.AreEqual("d", d.Result);
Assert.AreEqual("e", e.Result);
```
Should I even expect this to work? The above test fails and the output seems quite random.
Results from three runs (each line is expected - actual):

```text
Run 1    Run 2    Run 3
a - a    a - a    a - a
b - c    b - e    b - c
c - e    c - d    c - d
d - b    d - b    d - b
e - d    e - c    e - e
```
How does AsyncCollection decide which calls to Take to resolve?
I'll take a look at this.
AsyncCollection<T> uses a ConcurrentQueue<T> by default, which should be FIFO. The tasks are kept in an async wait queue, which is based on a deque and should also be FIFO. That said, there may be edge cases where FIFO ordering breaks down.
Ah, of course. When many items are added in quick succession, each Add releases a waiting TakeAsync task, and those released tasks then race each other to retrieve the items. So when n items are added within a short time frame, the first n waiting tasks are released, and any of them may take the "wrong" item.
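To make the race concrete, here is a deliberately naive sketch. `NaiveAsyncCollection<T>` is hypothetical and is not how AsyncEx implements AsyncCollection<T>; it just pairs a `ConcurrentQueue<T>` with a `SemaphoreSlim` that counts available items. `Add` enqueues and releases one waiter, while `TakeAsync` first waits for a signal and only then dequeues. Being released first does not mean dequeuing first: once several waiters are released close together, their continuations can run in any order.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, deliberately naive collection used only to illustrate the race.
// It is NOT the AsyncEx implementation of AsyncCollection<T>.
public sealed class NaiveAsyncCollection<T>
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    // Counts items that have been added but not yet claimed by a taker.
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);

    public void Add(T item)
    {
        _items.Enqueue(item);   // the data itself is stored in FIFO order...
        _available.Release();   // ...then one waiting taker is signaled
    }

    public async Task<T> TakeAsync()
    {
        // Step 1: wait until some item is available.
        await _available.WaitAsync().ConfigureAwait(false);
        // Step 2: dequeue. Between steps 1 and 2, other released takers may
        // dequeue first, so this caller does not necessarily get the item that
        // matches its place in line.
        _items.TryDequeue(out T item); // succeeds: every permit matches one enqueued item
        return item;
    }
}
```

Nothing is lost, only reordered: every pending take completes, but which take receives which item depends on scheduling.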
It may be possible to fix this by using an explicit queue. I'll have to dig further.
This is turning out to be fairly complex. In the meantime, you can use AsyncProducerConsumerQueue<T>.
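Order can be preserved by never letting waiters race for the data. Below is a minimal sketch of one way to do that, using a hypothetical `OrderedAsyncQueue<T>` (not the actual source of AsyncProducerConsumerQueue<T>): when takers are already waiting, `Add` hands the new item directly to the oldest waiter's `TaskCompletionSource<T>` under a lock, and items are buffered only when nobody is waiting. Because the item is bound to a specific waiter at the moment that waiter is dequeued, the order in which continuations later run cannot change who gets what.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical order-preserving queue: a sketch of the "hand items directly
// to waiters" idea, not the AsyncEx source.
public sealed class OrderedAsyncQueue<T>
{
    private readonly object _lock = new object();
    // Buffered items, used only while nobody is waiting.
    private readonly Queue<T> _items = new Queue<T>();
    // Pending takes, oldest first, used only while no items are buffered.
    private readonly Queue<TaskCompletionSource<T>> _waiters = new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        TaskCompletionSource<T> waiter = null;
        lock (_lock)
        {
            if (_waiters.Count > 0)
                waiter = _waiters.Dequeue();   // oldest pending take gets this item
            else
                _items.Enqueue(item);
        }
        // Complete outside the lock; the item is already assigned to this waiter.
        waiter?.SetResult(item);
    }

    public Task<T> TakeAsync()
    {
        lock (_lock)
        {
            if (_items.Count > 0)
                return Task.FromResult(_items.Dequeue());
            var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }
}
```

Replaying the sequence from the original report against this sketch, `a` through `e` receive "a" through "e" respectively.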
The basic approach can't preserve order: because the TakeAsync calls are concurrent, which call receives which item is unpredictable.
For comparison, this test, which calls Take concurrently on BlockingCollection<T>, also fails:
```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Xunit;

public class BlockingCollectionTests
{
    [Fact]
    public async Task BlockingCollectionCompletesInOrder()
    {
        var collection = new BlockingCollection<string>();
        collection.Add("a");

        // Start each Take on its own thread-pool task and spin until that task
        // has started before starting the next one. "< Running" (rather than
        // "!= Running") avoids spinning forever if a task, like `a`, finishes
        // before the status is observed.
        var a = Task.Run(() => collection.Take());
        while (a.Status < TaskStatus.Running) ;
        var b = Task.Run(() => collection.Take());
        while (b.Status < TaskStatus.Running) ;
        var c = Task.Run(() => collection.Take());
        while (c.Status < TaskStatus.Running) ;
        var d = Task.Run(() => collection.Take());
        while (d.Status < TaskStatus.Running) ;
        var e = Task.Run(() => collection.Take());
        while (e.Status < TaskStatus.Running) ;

        collection.Add("b");
        collection.Add("c");
        collection.Add("d");
        collection.Add("e");

        Assert.Equal("a", await a);
        Assert.Equal("b", await b);
        Assert.Equal("c", await c);
        Assert.Equal("d", await d);
        Assert.Equal("e", await e);
    }
}
```
So I think it's fair that AsyncCollection<T> has the same behavior.
What is the actual problem you're trying to solve? It sounds like you have multiple consumers but that they are somehow dependent on which items they receive?
With the code in the "one op at a time" branch, this test passes:
```csharp
[Fact]
public async Task CompleteInOrder()
{
    var collection = new AsyncCollection<string>();
    var queue = new AsyncInvokeQueue<string>(collection.TakeAsync);
    collection.Add("a");
    var a = queue.InvokeAsync();
    var b = queue.InvokeAsync();
    var c = queue.InvokeAsync();
    collection.Add("b");
    var d = queue.InvokeAsync();
    collection.Add("c");
    collection.Add("d");
    collection.Add("e");
    var e = queue.InvokeAsync();
    Assert.Equal("a", await a);
    Assert.Equal("b", await b);
    Assert.Equal("c", await c);
    Assert.Equal("d", await d);
    Assert.Equal("e", await e);
}
```
I'm still on the fence about whether this behavior should be internal to AsyncCollection<T>. Since these are concurrent calls, the out-of-order responses are consistent with BlockingCollection<T>. However, the behavior is noticeably different from other "async queues" like AsyncProducerConsumerCollection<T>. That's really just due to an implementation detail: APCC happens to use its waiting queue as its negative-data storage (new items never enter its data structure), while AC has to run its new items through its data structure. #random-stream-of-consciousness
So I'm leaning towards keeping AC semantics as-is, but also adding an AsyncInvocationQueue so that ordering can be preserved if desired. Still haven't quite decided, though; still ruminating.
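The branch's invocation queue isn't shown in this thread. As an assumption about how such a wrapper could work, here is a sketch with a hypothetical `SerialInvoker<T>`: each invocation is chained onto the previous one, so at most one wrapped operation (for example `collection.TakeAsync`) is in flight at a time. With only one outstanding take there is nothing to race against, so results come back in call order.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an "invocation queue"; not the branch's actual code.
// Runs the wrapped asynchronous operation one invocation at a time, in call order.
public sealed class SerialInvoker<T>
{
    private readonly Func<Task<T>> _operation;
    private readonly object _lock = new object();
    private Task _tail = Task.CompletedTask; // completes when the last queued invocation finishes

    public SerialInvoker(Func<Task<T>> operation)
    {
        _operation = operation ?? throw new ArgumentNullException(nameof(operation));
    }

    public Task<T> InvokeAsync()
    {
        lock (_lock)
        {
            Task<T> result = RunAfterAsync(_tail);
            // The next invocation waits for this one. Faults are swallowed in the
            // chain (the caller still sees them via `result`), so one failed call
            // doesn't break every later call.
            _tail = result.ContinueWith(_ => { }, TaskScheduler.Default);
            return result;
        }
    }

    private async Task<T> RunAfterAsync(Task previous)
    {
        await previous.ConfigureAwait(false);            // wait for the earlier invocation
        return await _operation().ConfigureAwait(false); // then run this one
    }
}
```

Wrapping `collection.TakeAsync` in a wrapper like this should give the same ordering as the branch test. Chaining tasks makes the ordering explicit, rather than relying on a `SemaphoreSlim(1, 1)`, whose documentation doesn't guarantee the order in which waiters are released.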
Thanks for looking at this, Stephen.
This example was based on similar functionality we have in corresponding Python/Java libraries.
I followed your suggestion and switched to an AsyncProducerConsumerQueue<T> which is working fine.