Polly

WaitAndRetryForeverAsync with fire and forget tasks

Hulkstance opened this issue on Mar 18 '22 • 3 comments

What is the correct way to use WaitAndRetryForeverAsync with fire-and-forget tasks?

Current behavior: when StartAsync runs without an Internet connection, it tries to reconnect every x seconds, as expected. However, once execution reaches ReceiveLoop, the policy no longer handles failures, because the task returned by ReceiveLoop is not awaited.

Expected behavior: failures inside ReceiveLoop should be handled by the policy as well. What is the correct way to do this?

private Task _processingLoop = Task.CompletedTask;
private bool _stopping;

public Task StartAsync(CancellationToken cancellationToken = default)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(RetryBackOffMs),
            (exception, retryCount, calculatedWaitDuration) =>
            {
                Console.WriteLine(
                    $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {exception.Message}) (Retry count: {retryCount})");
            });

    return retryPolicy.ExecuteAsync(async () =>
    {
        _stopping = false;

        // Connect to the web socket
        await _pipelineWebSocket.StartAsync(_url, cancellationToken).ConfigureAwait(false);

        // Receive loop
        _processingLoop = ReceiveLoop();
    });
}

public async Task StopAsync()
{
    _stopping = true;
    await _pipelineWebSocket.StopAsync().ConfigureAwait(false);
    await _processingLoop.ConfigureAwait(false);
}

private async Task ReceiveLoop()
{
    while (true)
    {
        if (_stopping)
        {
            break;
        }

        var result = await _pipelineWebSocket.Input.ReadAsync().ConfigureAwait(false);
        var buffer = result.Buffer;

        try
        {
            if (result.IsCanceled)
            {
                break;
            }

            if (!buffer.IsEmpty)
            {
                while (MessageParser.TryParse(ref buffer, out var payload))
                {
                    var message = Encoding.UTF8.GetString(payload);

                    _messageReceivedSubject.OnNext(message);
                }
            }

            if (result.IsCompleted)
            {
                break;
            }
        }
        finally
        {
            _pipelineWebSocket.Input.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}

Hulkstance, Mar 18 '22
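Polly's `ExecuteAsync` can only observe exceptions from the task its delegate returns. In the code above, the delegate finishes as soon as the connection is established, so the task stored in `_processingLoop` faults outside the policy. A minimal sketch, assuming Polly v7 (the API used in the question): the hypothetical `ConnectAsync` and `ReceiveLoopAsync` stand in for `_pipelineWebSocket.StartAsync` and `ReceiveLoop`, and the loop is awaited inside the retried delegate so a dropped connection triggers a full reconnect.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public static class ReconnectingReceiver
{
    public static int Connects;
    public static int LoopRuns;

    // Hypothetical stand-in for _pipelineWebSocket.StartAsync: the first
    // call fails as if there were no Internet connection.
    private static Task ConnectAsync(CancellationToken ct)
    {
        Connects++;
        if (Connects == 1)
            throw new InvalidOperationException("No Internet connection");
        return Task.CompletedTask;
    }

    // Hypothetical stand-in for ReceiveLoop: the first run dies mid-stream,
    // the second returns normally.
    private static async Task ReceiveLoopAsync(CancellationToken ct)
    {
        LoopRuns++;
        await Task.Delay(10, ct);
        if (LoopRuns == 1)
            throw new InvalidOperationException("Connection dropped");
    }

    public static Task RunAsync(CancellationToken ct = default)
    {
        Connects = 0;
        LoopRuns = 0;

        // Cancellation is excluded so that cancelling ct stops the policy
        // instead of being retried forever.
        var retryPolicy = Policy
            .Handle<Exception>(ex => !(ex is OperationCanceledException))
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(50),
                (exception, retryCount, wait) =>
                    Console.WriteLine($"Retry {retryCount} in {wait.TotalMilliseconds} ms ({exception.Message})"));

        // Connect and AWAIT the loop in the same delegate: a fault in the
        // loop now faults the task Polly is watching, so it reconnects.
        return retryPolicy.ExecuteAsync(async token =>
        {
            await ConnectAsync(token);
            await ReceiveLoopAsync(token);
        }, ct);
    }
}
```

Running `ReconnectingReceiver.RunAsync()` retries twice (failed connect, then dropped loop) and ends after three connects and two loop runs. Two caveats: a `StartAsync` built this way does not complete until the loop ends, so if callers need it to return once connected, one option is to assign the task from `ExecuteAsync` to `_processingLoop` so `StopAsync` can still await it. Also, `ReceiveLoop` as posted breaks out on `result.IsCompleted` rather than throwing, so a cleanly closed connection counts as success and the policy will not reconnect; throwing in that branch is one way to route it through the retry.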

I would suggest creating a policy within the execution of the first policy, and execute the listen loop's code through that policy. Effectively, two different wait and retry forever policies.

martincostello, Mar 18 '22
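The suggestion above can be sketched as two independent wait-and-retry-forever policies: the outer one covers connecting, the inner one re-runs only the receive loop on the existing connection. This is a minimal sketch assuming Polly v7; `ReceiveLoopAsync` and the `Connects`/`LoopRuns` counters are hypothetical stand-ins. The inner `ExecuteAsync` is awaited; without the `await`, the loop becomes fire-and-forget again.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public static class NestedRetryDemo
{
    public static int Connects;
    public static int LoopRuns;

    // Hypothetical receive loop: fails once with a transient error, then
    // completes normally.
    private static async Task ReceiveLoopAsync(CancellationToken ct)
    {
        LoopRuns++;
        await Task.Delay(10, ct);
        if (LoopRuns == 1)
            throw new InvalidOperationException("Transient read error");
    }

    public static async Task RunAsync(CancellationToken ct = default)
    {
        Connects = 0;
        LoopRuns = 0;

        // Outer policy: (re)establishes the connection.
        var connectPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(50));

        // Inner policy: re-runs only the receive loop, on the same connection.
        var receivePolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(50));

        await connectPolicy.ExecuteAsync(async token =>
        {
            Connects++; // stand-in for _pipelineWebSocket.StartAsync(...)
            // Awaited, so the inner policy observes the loop's exceptions.
            await receivePolicy.ExecuteAsync(t => ReceiveLoopAsync(t), token);
        }, ct);
    }
}
```

Here the transient failure is retried in place, so the run ends with one connect and two loop runs. Because the inner policy handles every exception, though, a connection that has really closed would also be retried on the same dead socket indefinitely.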

@martincostello, thanks for the answer!


If I do it that way, the problem is that the inner policy keeps re-running the receive loop against a closed web socket connection. I need to call StartAsync again to re-establish the web socket connection.

public Task StartAsync(CancellationToken cancellationToken = default)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(RetryBackOffMs),
            (exception, retryCount, calculatedWaitDuration) =>
            {
                Console.WriteLine(
                    $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {exception.Message}) (Retry count: {retryCount})");
            });

    return retryPolicy.ExecuteAsync(async () =>
    {
        _stopping = false;

        // Connect to the web socket
        await _pipelineWebSocket.StartAsync(_url, cancellationToken).ConfigureAwait(false);

        // Receive loop
        retryPolicy.ExecuteAsync(() => _processingLoop = ReceiveLoop());
    });
}

Hulkstance, Mar 18 '22

Maybe change the exception filter (Handle<Exception>()) to exclude an exception that indicates a closed connection?

martincostello, Mar 18 '22
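Narrowing the inner policy's exception filter as suggested lets a closed-connection error skip the inner policy and reach the outer one, which reconnects. A minimal sketch, assuming Polly v7 and assuming (hypothetically) that the socket wrapper signals a closed connection with `System.Net.WebSockets.WebSocketException`; the real type depends on what `_pipelineWebSocket` throws. `ReceiveLoopAsync` and the counters are hypothetical stand-ins.

```csharp
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public static class FilteredRetryDemo
{
    public static int Connects;
    public static int LoopRuns;

    // Hypothetical loop: run 1 hits a transient error, run 2 finds the
    // connection closed, run 3 (on a fresh connection) completes.
    private static async Task ReceiveLoopAsync(CancellationToken ct)
    {
        LoopRuns++;
        await Task.Delay(10, ct);
        if (LoopRuns == 1)
            throw new InvalidOperationException("Transient read error");
        if (LoopRuns == 2)
            throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely);
    }

    public static async Task RunAsync(CancellationToken ct = default)
    {
        Connects = 0;
        LoopRuns = 0;

        // Outer: handles everything except cancellation by reconnecting.
        var connectPolicy = Policy
            .Handle<Exception>(ex => !(ex is OperationCanceledException))
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(50));

        // Inner: retries the loop on the same connection, but lets the
        // (assumed) closed-connection exception escape to the outer policy.
        var receivePolicy = Policy
            .Handle<Exception>(ex => !(ex is WebSocketException || ex is OperationCanceledException))
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(50));

        await connectPolicy.ExecuteAsync(async token =>
        {
            Connects++; // stand-in for _pipelineWebSocket.StartAsync(...)
            await receivePolicy.ExecuteAsync(t => ReceiveLoopAsync(t), token);
        }, ct);
    }
}
```

After `RunAsync` completes, the loop has run three times over two connections: the transient error is retried in place, while the closed connection is escalated to the outer policy and triggers a reconnect.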

This issue is stale because it has been open for 60 days with no activity. It will be automatically closed in 14 days if no further updates are made.

github-actions[bot], Jul 03 '23

This issue was closed because it has been inactive for 14 days since being marked as stale.

github-actions[bot], Jul 17 '23