
NetMQRuntime throws exception on cancellation

Open ageorgallides opened this issue 5 years ago • 15 comments

Hi,

A simplified version of what I am trying to achieve is to perform the following steps in an infinite loop:

  1. Instantiate NetMQRuntime via TaskManager.Initialize();
  2. Perform writes/async reads on multiple sockets
  3. Allow NetMQRuntime.Run(CancellationToken ct, ...) to return by cancelling the CancellationTokenSource in TaskManager.Dispose().

The following exception is thrown: System.ObjectDisposedException: 'Cannot access a disposed object. Object name: 'NetMQPoller'.'

  • Note that the exception cannot always be reproduced.

with the following call stack:

NetMQ.dll!NetMQ.NetMQPoller.CheckDisposed() Line 660
NetMQ.dll!NetMQ.NetMQPoller.TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) Line 82
[External Code]
NetMQ.dll!NetMQ.NetMQSynchronizationContext.Send(System.Threading.SendOrPostCallback d, object state) Line 28
[External Code]
client.dll!Client.TaskManager.Dispose(bool disposing) Line 98
client.dll!Client.TaskManager.Dispose() Line 86
client.dll!Client.Dispose(bool disposing) Line 61
client.dll!Client.Dispose() Line 45
clinet_test.dll!Test.TestInstance.CreateAndDestroyNetMQRuntimeInLoop() Line 642
[Resuming Async Method]
[External Code]
[Async Call Stack]
[Async] clinet_test.dll!client_test.Program.Main(string[] args) Line 662

Environment

NetMQ Version:    master branch as is on 02/Sept/2019 (The intention is to use 4.0.0.239-pre which throws a similar exception)
Operating System: Windows 10 Pro x64
.NET Version:     .NET Core 2.1

Questions

What causes the NetMQPoller to be already disposed when the NetMQRuntime object goes out of the using() scope?

Is it enough to catch the exception, ignore it, and then proceed with creating a new instance of NetMQRuntime?

Should I take a different approach, in comparison with the provided example, when using NetMQRuntime?

Should I avoid cancelling the CancellationTokenSource from TaskManager.Dispose? (due to the fact that the Garbage Collector can also call it)

Example code of TaskManager class

class TaskManager : IDisposable
{
    private CancellationTokenSource cts = new CancellationTokenSource();
    private Thread runtimeThread;
    ....

    public void Dispose()
    {
        Dispose(true);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        if (disposing)
        {
            cts.Cancel();              // Allows NetMQRuntime.Run() to return
            runtimeFinished.WaitOne(); // Do not continue until NetMQRuntime is disposed
            cts.Dispose();
        }

        disposed = true;
    }


    public void Initialize()
    {
        runtimeThread = new Thread(()=>
            {
                using (requestDealerSocket = new DealerSocket(">tcp://" + remoteHost + ":" + remoteRequestAcceptorPort))
                using (eventDealerSocket = new DealerSocket(">tcp://" + remoteHost + ":" + remoteEventPublisherPort))
                using (requestForwarderSocket = new PairSocket("@inproc://request"))
                using (requestInitiatorSocket = new PairSocket(">inproc://request"))
                using (var netMqRuntime = new NetMQRuntime())
                {
                    netMqRuntime.Run(cts.Token, ForwardRequestToServerAsync(cts.Token), ListenForRequestAckFromServerAsync(cts.Token));
                }
                
                runtimeFinished.Set(); // NetMQRuntime disposed
                
            });
        runtimeThread.Start();
    }
    
    async Task ForwardRequestToServerAsync(CancellationToken ct)
    {
        while(!ct.IsCancellationRequested)
        {
            var result = await requestForwarderSocket.ReceiveFrameBytesAsync();
            NetMQMessage message = new NetMQMessage();
            // populate message
            requestDealerSocket.SendMultipartMessage(message);
        }
    }
    
    async Task ListenForRequestAckFromServerAsync(CancellationToken ct)
    {
        while(!ct.IsCancellationRequested)
        {
            var message = await ReceiveMultipartMessageBytesAsync(requestDealerSocket);
            // do work
        }
    }
    
    async Task<List<byte[]>> ReceiveMultipartMessageBytesAsync(DealerSocket socket)
    {
        List<byte[]> message = new List<byte[]>();
        (byte[], bool) response;

        do
        {
            response = await socket.ReceiveFrameBytesAsync();
            message.Add(response.Item1);
        }
        while (response.Item2);

        return message;
    }
}

ageorgallides avatar Sep 02 '19 14:09 ageorgallides

On further investigation, the runtimeThread that calls netMqRuntime.Run() is also the one that calls NetMQPoller.Dispose(). After the NetMQPoller is disposed, a WorkerThread, which was instantiated by the NetMQRuntime, calls NetMQPoller.TryExecuteTaskInline and consequently CheckDisposed(), which throws the exception.

Should any WorkerThreads be alive after the disposal of the NetMQPoller?

ageorgallides avatar Sep 02 '19 15:09 ageorgallides

Weird behavior. What call is made on this line:

client.dll!Client.TaskManager.Dispose(bool disposing) Line 98

I have an idea for a fix, I will make a branch for it, will you be able to test?

Also, can you make a pull request with a test that reproduces it?

somdoron avatar Sep 03 '19 08:09 somdoron

please check https://github.com/zeromq/netmq/pull/815/commits/83c7c50dfda7ab26cf4e4bf10a0249c0c8f22cf1

somdoron avatar Sep 03 '19 08:09 somdoron

To clarify the "client.dll!Client.TaskManager.Dispose(bool disposing) Line 98", I just have a Client object that has a TaskManager as a member. When I call Client.Dispose(), in turn it calls TaskManager.Dispose(). Nothing special to it.

It seems that the change in 83c7c50 does not solve the issue.

There is a race condition between the thread that sets the NetMQRuntime's cancellationToken (which continues and calls the NetMQPoller.TryExecuteTaskInline) and another thread that calls the NetMQPoller.Dispose() via the NetMQRuntime.Dispose().
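To illustrate why the cancelling thread can end up running code that touches the poller: in .NET, callbacks registered on a CancellationToken run synchronously on the thread that calls Cancel(), unless the registration asked to capture a SynchronizationContext. The sketch below shows only that general behavior. It does not use NetMQ, and the class and method names are made up for illustration.

```csharp
using System;
using System.Threading;

public static class CancelCallbackDemo
{
    // Returns true when the callback registered on the token ran on the
    // same thread that called Cancel().
    public static bool CallbackRunsOnCancellingThread()
    {
        using (var cts = new CancellationTokenSource())
        {
            int callbackThreadId = -1;
            int cancellingThreadId = -1;

            // Default registration: no SynchronizationContext is captured.
            cts.Token.Register(() =>
                callbackThreadId = Thread.CurrentThread.ManagedThreadId);

            // Cancel from a dedicated thread, as a Dispose() caller might.
            var canceller = new Thread(() =>
            {
                cancellingThreadId = Thread.CurrentThread.ManagedThreadId;
                cts.Cancel(); // registered callbacks execute here, inline
            });
            canceller.Start();
            canceller.Join();

            return callbackThreadId == cancellingThreadId;
        }
    }
}
```

So anything the callback does, including running continuations, happens on the cancelling thread and can overlap with a Dispose() running on another thread.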

An easy way to reproduce the exception is by adding a Thread.Sleep within the NetMQPoller.TryExecuteTaskInline and trying to cancel the CancellationToken provided in the NetMQRuntime.Run() as shown below:

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    if (task == null)
        throw new ArgumentNullException(nameof(task));
    System.Threading.Thread.Sleep(5000); // Give time to the other thread to dispose this object
    CheckDisposed(); // will throw
    return CanExecuteTaskInline && TryExecuteTask(task);
}

I think it is the m_poller.Dispose(), in the NetMQRuntime.Dispose(), that causes the problem rather than the m_poller.Remove(socket).

ageorgallides avatar Sep 03 '19 12:09 ageorgallides

I asked about line 98 in TaskManager. I'm trying to understand what exactly is calling TryExecuteTaskInline

somdoron avatar Sep 03 '19 12:09 somdoron

Ok, I think I found the issue.

somdoron avatar Sep 03 '19 12:09 somdoron

Can you check https://github.com/zeromq/netmq/pull/815/commits/7cdbcc6ef6f89a46ad17c1146e29a41fe6803cc3

somdoron avatar Sep 03 '19 13:09 somdoron

Thanks. I will check it out and let you know as soon as possible.

ageorgallides avatar Sep 03 '19 15:09 ageorgallides

Hi, thanks for the fix!

May I ask when the next (pre)release including this change will be?

ageorgallides avatar Sep 26 '19 12:09 ageorgallides

I also would like to ask what version this fix will be released in.

toonetown avatar Dec 06 '19 17:12 toonetown

Just released https://www.nuget.org/packages/NetMQ/4.0.0.272-pre, which should include it.

somdoron avatar Dec 08 '19 08:12 somdoron

Thank you! Just out of curiosity, how often are non-pre releases made?

toonetown avatar Dec 09 '19 21:12 toonetown

Not sure if I should create a new issue but I stumbled upon more exceptions regarding cancelling the token.

There is an issue in the async extensions that leaks the event registration if the token is cancelled, because this line:

https://github.com/zeromq/netmq/blob/43d0d5630c1f21762117a277474f55973c32faf8/src/NetMQ/AsyncReceiveExtensions.cs#L100

is never called.

I see two issues here:

  • If the server eventually responds, an exception is thrown. A simple try..finally would solve that.
  • If the server never responds, the client will leak through the event registration.

What happens if I dispose the client before the server responds, will socket.TryReceive fail?

Adding the de-registration to cancellationToken.Register might result in non-deterministic behavior, since the SynchronizationContext is not captured. IMO additional synchronization is needed.
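A minimal sketch of the try..finally idea, under stated assumptions: FakeSocket, its ReceiveReady event, and WaitForReadyAsync are hypothetical stand-ins written for this example, not the NetMQ implementation. It only shows the handler being removed on both the normal and the cancelled path. It does not address the question of which thread the removal runs on.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a socket that raises an event when data is ready.
public sealed class FakeSocket
{
    private EventHandler _receiveReady;

    // Number of handlers currently subscribed; lets a caller check for leaks.
    public int HandlerCount { get; private set; }

    public event EventHandler ReceiveReady
    {
        add { _receiveReady += value; HandlerCount++; }
        remove { _receiveReady -= value; HandlerCount--; }
    }

    public void RaiseReceiveReady() => _receiveReady?.Invoke(this, EventArgs.Empty);
}

public static class ReceiveSketch
{
    // Completes when the socket signals readiness, or throws
    // OperationCanceledException when the token is cancelled.
    public static async Task WaitForReadyAsync(FakeSocket socket, CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        EventHandler handler = (s, e) => tcs.TrySetResult(true);

        socket.ReceiveReady += handler;
        try
        {
            // Cancellation only completes the task; it does not touch the socket.
            using (ct.Register(() => tcs.TrySetCanceled()))
            {
                await tcs.Task;
            }
        }
        finally
        {
            // Runs on success and on cancellation, so the handler never leaks.
            socket.ReceiveReady -= handler;
        }
    }
}
```

In this sketch the finally block runs wherever the continuation is scheduled, so with a thread-affine socket the thread question raised above still needs its own answer.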

See repro here.

shadow-cs avatar Feb 13 '20 09:02 shadow-cs

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

stale[bot] avatar Jun 28 '21 12:06 stale[bot]

@somdoron any feedback, please 😉

shadow-cs avatar Jul 14 '21 08:07 shadow-cs