Polly
WaitAndRetryForeverAsync with fire and forget tasks
What is the correct way to use WaitAndRetryForeverAsync with fire-and-forget tasks?
Current behavior: when StartAsync is first run without an Internet connection, it retries the connection every x seconds, as expected.
Expected behavior: once execution reaches ReceiveLoop, the policy no longer handles exceptions, because the method call is not awaited. I expect exceptions thrown there to be handled as well. What is the correct way to do this?
private Task _processingLoop = Task.CompletedTask;
private bool _stopping;

public Task StartAsync(CancellationToken cancellationToken = default)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(RetryBackOffMs),
            (exception, retryCount, calculatedWaitDuration) =>
            {
                Console.WriteLine(
                    $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {exception.Message}) (Retry count: {retryCount})");
            });

    return retryPolicy.ExecuteAsync(async () =>
    {
        _stopping = false;

        // Connect to the web socket
        await _pipelineWebSocket.StartAsync(_url, cancellationToken).ConfigureAwait(false);

        // Receive loop
        _processingLoop = ReceiveLoop();
    });
}

public async Task StopAsync()
{
    _stopping = true;
    await _pipelineWebSocket.StopAsync().ConfigureAwait(false);
    await _processingLoop.ConfigureAwait(false);
}

private async Task ReceiveLoop()
{
    while (true)
    {
        if (_stopping)
        {
            break;
        }

        var result = await _pipelineWebSocket.Input.ReadAsync().ConfigureAwait(false);
        var buffer = result.Buffer;

        try
        {
            if (result.IsCanceled)
            {
                break;
            }

            if (!buffer.IsEmpty)
            {
                while (MessageParser.TryParse(ref buffer, out var payload))
                {
                    var message = Encoding.UTF8.GetString(payload);
                    _messageReceivedSubject.OnNext(message);
                }
            }

            if (result.IsCompleted)
            {
                break;
            }
        }
        finally
        {
            _pipelineWebSocket.Input.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}
I would suggest creating a policy within the execution of the first policy, and execute the listen loop's code through that policy. Effectively, two different wait and retry forever policies.
@martincostello, thanks for the answer!
If I do it that way, the problem is that the policy will keep retrying the loop against a closed web socket connection. I need to call StartAsync again to re-establish the web socket connection.
public Task StartAsync(CancellationToken cancellationToken = default)
{
    var retryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(RetryBackOffMs),
            (exception, retryCount, calculatedWaitDuration) =>
            {
                Console.WriteLine(
                    $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {exception.Message}) (Retry count: {retryCount})");
            });

    return retryPolicy.ExecuteAsync(async () =>
    {
        _stopping = false;

        // Connect to the web socket
        await _pipelineWebSocket.StartAsync(_url, cancellationToken).ConfigureAwait(false);

        // Receive loop
        retryPolicy.ExecuteAsync(() => _processingLoop = ReceiveLoop());
    });
}
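One possible way around the reconnect problem described above is to let a single policy execution cover both the connect step and the awaited receive loop, and to make that whole execution the fire-and-forget task. The following is only a sketch under assumptions: ReconnectingReceiver, connectAsync, and receiveLoopAsync are hypothetical stand-ins for the class in this thread, _pipelineWebSocket.StartAsync, and ReceiveLoop, and a CancellationTokenSource replaces the _stopping flag.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

// Hypothetical wrapper; the two delegates stand in for
// _pipelineWebSocket.StartAsync(...) and ReceiveLoop() from the thread.
public sealed class ReconnectingReceiver
{
    private readonly Func<CancellationToken, Task> _connectAsync;
    private readonly Func<CancellationToken, Task> _receiveLoopAsync;
    private readonly TimeSpan _retryDelay;
    private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
    private Task _processingLoop = Task.CompletedTask;

    public ReconnectingReceiver(
        Func<CancellationToken, Task> connectAsync,
        Func<CancellationToken, Task> receiveLoopAsync,
        TimeSpan retryDelay)
    {
        _connectAsync = connectAsync;
        _receiveLoopAsync = receiveLoopAsync;
        _retryDelay = retryDelay;
    }

    public void Start()
    {
        var retryPolicy = Policy
            // Cancellation is how StopAsync ends the cycle, so it is not retried.
            .Handle<Exception>(ex => !(ex is OperationCanceledException))
            .WaitAndRetryForeverAsync(
                _ => _retryDelay,
                (exception, retryCount, delay) => Console.WriteLine(
                    $"Retrying in {delay.TotalSeconds} seconds (Reason: {exception.Message}) (Retry count: {retryCount})"));

        // Not awaited here: the connect-and-receive cycle runs in the background,
        // but both awaits inside the delegate are observed by the policy, so a
        // failure in the loop triggers a fresh connect on the next attempt.
        _processingLoop = retryPolicy.ExecuteAsync(async ct =>
        {
            await _connectAsync(ct).ConfigureAwait(false);
            await _receiveLoopAsync(ct).ConfigureAwait(false);
        }, _stopCts.Token);
    }

    public async Task StopAsync()
    {
        _stopCts.Cancel();
        try
        {
            await _processingLoop.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected when stopping during a connect attempt or a retry delay.
        }
    }
}
```

Two trade-offs are worth noting. Start no longer returns a task that completes once the first connection succeeds, so a caller that needs that signal would have to add one (for example a TaskCompletionSource). Also, the policy only retries on exceptions: if the receive loop returns normally, as ReceiveLoop does when result.IsCompleted is true, the execution simply ends without reconnecting.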
Maybe change the exception filter (Handle<Exception>()) to exclude an exception that indicates a closed connection?
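A filter like the one suggested above could be written with the predicate overload of Handle. This is a minimal sketch that assumes the closed connection surfaces as a System.Net.WebSockets.WebSocketException with WebSocketError.ConnectionClosedPrematurely; the thread does not say which exception _pipelineWebSocket actually throws, so the check in IsConnectionClosed is an assumption, and RetryPolicies and CreateLoopPolicy are hypothetical names.

```csharp
using System;
using System.Net.WebSockets;
using Polly;
using Polly.Retry;

public static class RetryPolicies
{
    // Assumption: a dropped socket shows up as this WebSocketException.
    // Replace the check with whatever the pipeline web socket really throws.
    public static bool IsConnectionClosed(Exception ex) =>
        ex is WebSocketException wse
        && wse.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely;

    // Retries forever on any exception except a closed connection,
    // which is rethrown to the caller instead of being retried.
    public static AsyncRetryPolicy CreateLoopPolicy(TimeSpan delay) =>
        Policy
            .Handle<Exception>(ex => !IsConnectionClosed(ex))
            .WaitAndRetryForeverAsync(_ => delay);
}
```

With a policy like this around the receive loop, a closed connection would escape the inner retry and could then be handled by an outer policy that reconnects.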
This issue is stale because it has been open for 60 days with no activity. It will be automatically closed in 14 days if no further updates are made.
This issue was closed because it has been inactive for 14 days since being marked as stale.