netmq
Unhandled System.Net.Sockets.SocketException in reaper & iothread
I received two unhandled SocketExceptions, one in the reaper thread and one in the iothread. In both cases the exception message was "An existing connection was forcibly closed by the remote host". Here are the stack traces (reaper thread first, then iothread):
System.Net.Sockets.Socket.Receive(System.Byte[] buffer, System.Int32 offset, System.Int32 size, System.Net.Sockets.SocketFlags socketFlags)
NetMQ.Core.Mailbox.TryRecv(System.Int32 timeout, NetMQ.Core.command& command)
NetMQ.Core.Reaper.InEvent()
NetMQ.Core.Utils.Poller.Loop()
System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx)
System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx)
System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state)
System.Threading.ThreadHelper.ThreadStart()
System.Net.Sockets.Socket.Send(System.Byte[] buffer, System.Int32 offset, System.Int32 size, System.Net.Sockets.SocketFlags socketFlags)
NetMQ.Core.Mailbox.Send(NetMQ.Core.command )
NetMQ.Core.ZObject.SendCommand(NetMQ.Core.command )
NetMQ.Core.Pipe.Flush()
NetMQ.Core.Patterns.pair.XSend(NetMQ.msg& )
NetMQ.Core.SocketBase.TrySend(NetMQ.msg& , System.TimeSpan , System.Boolean )
NetMQ.Core.MonitorEvent.Write(NetMQ.Core.SocketBase )
NetMQ.Core.SocketBase.EventDisconnected(System.String , AsyncIO.AsyncSocket )
NetMQ.Core.Transports.StreamEngine.Error()
NetMQ.Core.Transports.StreamEngine.FeedAction(NetMQ.Core.Transports.Action , System.Net.Sockets.SocketError , System.Int32 )
NetMQ.Core.Utils.Proactor.Loop()
System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx)
System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state, System.Boolean preserveSyncCtx)
System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, System.Object state)
System.Threading.ThreadHelper.ThreadStart()
I suggest that they should be handled in NetMQ.
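For illustration only, here is a minimal sketch of what such handling could look like at the raw socket level, assuming a connection reset can simply be treated as "nothing received". `SocketHelpers` and `TryReceiveIgnoringReset` are hypothetical names, not part of NetMQ, and a real fix inside `Mailbox.TryRecv` may well need to behave differently.

```csharp
using System.Net.Sockets;

static class SocketHelpers
{
    // Hypothetical helper, not NetMQ API: wraps Socket.Receive and reports a
    // connection reset as "no data" instead of letting the exception escape.
    public static bool TryReceiveIgnoringReset(Socket socket, byte[] buffer, out int received)
    {
        try
        {
            received = socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
            return true;
        }
        catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
        {
            // SocketError.ConnectionReset (10054) is the code behind the message
            // "An existing connection was forcibly closed by the remote host".
            received = 0;
            return false;
        }
    }
}
```

Any other socket error still propagates, so only the specific failure reported here is swallowed.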
That should not happen; however, I have seen it in libzmq as well when running on Windows. Can you share some more information on when this happens?
I don't know the actual cause of the error "An existing connection was forcibly closed by the remote host", but it seems that something led to the connection being closed, possibly a network interface disconnect. The sockets are opened and used on the client in the following way:
try
{
NetMQTimer netTimer = new NetMQTimer(0);
using (DealerSocket netClientSocket = new DealerSocket())
using (SubscriberSocket netSubscriberSocket = new SubscriberSocket())
using (NetMQQueue<NetCommand> commandQueue = new NetMQQueue<NetCommand>())
using (NetMQPoller netPoller = new NetMQPoller { netClientSocket, netSubscriberSocket, commandQueue, netTimer })
using (NetMQMonitor netClientMonitor = new NetMQMonitor(netClientSocket, "inproc://cli.inproc", SocketEvents.Connected | SocketEvents.ConnectDelayed | SocketEvents.ConnectRetried | SocketEvents.Disconnected | SocketEvents.Closed))
using (NetMQMonitor netSubscriberMonitor = new NetMQMonitor(netSubscriberSocket, "inproc://sub.inproc", SocketEvents.Connected | SocketEvents.ConnectDelayed | SocketEvents.ConnectRetried | SocketEvents.Disconnected | SocketEvents.Closed))
{
netTimer.Elapsed += NetTimerElapsed;
netClientSocket.ReceiveReady += NetClientReceive;
netSubscriberSocket.Subscribe(new byte[] {});
netSubscriberSocket.ReceiveReady += NetSubscriberReceive;
commandQueue.ReceiveReady += NetCommandQueueReceive;
netClientMonitor.Connected += NetClientMonitorConnected;
netClientMonitor.ConnectDelayed += NetClientMonitorConnectDelayed;
netClientMonitor.ConnectRetried += NetClientMonitorConnectRetried;
netClientMonitor.Disconnected += NetClientMonitorDisconnected;
netClientMonitor.Closed += NetClientMonitorClosed;
netClientMonitor.AttachToPoller(netPoller);
netSubscriberMonitor.Connected += NetSubscriberMonitorConnected;
netSubscriberMonitor.ConnectDelayed += NetSubscriberMonitorConnectDelayed;
netSubscriberMonitor.ConnectRetried += NetSubscriberMonitorConnectRetried;
netSubscriberMonitor.Disconnected += NetSubscriberMonitorDisconnected;
netSubscriberMonitor.Closed += NetSubscriberMonitorClosed;
netSubscriberMonitor.AttachToPoller(netPoller);
netClientSocket.Connect("tcp://localhost:15555");
netSubscriberSocket.Connect("tcp://localhost:15556");
netPoller.Run();
netSubscriberMonitor.DetachFromPoller();
netClientMonitor.DetachFromPoller();
}
}
catch (Exception ex)
{
...
}
The server side is used in a similar way. The same exception also seems to be caught there.
Hi @somdoron, we are currently hitting this problem after deploying our app to the production environment.
We are using Router/Dealer sockets with NetMQ version 4.0.0.207 and AsyncIO version 0.1.69.0.
The app crashes immediately (a thread throws an unhandled exception), and we are struggling to patch the NetMQ library ourselves.
Could you shed some light on this and tell us whether any fix has been applied for this error?
=============================== [NetMQPollerThread] 4:00:18AM An existing connection was forcibly closed by the remote host, SocketException, IsTerminating:T
System.Net.Sockets.SocketException (0x80004005): An existing connection was forcibly closed by the remote host
   at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
   at NetMQ.Core.Mailbox.TryRecv(Int32 timeout, Command& command)
   at NetMQ.Core.SocketBase.ProcessCommands(Int32 timeout, Boolean throttle)
   at NetMQ.Core.SocketBase.GetSocketOption(ZmqSocketOption option)
   at NetMQ.NetMQSelector.Select(Item[] items, Int32 itemsCount, Int64 timeout)
   at NetMQ.NetMQPoller.RunPoller()
   at NetMQ.NetMQPoller.Run(SynchronizationContext syncContext)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()
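Since this trace shows the exception escaping from `NetMQPoller.Run` on the poller thread, one possible stopgap (a sketch, not a confirmed fix) is to call the blocking `Run()` on a thread we own and catch the exception there. `PollerRunner`, `RunGuarded`, and the `onSocketError` callback are hypothetical names introduced for this example. It would not help with exceptions thrown on NetMQ's internal reaper and I/O threads, as in the original report, and the state of the poller and its sockets after such an exception is an open question, so recreating them is probably the safer assumption.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;
using NetMQ;

static class PollerRunner
{
    // Hypothetical helper: runs the poller on a dedicated thread and keeps a
    // SocketException on that thread from terminating the process.
    public static Thread RunGuarded(NetMQPoller poller, Action<SocketException> onSocketError)
    {
        var thread = new Thread(() =>
        {
            try
            {
                poller.Run(); // blocks until the poller is stopped
            }
            catch (SocketException ex)
            {
                // Hand the error to the caller (log it, rebuild sockets, etc.).
                onSocketError(ex);
            }
        })
        {
            IsBackground = true,
            Name = "GuardedPollerThread"
        };

        thread.Start();
        return thread;
    }
}
```

A caller would pass its own poller and a logging callback, for example `PollerRunner.RunGuarded(poller, ex => Console.Error.WriteLine(ex))`.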
This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.
EDIT: This seems to be an issue with Godot, and possibly Mono.
I have the same issue as @dickens-code and I am only running the example code on the front page of the project's GitHub.
Code:
using (var server = new ResponseSocket("@tcp://localhost:5556")) // bind
using (var client = new RequestSocket(">tcp://localhost:5556")) // connect
{
// Send a message from the client socket
client.SendFrame("Hello");
// Receive the message from the server socket
string m1 = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", m1);
// Send a response back from the server
server.SendFrame("Hi Back");
// Receive the response from the client socket
string m2 = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", m2);
}
It crashes here:
client.SendFrame("Hello");
In Rider, this is the exception shown in the debugger: https://imgur.com/a/xuwSVU0
I am targeting .NET 4.8 with C# (Preview language version). I am using 4.0.1.7-pre of NetMQ.
The same issue occurs if I comment out the code and simply use:
using (var server = new ResponseSocket("@tcp://localhost:5556")) ;
I just tested it on 4.0.1.6 and the same issue occurs.
I just fixed the issue.
AsyncIO.ForceDotNet.Force();
and
NetMQConfig.Cleanup();
must both be used.
Here is my Godot main script (Main.cs):
using Godot;
using NetMQ;
using NetMQ.Sockets;
namespace RPG
{
public class Main : Node2D
{
public override void _Ready()
{
AsyncIO.ForceDotNet.Force();
using (var server = new ResponseSocket("@tcp://localhost:5556")) // bind
using (var client = new RequestSocket(">tcp://localhost:5556")) // connect
{
// Send a message from the client socket
client.SendFrame("Hello");
// Receive the message from the server socket
string m1 = server.ReceiveFrameString();
GD.Print($"From Client: {m1}"); // GD.Print concatenates its arguments; it does not apply format strings
// Send a response back from the server
server.SendFrame("Hi Back");
// Receive the response from the client socket
string m2 = client.ReceiveFrameString();
GD.Print($"From Server: {m2}"); // GD.Print concatenates its arguments; it does not apply format strings
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
NetMQConfig.Cleanup();
}
}
}
I want to give credit to: https://github.com/valkjsaaa/Unity-ZeroMQ-Example/blob/master/Assets/ServerObject.cs