netmq
NetMQRuntime throws exception on cancellation
Hi,
A simplified version of what I am trying to achieve is to perform the following steps in an infinite loop:
- Instantiate NetMQRuntime via TaskManager.Initialize();
- Perform writes/async reads on multiple sockets
- Allow NetMQRuntime.Run(CancellationToken ct, ...) to return by cancelling the CancellationTokenSource in TaskManager.Dispose().
The following exception is thrown: System.ObjectDisposedException: 'Cannot access a disposed object. Object name: 'NetMQPoller'.'
with the following call stack:
NetMQ.dll!NetMQ.NetMQPoller.CheckDisposed() Line 660
NetMQ.dll!NetMQ.NetMQPoller.TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) Line 82
[External Code]
NetMQ.dll!NetMQ.NetMQSynchronizationContext.Send(System.Threading.SendOrPostCallback d, object state) Line 28
[External Code]
client.dll!Client.TaskManager.Dispose(bool disposing) Line 98
client.dll!Client.TaskManager.Dispose() Line 86
client.dll!Client.Dispose(bool disposing) Line 61
client.dll!Client.Dispose() Line 45
clinet_test.dll!Test.TestInstance.CreateAndDestroyNetMQRuntimeInLoop() Line 642
[Resuming Async Method]
[External Code]
[Async Call Stack]
[Async] clinet_test.dll!client_test.Program.Main(string[] args) Line 662
Note that the exception cannot always be reproduced.
Environment
NetMQ Version: master branch as is on 02/Sept/2019 (The intention is to use 4.0.0.239-pre which throws a similar exception)
Operating System: Windows 10 Pro x64
.NET Version: .NET Core 2.1
Questions
What causes the NetMQPoller to be already disposed when the NetMQRuntime object goes out of the using() scope?
Is it enough to catch the exception, ignore it, and then proceed with creating a new instance of NetMQRuntime?
Should I take a different approach, in comparison with the provided example, when using NetMQRuntime?
Should I avoid cancelling the CancellationTokenSource from TaskManager.Dispose? (due to the fact that the Garbage Collector can also call it)
Example code of TaskManager class
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

class TaskManager : IDisposable
{
    private CancellationTokenSource cts = new CancellationTokenSource();
    private Thread runtimeThread;
    ....

    public void Dispose()
    {
        Dispose(true);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }
        if (disposing)
        {
            cts.Cancel(); // Allows NetMQRuntime.Run() to return
            runtimeFinished.WaitOne(); // Do not continue until NetMQRuntime is disposed
            cts.Dispose();
        }
        disposed = true;
    }

    public void Initialize()
    {
        // The sockets and the runtime are created, used, and disposed on this dedicated thread
        runtimeThread = new Thread(() =>
        {
            using (requestDealerSocket = new DealerSocket(">tcp://" + remoteHost + ":" + remoteRequestAcceptorPort))
            using (eventDealerSocket = new DealerSocket(">tcp://" + remoteHost + ":" + remoteEventPublisherPort))
            using (requestForwarderSocket = new PairSocket("@inproc://request"))
            using (requestInitiatorSocket = new PairSocket(">inproc://request"))
            using (var netMqRuntime = new NetMQRuntime())
            {
                netMqRuntime.Run(cts.Token, ForwardRequestToServerAsync(cts.Token), ListenForRequestAckFromServerAsync(cts.Token));
            }
            runtimeFinished.Set(); // NetMQRuntime disposed
        });
        runtimeThread.Start();
    }

    // Forwards requests received on the inproc pair socket to the server
    async Task ForwardRequestToServerAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var result = await requestForwarderSocket.ReceiveFrameBytesAsync();
            NetMQMessage message = new NetMQMessage();
            // populate message
            requestDealerSocket.SendMultipartMessage(message);
        }
    }

    // Waits for request acknowledgements coming back from the server
    async Task ListenForRequestAckFromServerAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var message = await ReceiveMultipartMessageBytesAsync(requestDealerSocket);
            // do work
        }
    }

    // Reads frames until the "more" flag (Item2) is false and returns them as one message
    async Task<List<byte[]>> ReceiveMultipartMessageBytesAsync(DealerSocket socket)
    {
        List<byte[]> message = new List<byte[]>();
        (byte[], bool) response;
        do
        {
            response = await socket.ReceiveFrameBytesAsync();
            message.Add(response.Item1);
        }
        while (response.Item2);
        return message;
    }
}
On further investigation, the runtimeThread that calls netMqRuntime.Run() is also the one that calls NetMQPoller.Dispose(). After the NetMQPoller is disposed, a WorkerThread (instantiated by the NetMQRuntime) calls NetMQPoller.TryExecuteTaskInline and consequently CheckDisposed(), which throws the exception.
Should any WorkerThreads be alive after the disposal of the NetMQPoller?
Weird behavior. Which call is at:
client.dll!Client.TaskManager.Dispose(bool disposing) Line 98
I have an idea for a fix. I will make a branch for it; will you be able to test it?
Also, can you make a pull request with a test that reproduces it?
Please check https://github.com/zeromq/netmq/pull/815/commits/83c7c50dfda7ab26cf4e4bf10a0249c0c8f22cf1
To clarify the "client.dll!Client.TaskManager.Dispose(bool disposing) Line 98", I just have a Client object that has a TaskManager as a member. When I call Client.Dispose(), in turn it calls TaskManager.Dispose(). Nothing special to it.
It seems that the change in 83c7c50 does not solve the issue.
There is a race condition between the thread that sets the NetMQRuntime's cancellationToken (which continues and calls the NetMQPoller.TryExecuteTaskInline) and another thread that calls the NetMQPoller.Dispose() via the NetMQRuntime.Dispose().
An easy way to reproduce the exception is by adding a Thread.Sleep within the NetMQPoller.TryExecuteTaskInline and trying to cancel the CancellationToken provided in the NetMQRuntime.Run() as shown below:
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    if (task == null)
        throw new ArgumentNullException(nameof(task));
    System.Threading.Thread.Sleep(5000); // Give time to the other thread to dispose this object
    CheckDisposed(); // will throw
    return CanExecuteTaskInline && TryExecuteTask(task);
}
I think it is the m_poller.Dispose(), in the NetMQRuntime.Dispose(), that causes the problem rather than the m_poller.Remove(socket).
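For readers following along, the ordering described above can be forced deterministically in a stand-alone sketch that does not depend on NetMQ. `FakePoller` and `RaceDemo` are hypothetical names used only for illustration; this is not NetMQ's implementation, just a minimal model of "one thread disposes, another thread then calls a method that checks the disposed flag".

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a poller-like object; NOT NetMQ's actual code.
sealed class FakePoller : IDisposable
{
    private int m_disposed; // 0 = alive, 1 = disposed

    public void Dispose() => Interlocked.Exchange(ref m_disposed, 1);

    // Mirrors the "check disposed, then do the work" shape from the stack trace.
    public bool TryExecuteInline()
    {
        if (Volatile.Read(ref m_disposed) == 1)
            throw new ObjectDisposedException(nameof(FakePoller));
        return true;
    }
}

static class RaceDemo
{
    // Returns true when the late caller observed the already disposed poller.
    public static bool LateCallThrows()
    {
        var poller = new FakePoller();
        var disposedSignal = new ManualResetEventSlim(false);
        bool threw = false;

        var worker = new Thread(() =>
        {
            // Wait until the other thread has disposed the poller,
            // which forces the problematic ordering every time.
            disposedSignal.Wait();
            try { poller.TryExecuteInline(); }
            catch (ObjectDisposedException) { threw = true; }
        });
        worker.Start();

        poller.Dispose();     // the "runtime" thread disposes first
        disposedSignal.Set(); // only then is the worker allowed to call in
        worker.Join();
        return threw;
    }
}
```

Calling `RaceDemo.LateCallThrows()` always takes the failing path because the event forces the worker to run after `Dispose()`. In the real scenario the ordering depends on timing, which would be consistent with the exception not being reproducible on every run.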
I asked about line 98 in TaskManager. I'm trying to understand what exactly is calling TryExecuteTaskInline.
Ok, I think I found the issue.
Can you check https://github.com/zeromq/netmq/pull/815/commits/7cdbcc6ef6f89a46ad17c1146e29a41fe6803cc3
Thanks. I will check it out and let you know as soon as possible.
Hi, thanks for the fix!
May I ask when the next (pre)release including this change will be published?
I also would like to ask what version this fix will be released in.
Just released https://www.nuget.org/packages/NetMQ/4.0.0.272-pre. It should include the fix.
Thank you! Just out of curiosity, how often are non-pre releases made?
Not sure if I should create a new issue, but I stumbled upon more exceptions related to cancelling the token.
There is an issue in the async extensions that leaks the event registration when the token is cancelled, because this line is never called:
https://github.com/zeromq/netmq/blob/43d0d5630c1f21762117a277474f55973c32faf8/src/NetMQ/AsyncReceiveExtensions.cs#L100
I see two issues here:
- If the server eventually responds, an exception is thrown. A simple try..finally would solve that.
- If the server never responds, the client will leak through the event registration.
What happens if I dispose the client before the server responds? Will socket.TryReceive fail?
Adding the de-registration to cancellationToken.Register might result in non-deterministic behavior (since the SynchronizationContext is not captured); IMO additional synchronization is needed.
See repro here.
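To make the try..finally idea concrete, here is a minimal sketch under stated assumptions. `FakeSocket`, `FrameReady`, `HandlerCount`, and `ReceiveSketch` are hypothetical names invented for this example and are not part of NetMQ. The sketch only shows the general shape: subscribe, await a TaskCompletionSource that the cancellation token can cancel, and unsubscribe in a finally block so the handler is removed on both the success path and the cancellation path.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event source standing in for a socket; NOT NetMQ's API.
sealed class FakeSocket
{
    public event Action<byte[]> FrameReady;

    // Number of currently registered handlers, used to observe leaks.
    public int HandlerCount => FrameReady?.GetInvocationList().Length ?? 0;

    public void Deliver(byte[] frame) => FrameReady?.Invoke(frame);
}

static class ReceiveSketch
{
    public static async Task<byte[]> ReceiveAsync(FakeSocket socket, CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<byte[]>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        Action<byte[]> handler = frame => tcs.TrySetResult(frame);

        socket.FrameReady += handler;
        try
        {
            // The cancellation callback only completes the task; it does not
            // touch the socket, so the de-registration stays in the finally
            // block of the awaiting method.
            using (ct.Register(() => tcs.TrySetCanceled()))
            {
                return await tcs.Task;
            }
        }
        finally
        {
            // Runs on success and on cancellation, so the handler never leaks.
            socket.FrameReady -= handler;
        }
    }
}
```

Because the finally block runs in the continuation of the awaiting method, the de-registration would happen wherever that continuation resumes. Whether that is the right thread for a real, non-thread-safe socket is exactly the synchronization concern raised above, and this sketch does not settle it.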
This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.
@somdoron any feedback, please 😉