How to receive data correctly and how to close normally

Open dannyshenl opened this issue 2 years ago • 7 comments

I have a few questions when using BedrockFramework:

  1. After the connection is established, is there a better way to read data than a while (true) loop?

  2. When I call the _server.StopAsync() method, an exception is thrown.

I guess the reason is that the connection has been closed while reader.ReadAsync() is still trying to read. How can I shut the server down cleanly? Here are the error details:

    dbug: Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets[7]
        Connection id "0HMI96D1VNMLB" sending FIN because: "The connection was aborted by the application via ConnectionContext.Abort()."
    fail: Athena.RPC.Transport.Bedrock.BedrockConnectionHandler[0]
        Address: [192.168.2.58:15023] disconnected!
    fail: Athena.RPC.Transport.Bedrock.BedrockConnectionHandler[0]
        The connection was aborted by the application via ConnectionContext.Abort().
        Microsoft.AspNetCore.Connections.ConnectionAbortedException: The connection was aborted by the application via ConnectionContext.Abort().
            at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
            at System.IO.Pipelines.Pipe.GetReadAsyncResult()
            at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
            at Bedrock.Framework.Protocols.ProtocolReader.ContinueDoAsyncRead[TReadMessage](ValueTask`1 readTask, Nullable`1 maximumMessageSize, IMessageReader`1 reader, CancellationToken cancellationToken)
            at Athena.RPC.Transport.Bedrock.BedrockConnectionHandler.OnConnectedAsync(ConnectionContext connection) in D:\iafc_gitlab\mythology\code_net6\Athena.RPC.Transport.Bedrock\BedrockConnectionHandler.cs:line 84

This is my source code:

using Bedrock.Framework.Protocols;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Athena.RPC.Transport.Bedrock
{
    using Codec;
    using Protocols;
    using Runtime.Server;
    using Tracing;
    public class BedrockConnectionHandler : ConnectionHandler
    {
        private const int _acceptExceptionCount = 30;
        private const int _exceptionMonitorPeriod = 60000;

        private readonly IRpcMessageCodecFactory _rpcMessageCodecFactory;
        private readonly IRpcMessageEncoder _rpcMessageEncoder;
        private readonly IRpcMessageDecoder _rpcMessageDecoder;
        private readonly ITransportListener _transportListener;
        private readonly IServiceHealthCheck _serviceHealthCheck;
        private readonly IEventTracer _eventTracer;
        private readonly Timer _timerExceptionMonitor;
        private readonly ILogger _logger;

        private readonly ConcurrentDictionary<string, int> _exceptionCount;

        public BedrockConnectionHandler(IRpcMessageCodecFactory rpcMessageCodecFactory, ITransportListener transportListener, IServiceHealthCheck serviceHealthCheck, IEventTracer eventTracer, ILogger<BedrockConnectionHandler> logger)
        {
            _rpcMessageCodecFactory = rpcMessageCodecFactory;
            _rpcMessageDecoder = _rpcMessageCodecFactory.CreateDecoder();
            _rpcMessageEncoder = _rpcMessageCodecFactory.CreateEncoder();
            _transportListener = transportListener;
            _serviceHealthCheck = serviceHealthCheck;
            _eventTracer = eventTracer;
            _logger = logger;
            _exceptionCount = new ConcurrentDictionary<string, int>();
            _timerExceptionMonitor = new Timer(ExceptionCountClear, null, _exceptionMonitorPeriod, _exceptionMonitorPeriod);
        }

        private void ExceptionCountClear(object obj)
        {
            _exceptionCount.Clear();
            _logger.LogDebug("Finished clearing the error counters");
        }

        public override async Task OnConnectedAsync(ConnectionContext connection)
        {
            var ipInfo = connection.RemoteEndPoint.ToString().Split(':');
            if (_serviceHealthCheck.IsHealthCheck(ipInfo[0], int.Parse(ipInfo[1])))
            {
                return;
            }
            var protocol = new LengthPrefixedProtocol();
            var reader = connection.CreateReader();
            var writer = connection.CreateWriter();
            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation("Address: [{RemoteEndPoint}] connected!", connection.RemoteEndPoint);
            }
            var token = connection.ConnectionClosed.Register(() =>
            {
                DisposeConnection(connection, reader, writer).ConfigureAwait(false);
            });

            while (true)
            {
                if (token.Token.IsCancellationRequested)
                {
                    break;
                }
                try
                {
                    if (reader != null)
                    {
                        ProtocolReadResult<ReadOnlySequence<byte>> protocolReadResult;
                        try
                        {
                            protocolReadResult = await reader.ReadAsync(protocol, token.Token);
                        }
                        catch (InvalidOperationException connectionResetException)
                        {
                            _logger.LogError($"Heartbeat address was not filtered; failed to read content: {connectionResetException.Message} ");
                            break;
                        }
                        if (protocolReadResult.Message.Length > 0)
                        {
                            var rpcTransportMessage = _rpcMessageDecoder.MessageDecoding(protocolReadResult.Message);

                            var bedrockMessageSender = new BedrockMessageServiceSender(writer, _rpcMessageEncoder, _eventTracer, _logger);
                            if (_logger.IsEnabled(LogLevel.Information))
                            {
                                _logger.LogInformation("Data serialization succeeded; triggering the transport listener. Executing: {CorrelationId}", rpcTransportMessage.InvokeMessage.CorrelationId);
                            }
                            _eventTracer.CallReceiveEnd(this, new AthenaTraceEventArgs() { TransportMessage = rpcTransportMessage });
                            _transportListener.OnReceived(bedrockMessageSender, rpcTransportMessage);
                            reader?.Advance();
                        }
                        else
                        {
                            _logger.LogError("Heartbeat address was not filtered; received an empty packet, disconnecting!");
                            break;
                        }
                    }
                }
                catch (ConnectionResetException connectionResetException)
                {
                    if (connectionResetException.InnerException is SocketException socketException)
                    {
                        if (socketException.SocketErrorCode == SocketError.ConnectionReset)
                        {
                            break;
                        }
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                    var key = $"{ex.GetType().Name}_{ex.Message}";
                    _exceptionCount.TryGetValue(key, out var errCount);
                    errCount++;
                    if (errCount > _acceptExceptionCount)
                    {
                        _logger.LogError($"Connection {connection.ConnectionId}: exceptions per unit time exceeded the threshold; disconnecting proactively");
                        break;
                    }
                    else
                    {
                        _exceptionCount.TryUpdate(key, errCount, errCount);
                    }
                }
            }
        }

        public async Task DisposeConnection(ConnectionContext connection, ProtocolReader reader, ProtocolWriter writer)
        {
            if (reader != null && writer != null)
            {
                _logger.LogError("Address: [{RemoteEndPoint}] disconnected!", connection.RemoteEndPoint);
                await reader.DisposeAsync();
                await writer.DisposeAsync();
                await connection.DisposeAsync();
            }
        }
    }
}

Thank you for your answer!

Jun 08 '22 09:06 dannyshenl

> 1. After the connection is established, is there a better way to read data than a while (true) loop?

What's the matter with this approach?

Jun 08 '22 10:06 adamradocz

No problem; I'm just curious whether there are other ways to do it.

Jun 08 '22 14:06 dannyshenl

> 2. When I call the _server.StopAsync() method, an exception is thrown.

I just tried the sample server and it works fine. Can you elaborate on that?

Jun 08 '22 18:06 adamradocz

BTW, you could check the token in the while condition instead of breaking out of the loop.

Jun 08 '22 18:06 adamradocz

> BTW, you could check the token in the while condition instead of breaking out of the loop.

When should I break out of the while loop? If I don't add the if (token.Token.IsCancellationRequested) break; check, I get an exception, because the connection is closed but reader.ReadAsync() is still trying to read.

Jun 09 '22 02:06 dannyshenl

Pass the token to the reader.ReadAsync() as well.

Jun 09 '22 06:06 adamradocz

This logic:

var token = connection.ConnectionClosed.Register(() =>
{
    DisposeConnection(connection, reader, writer).ConfigureAwait(false);
});

Is broken. Get rid of it. The system will dispose the connection and clean up when OnConnectedAsync unwinds.

When ProtocolReadResult<T> has IsCompleted set then you know there's no more data coming. You can consume the existing messages and break the loop.

Everything else looks pretty normal.

Jun 15 '22 05:06 davidfowl
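
Putting the suggestions from this thread together, the read loop might look roughly like the sketch below. It is not the maintainers' code. SketchConnectionHandler and the onMessage callback are hypothetical names introduced for illustration, and the message reader passed in stands in for the LengthPrefixedProtocol from the question. The sketch assumes the Bedrock.Framework package is referenced and that a ConnectionAbortedException during shutdown can be treated as a normal close (it derives from OperationCanceledException).

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;
using Bedrock.Framework.Protocols;
using Microsoft.AspNetCore.Connections;

// Hypothetical handler for illustration; not part of Bedrock or the code in the question.
public class SketchConnectionHandler : ConnectionHandler
{
    private readonly IMessageReader<ReadOnlySequence<byte>> _protocol;
    private readonly Action<ReadOnlySequence<byte>> _onMessage;

    public SketchConnectionHandler(
        IMessageReader<ReadOnlySequence<byte>> protocol,
        Action<ReadOnlySequence<byte>> onMessage)
    {
        _protocol = protocol;
        _onMessage = onMessage;
    }

    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        var reader = connection.CreateReader();
        var closed = connection.ConnectionClosed;

        try
        {
            // Check the token in the loop condition instead of breaking on it.
            while (!closed.IsCancellationRequested)
            {
                // Pass the same token to ReadAsync so a pending read is cancelled too.
                var result = await reader.ReadAsync(_protocol, closed);
                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }

                    // The buffer is only valid until Advance(), so the callback
                    // must consume (or copy) the message synchronously.
                    if (!result.Message.IsEmpty)
                    {
                        _onMessage(result.Message);
                    }

                    // IsCompleted means no more data is coming: stop reading.
                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    // Every successful ReadAsync is paired with an Advance().
                    reader.Advance();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Covers ConnectionAbortedException, e.g. when the server is stopping.
        }
        catch (ConnectionResetException)
        {
            // The peer reset the connection; treat it as a normal close.
        }
        // No ConnectionClosed.Register callback and no manual disposal here:
        // the connection is cleaned up once OnConnectedAsync returns.
    }
}
```

The exception counting and health-check filtering from the question are left out to keep the loop readable; they could be layered back on top of this structure.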