serilog-sinks-graylog icon indicating copy to clipboard operation
serilog-sinks-graylog copied to clipboard

TCP transport not ThreadSafe

Open inf9144 opened this issue 1 year ago • 4 comments

Hey, the implementation of TCP Transport is not thread safe and you can get in situations where the stream is null while tcp client is connected. Also reconnecting does not follow a proper path. I switched to a custom ITransport implementation which solves thoses problems for me:

public class CustomizedTcpTransport : ITransport, IDisposable {
   private const int _defaultPort = 12201;

   private readonly GraylogSinkOptionsBase _options;
   private readonly SemaphoreSlim _lock;

   private TcpClient? _client;
   private Stream? _stream;

   public delegate CustomizedTcpTransport Factory(GraylogSinkOptionsBase options);
   public CustomizedTcpTransport(GraylogSinkOptionsBase options) {
      _options = options;
      _lock = new(1, 1);
   }

   public Task Send(string message) {
      var payload = new byte[message.Length + 1];
      System.Text.Encoding.UTF8.GetBytes(message.AsSpan(), payload.AsSpan());
      payload[^1] = 0x00;

      return Send(payload);
   }

   private async Task Send(byte[] payload) {
      await _lock.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
      try {
         var stream = await EnsureConnection().ConfigureAwait(continueOnCapturedContext: false);
         await stream.WriteAsync(payload).ConfigureAwait(continueOnCapturedContext: false);
         await stream.FlushAsync().ConfigureAwait(continueOnCapturedContext: false);
      } finally {
         _lock.Release();
      }
   }

   private async Task<Stream> EnsureConnection() {
      if (_client is not null && _client.Connected && _stream is not null)
         return _stream;

      _stream?.Dispose();
      _stream = null;
      _client?.Dispose();
      _client = new TcpClient();

      var iPAddress = await GetIpAddress(_options.HostnameOrAddress ?? throw new InvalidOperationException("Could not find hostname to connect to!"));
      if (iPAddress == null)
         throw new InvalidOperationException($"IP address of host '{_options.HostnameOrAddress}' could not be resolved.");

      var port = _options.Port.GetValueOrDefault(12201);
      var sslHost = _options.UseSsl ? _options.HostnameOrAddress : null;
      await _client.ConnectAsync(iPAddress, port).ConfigureAwait(continueOnCapturedContext: false);
      _stream = _client.GetStream();
      if (!string.IsNullOrWhiteSpace(sslHost)) {
         var sslStream = new SslStream(_stream, false);
         await sslStream.AuthenticateAsClientAsync(sslHost).ConfigureAwait(continueOnCapturedContext: false);
         if (sslStream.RemoteCertificate != null) {
            SelfLog.WriteLine("Remote cert was issued to {0} and is valid from {1} until {2}.", sslStream.RemoteCertificate.Subject, sslStream.RemoteCertificate.GetEffectiveDateString(), sslStream.RemoteCertificate.GetExpirationDateString());
            _stream = (Stream?) (object) sslStream;
         } else {
            SelfLog.WriteLine("Remote certificate is null.");
         }

         if (_stream is null)
            throw new InvalidOperationException($"SSL connection with host '{_options.HostnameOrAddress}' could not be established.");
      }

      return _stream;
   }

   public async Task<IPAddress?> GetIpAddress(string hostNameOrAddress) {
      var addresses = await Dns.GetHostAddressesAsync(hostNameOrAddress).ConfigureAwait(false);
      return addresses.FirstOrDefault(c => c.AddressFamily == AddressFamily.InterNetwork);
   }

   public void Dispose() {
      GC.SuppressFinalize(this);
      _stream?.Dispose();
      _client?.Dispose();
   }
}

Are you open for PRs? I could try to integrate this into you original classes.

inf9144 avatar Feb 23 '24 09:02 inf9144