How to properly handle the closing of a long-running bidirectional stream

Open SCLDGit opened this issue 1 year ago • 1 comment

I have a long-running bidirectional stream that monitors connectivity between a client and a server in both directions. The stream is intended to live for the lifetime of the application. When either the server or the client shuts down, the only way I have found to detect the closure on the other end is to wrap the entire client call or service implementation in a try/catch block that catches the RpcException, IOException, and TaskCanceledException instances thrown when reads from or writes to the stream fail. Is there a better way to handle this? The server is intended to serve a very large number of connected clients, so we're worried about the additional overhead of the try/catch blocks.

Currently, the client method looks something like:

public async Task MonitorConnectionAsync(CancellationToken p_cancellationToken)
{
    var connectivityClient = new Connectivity.ConnectivityClient(m_serverInfoService.Channel);
    
    var connectionMonitorStream = connectivityClient.MonitorClientConnection(cancellationToken: p_cancellationToken);

    var readStreamTask = Task.Run(async () =>
    {
        await foreach ( var request in connectionMonitorStream.ResponseStream.ReadAllAsync(p_cancellationToken) )
        {
            switch ( request.MessageCase )
            {
                case G_ConnectionCheck.MessageOneofCase.Request:
                    m_logger.LogDebug("Received ping request from server, responding");
                    await connectionMonitorStream.RequestStream.WriteAsync(new()
                    {
                        Response = new G_ConnectionCheckResponse()
                    });
                    break;
                case G_ConnectionCheck.MessageOneofCase.Response:
                    m_logger.LogInformation("Server last available at {LastPing}", DateTimeOffset.Now);
                    break;
                case G_ConnectionCheck.MessageOneofCase.None:
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }
    }, p_cancellationToken);

    while ( !p_cancellationToken.IsCancellationRequested )
    {
        await connectionMonitorStream.RequestStream.WriteAsync(new()
        {
            Request = new G_ConnectionCheckRequest()
        });

        await Task.Delay(TimeSpan.FromSeconds(5), p_cancellationToken);
    }

    await connectionMonitorStream.RequestStream.CompleteAsync();
    await readStreamTask;
}

Server side:

public override async Task MonitorClientConnection(IAsyncStreamReader<G_ConnectionCheck> p_requestStream,
                                                   IServerStreamWriter<G_ConnectionCheck> p_responseStream,
                                                   ServerCallContext p_context)
{
    var readStreamTask = Task.Run(async () =>
    {
        await foreach ( var request in p_requestStream.ReadAllAsync(p_context.CancellationToken) )
        {
            switch ( request.MessageCase )
            {
                case G_ConnectionCheck.MessageOneofCase.Request:
                    m_logger.LogDebug("Received ping request from client, responding");
                    await p_responseStream.WriteAsync(new()
                    {
                        Response = new G_ConnectionCheckResponse()
                    });
                    break;
                case G_ConnectionCheck.MessageOneofCase.Response:
                    m_logger.LogDebug("Received ping response from client");
                    m_lastPing = DateTimeOffset.Now;
                    break;
                case G_ConnectionCheck.MessageOneofCase.None:
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(request.MessageCase),
                                                          request.MessageCase,
                                                          "Unknown message type received");
            }
        }
    }, cancellationToken: p_context.CancellationToken);

    while ( !p_context.CancellationToken.IsCancellationRequested )
    {
        await p_responseStream.WriteAsync(new()
        {
            Request = new G_ConnectionCheckRequest()
        });

        await Task.Delay(TimeSpan.FromSeconds(5), p_context.CancellationToken);
    }

    await readStreamTask;

    m_logger.LogInformation("Client lost connection, last available at {LastPing}", m_lastPing);
}

Both throw a TaskCanceledException after the client writes its completion to the stream.
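
One likely source of that exception is the Task.Delay call inside each heartbeat loop: when the token is cancelled while the delay is pending, the await throws, so the loop exits through the exception rather than through the while condition and the code after the loop does not run. The following is only a minimal sketch of that behaviour, not gRPC code. HeartbeatLoopAsync is a hypothetical helper, and its sendPing delegate is an assumed stand-in for RequestStream.WriteAsync. It wraps the whole loop in a single try/catch with an exception filter so that only the expected shutdown is swallowed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the heartbeat loop in the question.
// sendPing replaces RequestStream.WriteAsync so the sketch runs without gRPC.
static async Task<int> HeartbeatLoopAsync(Func<Task> sendPing, TimeSpan interval, CancellationToken token)
{
    var sent = 0;
    try
    {
        while (!token.IsCancellationRequested)
        {
            await sendPing();
            sent++;
            // Throws TaskCanceledException if the token fires during the wait.
            await Task.Delay(interval, token);
        }
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        // Expected shutdown: fall through so that cleanup after the loop still runs.
    }

    // Cleanup (for example completing the request stream) would go here.
    return sent;
}

using var cts = new CancellationTokenSource();
var loop = HeartbeatLoopAsync(() => Task.CompletedTask, TimeSpan.FromSeconds(5), cts.Token);
cts.Cancel();                      // simulate application shutdown
var pings = await loop;            // completes normally instead of throwing
Console.WriteLine($"Loop ended gracefully after {pings} ping(s)");
```

TaskCanceledException derives from OperationCanceledException, so the one handler covers both. On the overhead concern: in .NET, entering a try block costs very little on the non-throwing path, and the expense is mostly paid when an exception is actually thrown, so one handler per call is unlikely to be the limiting factor, though that is worth measuring for a specific workload.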

How can we gracefully handle application closes on either side of the stream?
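
If the goal is to avoid throwing on the shutdown path at all, one option for the timed wait is to observe cancellation as a return value. This is a sketch under assumptions rather than a recommended pattern for gRPC itself: DelayUnlessCancelledAsync is a hypothetical helper name, and it only covers the Task.Delay part of the loop, not exceptions raised by stream reads or writes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: waits for the interval and reports cancellation
// through the return value instead of an exception.
static async Task<bool> DelayUnlessCancelledAsync(TimeSpan interval, CancellationToken token)
{
    var delay = Task.Delay(interval, token);
    // Task.WhenAny completes when 'delay' completes and does not rethrow
    // its cancellation, so nothing is thrown here when the token fires.
    await Task.WhenAny(delay);
    return !delay.IsCanceled;
}

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(50)); // simulate shutdown shortly after start

var ticks = 0;
while (await DelayUnlessCancelledAsync(TimeSpan.FromMilliseconds(10), cts.Token))
{
    ticks++; // a real loop would write the next ping here
}

Console.WriteLine($"Loop ended without an exception after {ticks} tick(s)");
```

The loop condition doubles as the wait, so code placed after the loop (such as completing the request stream) is reached on cancellation.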

SCLDGit, Sep 30 '23 21:09