AsyncEnumerable
yield.ReturnAsync inside ParallelForEachAsync
We are having trouble using ParallelForEachAsync to start a number of async web requests running in parallel (Azure Blob Storage downloads, actually) and returning each result with yield.ReturnAsync.
The problem is that before the first result is returned, several of the parallel web requests have already been started and completed, yet only the last of those results is returned to the consumer (which iterates using ForEachAsync).
The producer:
public static IAsyncEnumerable<(MemoryStream, string)> Stream(this IEnumerable<string> blobNames,
    IBlobManager blobManager,
    CloudBlobContainer container, int maxDegreeOfParallelism = 5)
{
    return new AsyncEnumerable<(MemoryStream, string)>(async yield =>
    {
        var cancellationToken1 = yield.CancellationToken;
        await blobNames.ParallelForEachAsync(async blobName =>
        {
            Console.WriteLine($"Trying to download blob: {blobName}");
            //TODO: Try-catch, what happens if one fails?
            var memoryStream = await blobManager
                .DownloadBlobAsStreamAsync(container, blobName, cancellationToken1)
                .ConfigureAwait(false);
            // Return immediately instead of waiting for all the blobs to complete
            await yield.ReturnAsync((memoryStream, blobName)).ConfigureAwait(false);
        }, maxDegreeOfParallelism, cancellationToken1).ConfigureAwait(false);
    });
}
The consumer:
var blobNames = MyFactory.BuildBlobNames(from, to);
var asyncEnumerable = blobNames.Stream(BlobManager, Container, 4);
// Assert
var concurrentList = new ConcurrentBag<string>();
await asyncEnumerable.ForEachAsync(async tuple =>
{
    using (var ms = tuple.Item1)
    {
        var decoded = Encoding.UTF8.GetString(ms.ToArray());
        //TODO: Convert to text to assert the content
        concurrentList.Add(decoded);
        Console.WriteLine($"Blob: {tuple.Item2}. Content: {decoded}");
    }
}, cancellationToken).ConfigureAwait(false);
What did we do wrong?
I have exactly the same issue...
Good try, but this is simply an unsupported scenario with concurrent producers. Imagine a synchronous version of the producer side:
IEnumerable<...> Stream(...)
{
    Parallel.ForEach(... =>
    {
        yield return ...; // won't compile
    });
    yield return ...; // will compile
}
The yield keyword cannot be used inside the lambda function passed to the Parallel.ForEach method, because the lambda is a separate function outside the scope of the synchronous iterator method (the compiler reports error CS1621).
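For the synchronous case, one way to keep yield return legal is to move it out of the lambda: the parallel loop only hands results to a queue, and the iterator method itself does the yielding. This is a minimal sketch; the helper name ParallelSelect is hypothetical, it relies on BlockingCollection from System.Collections.Concurrent, and results come back in completion order rather than input order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncStreaming
{
    // Hypothetical helper: applies 'work' to every item in parallel and yields
    // each result from the iterator body, never from inside the lambda.
    public static IEnumerable<TResult> ParallelSelect<TSource, TResult>(
        IEnumerable<TSource> source, Func<TSource, TResult> work, int maxDegreeOfParallelism)
    {
        var results = new BlockingCollection<TResult>();

        // Producer side: Parallel.ForEach runs on a background task and only
        // adds to the queue; there is no 'yield' inside this lambda.
        var producer = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(
                    source,
                    new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
                    item => results.Add(work(item)));
            }
            finally
            {
                results.CompleteAdding(); // ends the consuming loop below
            }
        });

        // Consumer side: this is the iterator method itself, so 'yield return' compiles.
        foreach (var result in results.GetConsumingEnumerable())
        {
            yield return result;
        }

        producer.Wait(); // rethrows (wrapped in AggregateException) if 'work' failed
    }
}
```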
Since this library only emulates a C# language feature, it cannot impose the same restriction on the variable yield (in the async version it is an ordinary lambda parameter, not a keyword):
IAsyncEnumerable<...> Stream(...)
{
    return new AsyncEnumerable<...>(async yield => // 'yield' is a variable
    {
        await ....ParallelForEachAsync(async ... =>
        {
            // 'yield' gets captured in the closure, which is why you can use it here, but you must not
            await yield.ReturnAsync(...);
        }, ...);
    });
}
If you try the same thing with C# 8.0 async streams, it won't work either, so this is something you have to be aware of.
The concurrent producer/consumer pattern with a concurrency limiter is a somewhat more complex problem, and this library does not offer it as an extension method. I posted a possible solution on StackOverflow.
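As an illustration only (not necessarily the StackOverflow answer mentioned above), here is a minimal sketch of a concurrent producer/consumer with a limiter built on C# 8.0 async streams and System.Threading.Channels, assuming .NET Core 3.0 or later. The helper name SelectConcurrentlyAsync and the work delegate are hypothetical. Workers write into the channel concurrently, a SemaphoreSlim caps how many run at once, and yield return appears only in the single loop that reads the channel.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConcurrentStreaming
{
    // Hypothetical helper: runs 'work' for each item with at most
    // 'maxDegreeOfParallelism' calls in flight and yields every result as soon as
    // it is ready (completion order, not input order).
    public static async IAsyncEnumerable<TResult> SelectConcurrentlyAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, CancellationToken, Task<TResult>> work,
        int maxDegreeOfParallelism,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<TResult>();
        var limiter = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

        async Task RunOneAsync(TSource item)
        {
            try
            {
                var value = await work(item, cancellationToken);
                await channel.Writer.WriteAsync(value, cancellationToken); // safe from many writers
            }
            finally
            {
                limiter.Release(); // frees a slot for the next item
            }
        }

        async Task ProduceAllAsync()
        {
            var running = new List<Task>();
            try
            {
                foreach (var item in source)
                {
                    await limiter.WaitAsync(cancellationToken); // the concurrency limiter
                    running.Add(RunOneAsync(item));
                }
                await Task.WhenAll(running);
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                channel.Writer.Complete(ex); // makes the reader throw instead of hanging
            }
        }

        var producer = ProduceAllAsync();

        // Single consumer loop: the only place that uses 'yield return'.
        await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return result;
        }

        await producer;
    }
}
```

The unbounded channel keeps the sketch short; Channel.CreateBounded would also cap how many finished results (for example, downloaded MemoryStreams) can pile up ahead of a slow consumer. The sketch does not cancel in-flight work if the consumer stops enumerating early.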
One thing I can recommend, if it works for you, is to swap the methods in your case: the producer uses ForEachAsync and the consumer uses ParallelForEachAsync.
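The swapped arrangement can be sketched with the BCL instead of this library: an async iterator yields names one at a time, and the consumer fans out with Parallel.ForEachAsync, a .NET 6 API used here as a stand-in for this library's ParallelForEachAsync. The names ProduceNamesAsync, ConsumeInParallelAsync and downloadAsync are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SwappedRoles
{
    // Producer: yields one name at a time; no concurrency on this side.
    public static async IAsyncEnumerable<string> ProduceNamesAsync(IEnumerable<string> names)
    {
        foreach (var name in names)
        {
            await Task.Yield(); // placeholder for any async work needed per name
            yield return name;
        }
    }

    // Consumer: runs the hypothetical 'downloadAsync' for several names at once.
    public static async Task<ConcurrentBag<string>> ConsumeInParallelAsync(
        IAsyncEnumerable<string> names,
        Func<string, CancellationToken, Task<string>> downloadAsync,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentBag<string>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken,
        };

        // Parallel.ForEachAsync (.NET 6+) pulls from the async stream and keeps at most
        // 'maxDegreeOfParallelism' bodies running concurrently.
        await Parallel.ForEachAsync(names, options, async (name, ct) =>
        {
            results.Add(await downloadAsync(name, ct));
        });

        return results;
    }
}
```

Here the stream itself never has more than one producer, so nothing yields concurrently; all the parallelism lives on the consuming side.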
P.S. this is a duplicate of #44
How about using a SemaphoreSlim to ensure that only one call to yield.ReturnAsync is active at any one time?
I tried and it seems to work just fine. But I'll have a look at your links later.
On Wed, 26 Jun 2019 at 18:59, Serge Semenov [email protected] wrote:
P.S. this is a duplicate of #44 https://github.com/Dasync/AsyncEnumerable/issues/44
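The SemaphoreSlim idea can be sketched as a small wrapper that lets many concurrent producers call one async function while admitting only one call at a time. SerializedSink is a hypothetical name; in the producer from the question you would create one around a lambda such as async item => await yield.ReturnAsync(item) and call SendAsync from inside the ParallelForEachAsync lambda instead of calling yield.ReturnAsync directly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: lets many concurrent producers call 'inner' while
// guaranteeing that at most one call is in progress at any time.
public sealed class SerializedSink<T>
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Func<T, Task> _inner;

    public SerializedSink(Func<T, Task> inner) => _inner = inner;

    public async Task SendAsync(T item, CancellationToken cancellationToken = default)
    {
        await _gate.WaitAsync(cancellationToken); // wait for our turn
        try
        {
            await _inner(item); // e.g. a lambda that forwards to yield.ReturnAsync
        }
        finally
        {
            _gate.Release(); // let the next producer in
        }
    }
}
```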
Using a SemaphoreSlim will definitely solve the problem; however, it will hurt the performance of enumeration in the regular cases, because every iteration has to go through a synchronization primitive. How would you solve that problem?