DotNetty
DotNetty copied to clipboard
The number of threads has been increasing and will not be released
I tried to use Netty as DTU TCP Server, but in the process of using it found that the number of threads has been slowly increasing with the running time, but at this time the number of online devices is very small, only about 1-5, why, My configuration code is as follows
`public void Init(ServerOptions options,Func<IChannelHandler> handler) { _options = options; if (options.EventLoopCount > 0) { bossGroup = new MultithreadEventLoopGroup(options.EventLoopCount); } else { bossGroup = new MultithreadEventLoopGroup(); } workerGroup = new MultithreadEventLoopGroup(); bootstrap = new ServerBootstrap(); bootstrap.Group(bossGroup, workerGroup); bootstrap.Channel<TcpServerSocketChannel>();
bootstrap.Option(ChannelOption.SoBacklog, _options.SoBacklog);
bootstrap.ChildOption(ChannelOption.RcvbufAllocator,new AdaptiveRecvByteBufAllocator(64,1048,65536));
bootstrap.Option(ChannelOption.TcpNodelay, true);
bootstrap.ChildOption(ChannelOption.SoKeepalive, false);
bootstrap.ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//pipeline.AddLast(new IdleStateHandler(300, 300, 0));
if (options.ReadTimeout > 0)
{
pipeline.AddLast("ReadTimeout", new ReadTimeoutHandler(options.ReadTimeout));
}
pipeline.AddLast(handler());
}));
}`
Handler code :
` ///
readonly bool autoRelease;
public SocketByteHandler() : this(true)
{
}
protected SocketByteHandler(bool autoRelease)
{
this.autoRelease = autoRelease;
}
public bool AcceptInboundMessage(object msg) => msg is IByteBuffer;
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
bool release = true;
try
{
if (this.AcceptInboundMessage(msg))
{
IByteBuffer imsg = (IByteBuffer)msg;
byte[] result = new byte[imsg.ReadableBytes];
imsg.ReadBytes(result);
this.ChannelRead0(ctx, result);
}
else
{
release = false;
ctx.FireChannelRead(msg);
}
}
finally
{
if (autoRelease && release)
{
ReferenceCountUtil.Release(msg);
}
}
}
protected virtual void ChannelRead0(IChannelHandlerContext ctx,byte[] msg)
{
}
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
if(evt is IdleStateEvent)
{
Console.WriteLine(evt.ToString());
IdleStateEvent _event = (IdleStateEvent)evt;
if(_event.State== IdleState.AllIdle)
{
}
}
}
}`

it would help if you could take a dump, open it in VS and check parallel stacks, then report what the different stacks are. Picture of the stacks view is fine too.
Yes,i do!
I tried to use the ProcessExplorer monitor thread to find that many idle threads had not been destroyed;

you can create a dump file , and open it by vs2017 or windbg , so you find the thread is nettythread or not @491134648
How do you analyze the dump file? @caozhiyuan

