AsyncEnumerable

Race condition in ParallelForEachAsync when exceptions are thrown

Open danylofitel opened this issue 5 years ago • 3 comments

ParallelForEachAsync sometimes does not throw an exception even though individual iterations did throw exceptions.

It reproduces with the following code (derived from the scenario where I noticed it):

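using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dasync.Collections;

while (true)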
{
    Console.WriteLine();
    IReadOnlyList<int> input = Enumerable.Range(0, 10).ToList();
    ConcurrentQueue<int> output = new ConcurrentQueue<int>();

    try
    {
        await input.ParallelForEachAsync(
            async item =>
            {
                if (item == 0)
                {
                    throw new AggregateException(new Exception("Individual task failed."));
                }

                await Task.Delay(1);
                output.Enqueue(item);
            },
            maxDegreeOfParallelism: 10,
            cancellationToken: default);
    }
    catch (Exception)
    {
        continue;
    }

    Console.WriteLine($"No exception. {input.Count} - {output.Count}.");
}

The cancellation token is never canceled, and the default values breakLoopOnException: false and gracefulBreak: true are used. The first task always throws an exception, so the expectation is that the remaining tasks finish and ParallelForEachAsync then throws. However, the loop above eventually reaches an iteration where no exception is thrown and only 9 items have been added to the queue (as reported by the Console.WriteLine() statement above).

In the ParallelForEachAsync implementation, both the main task that schedules the individual iterations and the continuations of the individual tasks call OnOperationComplete() of ParallelForEachContext. OnOperationComplete() adds the exception to the list of tracked exceptions if one was supplied, releases the semaphore, and at the very end calls CompleteLoopNow() if all tasks have completed or cancellation was requested (in this specific case nothing is canceled, so it is only called when all tasks finish).

public void OnOperationComplete(Exception exceptionIfFailed = null)
{
    // Add exception to the list
    // Release the semaphore

    if ((_semaphore.CurrentCount == _maxDegreeOfParallelism + 1) || (IsLoopBreakRequested && !_gracefulBreak))
        CompleteLoopNow();
}

The problem occurs when the last few tasks release the semaphore at the same time. Releasing the semaphore and checking _semaphore.CurrentCount are two separate steps, so the condition _semaphore.CurrentCount == _maxDegreeOfParallelism + 1 can evaluate to true for more than one of those tasks, and CompleteLoopNow() can be called more than once.
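One way to avoid this check-then-act race, sketched here under assumptions: instead of releasing the semaphore and then reading CurrentCount in a separate step, keep a single counter of outstanding operations and let Interlocked.Decrement report atomically whether the caller was the last one. The names pending, completions, OnOperationStarted and OnOperationComplete are hypothetical and not the library's API. The demo registers all operations before any of them completes, which is what the + 1 in the original condition appears to achieve (the scheduling task holding its own slot until enumeration is done).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch, not Dasync code.
int pending = 0;      // operations started but not yet finished
int completions = 0;  // how many times the "complete the loop" branch ran

void OnOperationStarted() => Interlocked.Increment(ref pending);

void OnOperationComplete()
{
    // Decrement and read happen as one atomic step, so exactly one caller
    // observes zero and runs the completion branch.
    if (Interlocked.Decrement(ref pending) == 0)
        Interlocked.Increment(ref completions);
}

const int operations = 1000;
for (int i = 0; i < operations; i++)
    OnOperationStarted();

// Complete all operations concurrently, as racing continuations would.
var tasks = new Task[operations];
for (int i = 0; i < operations; i++)
    tasks[i] = Task.Run(() => OnOperationComplete());
await Task.WhenAll(tasks);

Console.WriteLine($"Completion branch ran {completions} time(s).");
```

However the continuations interleave, the completion branch should run exactly once.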

public void CompleteLoopNow()
{
    Console.WriteLine("CompleteLoopNow - Start");
    _cancellationTokenRegistration.Dispose();

    try
    {
        if (_semaphore != null)
            _semaphore.Dispose();
    }
    catch
    {
    }

    var exceptions = ReadExceptions();
    var aggregatedException = exceptions?.Count > 0 ? new ParallelForEachException(exceptions) : null;

    if (_cancellationToken.IsCancellationRequested)
    {
        Console.WriteLine("CompleteLoopNow - OperationCanceledException");
        _ = _completionTcs.TrySetException(
            new OperationCanceledException(
                new OperationCanceledException().Message,
                aggregatedException,
                _cancellationToken));
    }
    else if (exceptions?.Count > 0)
    {
        Console.WriteLine("CompleteLoopNow - TrySetException");
        _ = _completionTcs.TrySetException(aggregatedException);
    }
    else
    {
        Console.WriteLine("CompleteLoopNow - TrySetResult");
        _ = _completionTcs.TrySetResult(null);
    }
}

This means that multiple tasks can also enter ReadExceptions() concurrently.

public List<Exception> ReadExceptions()
{
    Console.WriteLine("ReadExceptions - Start");
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        Console.WriteLine("ReadExceptions - Returning");
        return _exceptionList;
    }
    finally
    {
        _exceptionList = null;
        _exceptionListLock.Exit(useMemoryBarrier: false);
        Console.WriteLine("ReadExceptions - End");
    }
}

However, the finally block sets the exception list to null, so the first task calling it gets the full list of exceptions back and subsequent tasks get null. Then, in CompleteLoopNow(), a task that received a null exception list can call TrySetResult() before the task with the correct exception list calls TrySetException(). Only the first Try* call on a TaskCompletionSource takes effect, so in that case the exceptions are lost.

I debugged with the same Console.WriteLine statements as above, and in the cases where ParallelForEachAsync() did not throw, I saw the following output:

CompleteLoopNow - Start              // Task A entering CompleteLoopNow()
ReadExceptions - Start               // Task A entering ReadExceptions()
ReadExceptions - Returning           // Task A returning a full list of exceptions
ReadExceptions - End                 // Task A setting the list of exceptions to null
CompleteLoopNow - Start              // Task B entering CompleteLoopNow()
ReadExceptions - Start               // Task B entering ReadExceptions()
ReadExceptions - Returning           // Task B returning null as the list of exceptions
ReadExceptions - End                 // Task B setting the list of exceptions to null
CompleteLoopNow - TrySetResult       // Task B setting result on the task completion source since it got null from ReadExceptions()
CompleteLoopNow - TrySetException    // Task A setting exception on the task completion source since it got a non-empty list of exceptions from ReadExceptions()
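The last two lines of that log can be reproduced deterministically with a bare TaskCompletionSource: whichever Try* call arrives first decides the outcome, and later calls simply return false. This is a standalone sketch, not library code; the bool type argument is arbitrary and chosen only to keep the example minimal.

```csharp
using System;
using System.Threading.Tasks;

var tcs = new TaskCompletionSource<bool>();

// "Task B": got null from ReadExceptions(), so it reports success first.
bool resultSet = tcs.TrySetResult(true);

// "Task A": has the real exceptions, but arrives second and is ignored.
bool exceptionSet = tcs.TrySetException(new InvalidOperationException("Individual task failed."));

Console.WriteLine($"TrySetResult: {resultSet}, TrySetException: {exceptionSet}"); // TrySetResult: True, TrySetException: False
Console.WriteLine($"Status: {tcs.Task.Status}");                                  // Status: RanToCompletion
```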

I'm not sure whether ReadExceptions() needs to reset the exception list to null. One possible reason is to prevent a race condition when the loop is canceled: continuations of tasks that are still running can keep adding exceptions to the list after the same list has already been returned from ReadExceptions() to CompleteLoopNow(). However, in that case ReadExceptions() could return a copy of the exception list instead, e.g.:

public List<Exception> ReadExceptions()
{
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        // Return a copy, so the list being returned will not be modified
        // by tasks that are still running if the loop was canceled
        return new List<Exception>(_exceptionList ?? Enumerable.Empty<Exception>());
    }
    finally
    {
        _exceptionListLock.Exit(useMemoryBarrier: false);
    }
}

Another option is to prevent tasks from re-entering CompleteLoopNow().
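A minimal sketch of that second option, assuming a hypothetical int flag on the context (completeLoopEntered below, which is not part of the library): Interlocked.CompareExchange flips it from 0 to 1 exactly once, so only the first caller runs the completion logic. This relies on each task recording its exception before releasing the semaphore, as the outline of OnOperationComplete() above suggests, so the first caller to get in should already see every exception.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch, not Dasync code.
int completeLoopEntered = 0; // 0 = loop not completed yet, 1 = completed
int completionBodyRuns = 0;

void CompleteLoopNow()
{
    // Atomically change 0 -> 1; every later caller sees 1 and returns early.
    if (Interlocked.CompareExchange(ref completeLoopEntered, 1, 0) != 0)
        return;

    // Dispose resources, read exceptions and set the TaskCompletionSource here.
    Interlocked.Increment(ref completionBodyRuns);
}

// Simulate several continuations racing to complete the loop.
var callers = new Task[16];
for (int i = 0; i < callers.Length; i++)
    callers[i] = Task.Run(() => CompleteLoopNow());
await Task.WhenAll(callers);

Console.WriteLine($"Completion body ran {completionBodyRuns} time(s).");
```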

danylofitel, Apr 20 '20 23:04

Here's a PR with the proposed change to ReadExceptions() https://github.com/Dasync/AsyncEnumerable/pull/58

danylofitel, Apr 21 '20 00:04

@danylofitel hi! Do you know why this issue might still be unaddressed? No one has responded to your PR. We have faced a similar issue in production where we couldn't understand why an exception was not thrown, but in our case it occurred with a single task that was throwing an exception. Is there an alternative package from Microsoft?

marvel16, Sep 14 '21 06:09

@marvel16 it seems like the owner hasn't checked the repo for a while; previously he was super responsive and merged my PR within a day.

While waiting for a fix, I'm using a wrapper that adds a try/catch around the async action, records every exception an iteration throws, and throws after the loop completes if any were recorded. It is equivalent to:

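using System;
using System.Collections.Concurrent;
using System.Collections.Generic; // IAsyncEnumerable<T>, assuming .NET Core 3.0+
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static async Task ParallelForEachAsync<T>(
    this IAsyncEnumerable<T> collection,
    Func<T, long, Task> asyncItemAction,
    int maxDegreeOfParallelism,
    bool breakLoopOnException,
    bool gracefulBreak,
    CancellationToken cancellationToken = default)
{
    // Track failures independently of Dasync's own exception list.
    ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();

    await Dasync.Collections.ParallelForEachExtensions.ParallelForEachAsync(
        collection,
        async (item, index) =>
        {
            try
            {
                await asyncItemAction(item, index).ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                exceptions.Enqueue(exception);
                throw; // rethrow so Dasync still sees the failure (e.g. for breakLoopOnException)
            }
        },
        maxDegreeOfParallelism,
        breakLoopOnException,
        gracefulBreak,
        cancellationToken).ConfigureAwait(false);

    // Only reached if Dasync did not throw. If iterations failed anyway
    // (the race described in this issue), throw here instead.
    if (!exceptions.IsEmpty)
    {
        if (exceptions.All(exception => exception is OperationCanceledException) && cancellationToken.IsCancellationRequested)
        {
            // Every failure was a cancellation: surface it as such.
            throw exceptions.First();
        }
        else
        {
            throw new AggregateException("ParallelForEachAsync exception", exceptions);
        }
    }
}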

The idea is that once this issue is resolved, you won't need any code changes, except perhaps cleaning up using directives: remove the using for the namespace in which you keep this wrapper and add the Dasync namespace instead.

Another option is to use the new Parallel.ForEachAsync. Note that it is only available in .NET 6, which is in preview until November; if you're able to switch to it, you don't really need this package.
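For comparison, here is a sketch of the repro from the top of this issue ported to Parallel.ForEachAsync, assuming a .NET 6 console project with top-level statements (Dasync is not used at all):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var input = Enumerable.Range(0, 10).ToList();
var output = new ConcurrentQueue<int>();
bool threw = false;

try
{
    await Parallel.ForEachAsync(
        input,
        new ParallelOptions { MaxDegreeOfParallelism = 10 },
        async (item, cancellationToken) =>
        {
            if (item == 0)
                throw new InvalidOperationException("Individual task failed.");

            await Task.Delay(1, cancellationToken);
            output.Enqueue(item);
        });
}
catch (Exception ex)
{
    threw = true;
    Console.WriteLine($"Caught {ex.GetType().Name}; {output.Count} of {input.Count} items processed.");
}

Console.WriteLine(threw ? "Exception propagated." : "No exception.");
```

As far as I know, once an iteration throws, Parallel.ForEachAsync stops starting new iterations and cancels the token it passes to the body, so its behavior is closer to breakLoopOnException: true than to the defaults used in the repro. Awaiting it rethrows one of the failures; the returned task's Exception property holds all of them.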

danylofitel, Sep 16 '21 07:09