AsyncEnumerable

yield.ReturnAsync inside ParallelForEachAsync

Open • galmok opened this issue 5 years ago • 5 comments

We are having trouble using ParallelForEachAsync to start a number of async web requests that run in parallel (they are actually Azure Blob Storage downloads) and have each result returned through yield.ReturnAsync.

The problem is that before the first result is returned, multiple parallel web requests have already been started and completed, yet only the last of those results is returned to the consumer (which iterates using ForEachAsync).

The producer:

    public static IAsyncEnumerable<(MemoryStream, string)> Stream(this IEnumerable<string> blobNames,
        IBlobManager blobManager,
        CloudBlobContainer container, int maxDegreeOfParallelism = 5)
    {
        return new AsyncEnumerable<(MemoryStream, string)>(async yield =>
        {
            var cancellationToken1 = yield.CancellationToken;

            await blobNames.ParallelForEachAsync(async blobName =>
            {
                Console.WriteLine($"Trying to download blob: {blobName}");

                // TODO: try/catch; what happens if one download fails?
                var memoryStream = await blobManager
                    .DownloadBlobAsStreamAsync(container, blobName, cancellationToken1)
                    .ConfigureAwait(false);

                // Return immediately instead of waiting for all the blobs to complete
                await yield.ReturnAsync((memoryStream, blobName)).ConfigureAwait(false);
            }, maxDegreeOfParallelism, cancellationToken1).ConfigureAwait(false);
        });
    }

The consumer:

        var blobNames = MyFactory.BuildBlobNames(from, to);

        var asyncEnumerable = blobNames.Stream(BlobManager, Container, 4);

        // Assert
        var concurrentList = new ConcurrentBag<string>();
        await asyncEnumerable.ForEachAsync(async tuple =>
        {
            using (var ms = tuple.Item1)
            {
                var decoded = Encoding.UTF8.GetString(ms.ToArray());
                //TODO: Convert to text to assert the content
                concurrentList.Add(decoded);
                Console.WriteLine($"Blob: {tuple.Item2}. Content: {decoded}");
            }
        }, cancellationToken).ConfigureAwait(false);

What did we do wrong?

galmok • Jun 26 '19 11:06

I have exactly the same issue.

felipecruz91 • Jun 26 '19 12:06

Good try, but this is simply an unsupported scenario with concurrent producers. Imagine a synchronous version of the producer side:

    IEnumerable<...> Stream(...)
    {
        Parallel.ForEach(... =>
        {
            yield return ...; // won't compile
        });

        yield return ...; // will compile
    }

The yield keyword cannot be used inside the lambda passed to Parallel.ForEach, because the lambda is outside the scope of the synchronous iterator method.

This library mimics that C# language feature, but it cannot impose a similar restriction on using the variable yield (in the async version it is a variable, not a keyword).

    IAsyncEnumerable<...> Stream(...)
    {
        return new AsyncEnumerable<...>(async yield => // 'yield' is a variable
        {
            await ....ParallelForEachAsync(async ... =>
            {
                // 'yield' is captured in the closure; that's why you can use it here, but you must not
                await yield.ReturnAsync(...);
            }, ...);
        });
    }

If you try the same thing with C# 8.0 async streams, it won't work either: yield return is not allowed inside the lambda there. So this is something you have to be aware of.
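For the synchronous case, a version that does compile keeps yield return in the iterator body itself and moves results out of the parallel loop through a thread-safe collection. The following is a minimal sketch using only BCL types (BlockingCollection<T>, Parallel.ForEach); StreamParallel and Work are hypothetical names, with Work standing in for a blob download, and the sketch assumes the caller enumerates to the end.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for downloading a blob: squares the input.
static int Work(int x) => x * x;

// Iterator: 'yield return' lives in the iterator body, never in the lambda.
// Assumes the caller enumerates to the end (early exit is not handled).
static IEnumerable<int> StreamParallel(IEnumerable<int> source, int maxDegreeOfParallelism)
{
    var buffer = new BlockingCollection<int>();

    // Run the parallel loop in the background; each worker adds its result.
    var producer = Task.Run(() =>
    {
        try
        {
            Parallel.ForEach(
                source,
                new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
                item => buffer.Add(Work(item)));
        }
        finally
        {
            buffer.CompleteAdding(); // lets GetConsumingEnumerable finish
        }
    });

    // Results are handed out one at a time, in completion order.
    foreach (var result in buffer.GetConsumingEnumerable())
        yield return result;

    producer.Wait(); // surfaces exceptions thrown by the parallel loop
}

var results = StreamParallel(Enumerable.Range(1, 10), 4).ToList();
Console.WriteLine(results.Sum()); // prints 385
```

The parallel workers only ever touch the collection; the single enumerating thread is the only one that reaches yield return.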

A concurrent producer/consumer pattern with concurrency limits is a somewhat more complex problem, and this library does not provide it as an extension method. I posted a possible solution on StackOverflow.
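As a rough illustration of a concurrent producer/consumer with a concurrency limit (not necessarily the StackOverflow solution mentioned above), here is a sketch that uses System.Threading.Channels and Parallel.ForEachAsync (.NET 6 or later) instead of this library. Many workers write to the channel concurrently, which channels allow, while a single loop hands items to the caller one at a time. DownloadAsync and StreamAsync are hypothetical placeholders for the blob download and the producer method.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for an Azure Blob Storage download.
static async Task<string> DownloadAsync(string name, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return $"content of {name}";
}

// Many concurrent producers, one sequential consumer. Assumes the caller reads
// to the end (or cancels); an abandoned reader would leave writers waiting.
static async IAsyncEnumerable<(string Name, string Content)> StreamAsync(
    IEnumerable<string> names, int maxDegreeOfParallelism,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    // Bounded channel: writers wait when the consumer falls behind.
    var channel = Channel.CreateBounded<(string Name, string Content)>(maxDegreeOfParallelism);

    var producer = Task.Run(async () =>
    {
        try
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism, CancellationToken = ct };
            await Parallel.ForEachAsync(names, options, async (name, token) =>
            {
                var content = await DownloadAsync(name, token);
                await channel.Writer.WriteAsync((name, content), token); // safe from many workers
            });
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex); // the reader observes the failure
        }
    });

    // Only this loop hands items to the caller, one at a time.
    await foreach (var item in channel.Reader.ReadAllAsync(ct))
        yield return item;

    await producer;
}

var blobNames = Enumerable.Range(1, 8).Select(i => $"blob{i}").ToList();
var seen = new List<string>();
await foreach (var (blobName, blobContent) in StreamAsync(blobNames, 4))
{
    seen.Add(blobName);
    Console.WriteLine($"Blob: {blobName}. Content: {blobContent}");
}
```

The bounded capacity is one possible limiter: downloads can run at most a few items ahead of the consumer.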

One thing I can recommend, if it works for you, is to swap the methods: the producer uses ForEachAsync and the consumer uses ParallelForEachAsync.
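A sketch of that swap, using BCL stand-ins rather than this library (assumption: .NET 6 or later, whose Parallel.ForEachAsync has an overload that accepts an IAsyncEnumerable<T>). The producer is a plain sequential async iterator, so only one caller ever reaches yield return, and the parallel downloads happen on the consumer side. BlobNamesAsync and DownloadAsync are hypothetical placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an Azure Blob Storage download.
static async Task<string> DownloadAsync(string name, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return $"content of {name}";
}

// Producer: sequential, so 'yield return' is never reached concurrently.
static async IAsyncEnumerable<string> BlobNamesAsync(int count)
{
    for (var i = 1; i <= count; i++)
    {
        await Task.Yield();
        yield return $"blob{i}";
    }
}

// Consumer: the parallelism lives here, bounded by MaxDegreeOfParallelism.
var downloaded = new ConcurrentBag<string>();
await Parallel.ForEachAsync(
    BlobNamesAsync(8),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (name, ct) =>
    {
        var content = await DownloadAsync(name, ct);
        downloaded.Add(content); // ConcurrentBag is safe to add to from many workers
    });

Console.WriteLine(downloaded.Count); // prints 8
```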

kind-serge • Jun 26 '19 16:06

P.S. This is a duplicate of #44.

kind-serge • Jun 26 '19 16:06

How about using SemaphoreSlim to ensure that only one call to yield.ReturnAsync is active at any one time?

I tried it, and it seems to work just fine. But I'll have a look at your links later.
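A minimal sketch of the SemaphoreSlim idea, assuming the lock is taken around each ReturnAsync call inside the producer's lambda. Because the library cannot be pulled into a self-contained snippet, ReturnAsync below is a hypothetical local stand-in for yield.ReturnAsync (it is not the library's implementation), and Parallel.ForEachAsync (.NET 6 or later) stands in for ParallelForEachAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var results = new List<int>();      // shared state that is NOT thread-safe
var gate = new SemaphoreSlim(1, 1); // async lock: at most one holder at a time

// Hypothetical stand-in for yield.ReturnAsync: it awaits and then mutates
// shared state, so concurrent calls could race without the gate.
async Task ReturnAsync(int item)
{
    await Task.Yield();
    results.Add(item);
}

await Parallel.ForEachAsync(
    Enumerable.Range(1, 100),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (i, ct) =>
    {
        var value = i * 2; // placeholder for the actual download
        await gate.WaitAsync(ct);
        try
        {
            await ReturnAsync(value); // only one call active at any time
        }
        finally
        {
            gate.Release();
        }
    });

Console.WriteLine(results.Count); // prints 100
```

In this sketch the semaphore lives only in the parallel producer's lambda, so the cost of the lock is paid by that producer alone.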

galmok • Jun 26 '19 17:06

Using SemaphoreSlim would definitely solve the problem; however, it would hurt the performance of enumeration in the regular case, because every iteration would have to consult a synchronization primitive. How would you solve that problem?

kind-serge • Jul 19 '19 16:07