dunp_winbgd_log.txt winbdg logs
使用仅限托管进行调试 @491134648 看下线程窗口 看log日志像是你代码里面也很多同步阻塞线程的东西 程序EventLoopCount是多少? 或者你本地用vs debug看能不能重现这种问题
我们的业务场景是在早晨、下午设备会打开,然而中午跟晚上很多都会关闭,在设备断线重连后资源监视器中的线程会增多,应该是原有的连接未释放,而我有加入ReadTimeoutHandler,Handler中有捕获相关异常; 关键代码如下 ` public void Init(ServerOptions options,Func<IChannelHandler> handler) { _options = options; if (options.EventLoopCount > 0) { bossGroup = new MultithreadEventLoopGroup(options.EventLoopCount); } else { bossGroup = new MultithreadEventLoopGroup(); } workerGroup = new MultithreadEventLoopGroup(); bootstrap = new ServerBootstrap(); bootstrap.Group(bossGroup, workerGroup); bootstrap.Channel<TcpServerSocketChannel>();
bootstrap.Option(ChannelOption.SoBacklog, _options.SoBacklog);
bootstrap.ChildOption(ChannelOption.RcvbufAllocator,new AdaptiveRecvByteBufAllocator(64,1048,65536));
bootstrap.Option(ChannelOption.TcpNodelay, true);
bootstrap.ChildOption(ChannelOption.SoKeepalive, false);
bootstrap.ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//pipeline.AddLast(new IdleStateHandler(300, 300, 0));
if (options.ReadTimeout > 0)
{
pipeline.AddLast("ReadTimeout", new ReadTimeoutHandler(options.ReadTimeout));
}
pipeline.AddLast(handler());
}));
}`
` public class SchoolHandler : SocketByteHandler { private string SessionId = string.Empty; private IAppSession _session; private ServerOptions _options = null; private IPersistenceSession _persistenceSession; private IPersistentLogService _logService; private IDtuService _dtuService; private ILogger<SchoolHandler> _logger; private ConcurrentQueue<TaskQueueModel> tasgkQueues; HashedWheelTimer hashedWheelTimer; public SchoolHandler() { tasgkQueues = new ConcurrentQueue<TaskQueueModel>(); } public override void ChannelActive(IChannelHandlerContext context) {
hashedWheelTimer = new HashedWheelTimer();
_options = FH.Core.AppConfig.ServerOptions;
_logger = ServiceLocator.GetService<ILogger<SchoolHandler>>(); //ILogger < DefaultPersistentLogService >
_persistenceSession = ServiceLocator.GetService<IPersistenceSession>();
_dtuService = ServiceLocator.GetService<IDtuService>();
_logService = ServiceLocator.GetService<IPersistentLogService>();
_session = new NettyAppSession();
_session.SessionId = Guid.NewGuid().ToString("n");
_session.RemoteIpAddress = context.Channel.RemoteAddress.ToString();
_session.ConnectOn = DateTimeUtils.GetTimeStampByUtcDate(DateTime.UtcNow);
_session.SocketSession = new NettySocketSession(context.Channel);
ServiceLocator.GetService<ISessionMananger>().AddSession(_session);
base.ChannelActive(context);
_logger.LogInformation("ChannelActive:" + context.Channel.RemoteAddress.ToString());
}
protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
Dictionary<string, ItemModel> inItems = null;
List<Dictionary<string, ItemModel>> outItemsList = null;
Int16 timestamp = 0;
//IotMessage message = new IotMessage();
Stopwatch stopwatch = new Stopwatch();
ushort CmdNo = 0;
DateTime receiveTime = DateTime.Now;
try
{
stopwatch.Start();
CmdNo = DP.GetCmdNo(msg);
_session.BusNo = DP.GetBusNo(msg);
_session.DeviceId = DP.GetIMEI(msg);
#region 业务处理
inItems = DP.IsClient(CmdNo) ? DP.GetInHeadItems(CmdNo) : DP.GetOutHeadItems(CmdNo);
DP.SetItemsValue(msg, inItems);
CommdBase cmd = CommdBase.GetMe(CmdNo, _dtuService, _session);
outItemsList = cmd.Response(inItems);
timestamp = (Int16)stopwatch.ElapsedMilliseconds;
if (outItemsList == null || outItemsList.Count == 0)
{
return;
}
if (outItemsList.Count > 1)
{
for (int a = 1; a < outItemsList.Count; a++)
{
TaskQueueModel taskQueueModel = new TaskQueueModel()
{
SchoolId = _session.SchoolId,
IMEI = _session.IMEI,
No = _session.BusNo,
Item = outItemsList[a],
process = timestamp,
SessionId = _session.SessionId
};
tasgkQueues.Enqueue(taskQueueModel);
hashedWheelTimer.NewTimeout(new ActionTimerTask(b =>
{
try
{
TaskQueueModel tmodel = null;
if (tasgkQueues.TryDequeue(out tmodel))
{
byte[] resultBytes2 = DP.GetBytes(tmodel.Item);
_session.SocketSession.TrySend(resultBytes2);
if (_options.PersistentSocketLog)
{
_logService.InsertSorketLog(_session.RemoteIpAddress, tmodel.SessionId, resultBytes2, tmodel.IMEI, tmodel.Item, tmodel.process, receiveTime);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "hashedWheelTimer");
}
}), TimeSpan.FromMilliseconds(_options.Multipleinterval * a));
}
}
#endregion
}
catch (Exception ex)
{
_logger.LogError(ex, string.Format("ChannelRead0:SchoolId:{0}:IMEI:{1},IMSI:{2}:CMDNO:{3};data:{4}"
, _session.SchoolId, _session.IMEI, _session.IMSI, CmdNo, msg.ToHexString(" ")));
}
finally
{
try
{
if (_options.PersistentSocketLog)
{
if (inItems != null)
{
_logService.InsertSorketLog(_session.RemoteIpAddress, SessionId, msg, _session.IMSI, inItems, timestamp, receiveTime);
}
}
if (outItemsList != null && outItemsList.Count > 0)
{
if (outItemsList[0] != null && outItemsList[0].Any())
{
byte[] resultBytes = DP.GetBytes(outItemsList[0]);
_session.SocketSession.TrySend(resultBytes);
_logService.InsertSorketLog(_session.RemoteIpAddress, SessionId, resultBytes, _session.IMEI, outItemsList[0], timestamp, receiveTime);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, string.Format("ChannelRead0:SchoolId:{0}:IMEI:{1},IMSI:{2}:CMDNO:{3};data:{4}"
, _session.SchoolId, _session.IMEI, _session.IMSI, CmdNo, msg.ToHexString(" ")));
if (_options.PersistentSocketLog)
{
if (inItems != null)
{
_logService.InsertSorketLog(_session.RemoteIpAddress, SessionId, msg, _session.IMSI, inItems, timestamp, receiveTime);
}
}
}
_dtuService.Close();
}
}
public override void ChannelInactive(IChannelHandlerContext context)
{
try
{
ServiceLocator.GetService<ISessionMananger>().RemoveSession(_session);
_session = null;
_dtuService?.Dispose();
_dtuService = null;
hashedWheelTimer?.StopAsync().Wait(400);
hashedWheelTimer?.TryDispose();
hashedWheelTimer = null;
tasgkQueues?.Clear();
tasgkQueues?.TryDispose();
tasgkQueues = null;
_persistenceSession = null;
_logService = null;
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
base.ChannelInactive(context);
}
public override void ChannelReadComplete(IChannelHandlerContext context)
{
context.Flush();
}
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
if (exception is ReadTimeoutException) //超时异常不处理
{
}else if(exception is SocketException)
{
context.CloseAsync();
}
else
{
_logger.LogError(exception, "ExceptionCaught:" + context.Channel.RemoteAddress.ToString() + ";SessionId:" + _session?.SessionId
+ ";IMEI:" + _session?.IMEI);
}
//base.ExceptionCaught(context, exception);
}
}
}
///
readonly bool autoRelease;
public SocketByteHandler() : this(true)
{
}
protected SocketByteHandler(bool autoRelease)
{
this.autoRelease = autoRelease;
}
public bool AcceptInboundMessage(object msg) => msg is IByteBuffer;
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
bool release = true;
try
{
if (this.AcceptInboundMessage(msg))
{
IByteBuffer imsg = (IByteBuffer)msg;
byte[] result = new byte[imsg.ReadableBytes];
imsg.ReadBytes(result);
this.ChannelRead0(ctx, result);
}
else
{
release = false;
ctx.FireChannelRead(msg);
}
}
finally
{
if (autoRelease && release)
{
ReferenceCountUtil.Release(msg);
}
}
}
protected virtual void ChannelRead0(IChannelHandlerContext ctx,byte[] msg)
{
}
}`
应该是这里引起的异常,本机调试发现,这里开启了大量线程
Thank you,Get to a skill for windbg, I'll change HashedWheelTimer to a single case!
you can handle business in threadpool , usually netty handle network codecs , long time business will block netty thread. reference: https://github.com/caozhiyuan/DotNetty/blob/dev/src/DotNetty.Rpc/Server/RpcHandler.cs#L28