BedrockFramework
How to receive data correctly and how to close normally
I have a few questions about using BedrockFramework:
- After the connection is established, is there a better way to read data than a while (true) loop?
- When I call the _server.StopAsync() method, an exception is thrown. I guess the reason is that the connection is closed while reader.ReadAsync() is still trying to read. How can I shut the server down cleanly? These are the error details:
dbug: Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets[7]
      Connection id "0HMI96D1VNMLB" sending FIN because: "The connection was aborted by the application via ConnectionContext.Abort()."
fail: Athena.RPC.Transport.Bedrock.BedrockConnectionHandler[0]
      Address [192.168.2.58:15023] disconnected!
fail: Athena.RPC.Transport.Bedrock.BedrockConnectionHandler[0]
      The connection was aborted by the application via ConnectionContext.Abort().
      Microsoft.AspNetCore.Connections.ConnectionAbortedException: The connection was aborted by the application via ConnectionContext.Abort().
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at Bedrock.Framework.Protocols.ProtocolReader.ContinueDoAsyncRead[TReadMessage](ValueTask`1 readTask, Nullable`1 maximumMessageSize, IMessageReader`1 reader, CancellationToken cancellationToken)
         at Athena.RPC.Transport.Bedrock.BedrockConnectionHandler.OnConnectedAsync(ConnectionContext connection) in D:\iafc_gitlab\mythology\code_net6\Athena.RPC.Transport.Bedrock\BedrockConnectionHandler.cs:line 84
This is my source code:
using Bedrock.Framework.Protocols;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace Athena.RPC.Transport.Bedrock
{
    using Codec;
    using Protocols;
    using Runtime.Server;
    using Tracing;

    public class BedrockConnectionHandler : ConnectionHandler
    {
        private const int _acceptExceptionCount = 30;
        private const int _exceptionMonitorPeriod = 60000;
        private readonly IRpcMessageCodecFactory _rpcMessageCodecFactory;
        private readonly IRpcMessageEncoder _rpcMessageEncoder;
        private readonly IRpcMessageDecoder _rpcMessageDecoder;
        private readonly ITransportListener _transportListener;
        private readonly IServiceHealthCheck _serviceHealthCheck;
        private readonly IEventTracer _eventTracer;
        private readonly Timer _timerExceptionMonitor;
        private readonly ILogger _logger;
        private readonly ConcurrentDictionary<string, int> _exceptionCount;

        public BedrockConnectionHandler(IRpcMessageCodecFactory rpcMessageCodecFactory, ITransportListener transportListener, IServiceHealthCheck serviceHealthCheck, IEventTracer eventTracer, ILogger<BedrockConnectionHandler> logger)
        {
            _rpcMessageCodecFactory = rpcMessageCodecFactory;
            _rpcMessageDecoder = _rpcMessageCodecFactory.CreateDecoder();
            _rpcMessageEncoder = _rpcMessageCodecFactory.CreateEncoder();
            _transportListener = transportListener;
            _serviceHealthCheck = serviceHealthCheck;
            _eventTracer = eventTracer;
            _logger = logger;
            _exceptionCount = new ConcurrentDictionary<string, int>();
            _timerExceptionMonitor = new Timer(ExceptionCountClear, null, _exceptionMonitorPeriod, _exceptionMonitorPeriod);
        }

        private void ExceptionCountClear(object obj)
        {
            _exceptionCount.Clear();
            _logger.LogDebug("Finished clearing the exception counters");
        }

        public override async Task OnConnectedAsync(ConnectionContext connection)
        {
            var ipInfo = connection.RemoteEndPoint.ToString().Split(':');
            if (_serviceHealthCheck.IsHealthCheck(ipInfo[0], int.Parse(ipInfo[1])))
            {
                return;
            }
            var protocol = new LengthPrefixedProtocol();
            var reader = connection.CreateReader();
            var writer = connection.CreateWriter();
            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation("Address [{RemoteEndPoint}] connected!", connection.RemoteEndPoint);
            }
            var token = connection.ConnectionClosed.Register(() =>
            {
                DisposeConnection(connection, reader, writer).ConfigureAwait(false);
            });
            while (true)
            {
                if (token.Token.IsCancellationRequested)
                {
                    break;
                }
                try
                {
                    if (reader != null)
                    {
                        ProtocolReadResult<ReadOnlySequence<byte>> protocolReadResult;
                        try
                        {
                            protocolReadResult = await reader.ReadAsync(protocol, token.Token);
                        }
                        catch (InvalidOperationException connectionResetException)
                        {
                            _logger.LogError($"Heartbeat address was not filtered; failed to read content: {connectionResetException.Message} ");
                            break;
                        }
                        if (protocolReadResult.Message.Length > 0)
                        {
                            var rpcTransportMessage = _rpcMessageDecoder.MessageDecoding(protocolReadResult.Message);
                            var bedrockMessageSender = new BedrockMessageServiceSender(writer, _rpcMessageEncoder, _eventTracer, _logger);
                            if (_logger.IsEnabled(LogLevel.Information))
                            {
                                _logger.LogInformation("Serialized data successfully; triggering the transport listener. Executing: {CorrelationId}", rpcTransportMessage.InvokeMessage.CorrelationId);
                            }
                            _eventTracer.CallReceiveEnd(this, new AthenaTraceEventArgs() { TransportMessage = rpcTransportMessage });
                            _transportListener.OnReceived(bedrockMessageSender, rpcTransportMessage);
                            reader?.Advance();
                        }
                        else
                        {
                            _logger.LogError("Heartbeat address was not filtered; received an empty packet, disconnecting!");
                            break;
                        }
                    }
                }
                catch (ConnectionResetException connectionResetException)
                {
                    if (connectionResetException.InnerException is SocketException socketException)
                    {
                        if (socketException.SocketErrorCode == SocketError.ConnectionReset)
                        {
                            break;
                        }
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                    var key = $"{ex.GetType().Name}_{ex.Message}";
                    _exceptionCount.TryGetValue(key, out var errCount);
                    errCount++;
                    if (errCount > _acceptExceptionCount)
                    {
                        _logger.LogError($"Connection {connection.ConnectionId}: exceptions within the monitoring period exceeded the threshold; disconnecting proactively");
                        break;
                    }
                    else
                    {
                        _exceptionCount.TryUpdate(key, errCount, errCount);
                    }
                }
            }
        }

        public async Task DisposeConnection(ConnectionContext connection, ProtocolReader reader, ProtocolWriter writer)
        {
            if (reader != null && writer != null)
            {
                _logger.LogError("Address [{RemoteEndPoint}] disconnected!", connection.RemoteEndPoint);
                await reader.DisposeAsync();
                await writer.DisposeAsync();
                await connection.DisposeAsync();
            }
        }
    }
}
Thank you for your answer!
- After the connection is established, is there a better way to read data except through a while true loop?
What's the matter with this approach?
No problem with it, I'm just curious whether there are other ways to do it.
- When I call the _server.StopAsync() method, an exception is thrown.
I just tried the sample server and it works fine. Can you elaborate on that?
BTW, you could check the token in the while condition instead of breaking out of the loop.
When should I break out of the while loop? If I don't add if (token.Token.IsCancellationRequested) break; I get an exception, because the connection is closed but reader.ReadAsync() is still trying to read.
Pass the token to reader.ReadAsync() as well.
This logic:
var token = connection.ConnectionClosed.Register(() =>
{
    DisposeConnection(connection, reader, writer).ConfigureAwait(false);
});
is broken. Get rid of it. The system will dispose the connection and clean up when OnConnectedAsync unwinds.
When ProtocolReadResult<T> has IsCompleted set then you know there's no more data coming. You can consume the existing messages and break the loop.
Everything else looks pretty normal.
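Putting the suggestions above together, the read loop might look roughly like the sketch below. It is only a sketch under a few assumptions: the handler name ReadLoopSketchHandler and the HandleMessage method are hypothetical placeholders, the protocol is injected as an IMessageReader<ReadOnlySequence<byte>> rather than being the LengthPrefixedProtocol from the question, and the try/finally placement of Advance() follows the pattern used in the Bedrock sample server. There is no ConnectionClosed.Register callback; the loop simply ends and the framework cleans up when OnConnectedAsync returns.

```csharp
using System;
using System.Buffers;
using System.Threading.Tasks;
using Bedrock.Framework.Protocols;
using Microsoft.AspNetCore.Connections;

// Hypothetical handler; only the shape of the read loop matters here.
public class ReadLoopSketchHandler : ConnectionHandler
{
    // Assumption: any message reader that yields raw payload bytes.
    private readonly IMessageReader<ReadOnlySequence<byte>> _protocol;

    public ReadLoopSketchHandler(IMessageReader<ReadOnlySequence<byte>> protocol)
    {
        _protocol = protocol;
    }

    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        var reader = connection.CreateReader();
        var closed = connection.ConnectionClosed;

        // Check the token in the loop condition instead of breaking inside the body.
        while (!closed.IsCancellationRequested)
        {
            try
            {
                // Pass the same token to the read so a pending read is cancelled on shutdown.
                var result = await reader.ReadAsync(_protocol, closed);

                if (!result.Message.IsEmpty)
                {
                    HandleMessage(result.Message);
                }

                // IsCompleted means no more data is coming: consume what we have, then stop.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            catch (OperationCanceledException)
            {
                // Expected during shutdown. ConnectionAbortedException derives from
                // OperationCanceledException, so an aborted connection lands here too.
                break;
            }
            finally
            {
                // Release the buffer of the last read before reading again.
                reader.Advance();
            }
        }
        // No manual disposal: the connection is cleaned up when this method unwinds.
    }

    // Hypothetical placeholder for decoding and dispatching a message.
    private void HandleMessage(ReadOnlySequence<byte> message)
    {
    }
}
```

With this shape, calling _server.StopAsync() should end the loop through either the token check or the caught cancellation, rather than surfacing as a logged failure.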