Rebus icon indicating copy to clipboard operation
Rebus copied to clipboard

TplWorker does not support parallelism

Open elpht opened this issue 3 years ago • 2 comments

I was reading through the code (branch master, commit c2a32d5707a94696f05cccffac6e43c4ae28f27f) of the TplWorker and realized that when picking a new message from the transport all the subsequent tasks were awaited, without any chance for the worker to begin another parallel operation.

I just created a test in the class Rebus.Tests.Workers.TryUsingTaskParallelLibraryToReceiveMessages, reusing the existing test setup to try demonstrating my suspicions.

It fails when WorkerFactory is TaskParallelLibrary (actually it times out with these settings, but if the message count is decreased or the waiting time increased, it fails when checking maxParallelHandlers > 1):

        [TestCase(300, WorkerFactory.DedicatedWorkerThreads)]
        [TestCase(300, WorkerFactory.TaskParallelLibrary)]
        public async Task StuffHappensInParallel(int messageCount, WorkerFactory workerFactory)
        {
            var (activator, starter) = CreateBus(1, 10, workerFactory);

            var counter = new SharedCounter(messageCount);
            var parallelHandlers = 0;
            var maxParallelHandlers = 0;

            activator.Handle<string>(async str =>
            {
                var curParallelHandlers = Interlocked.Increment(ref parallelHandlers);
                if (curParallelHandlers > maxParallelHandlers)
                    maxParallelHandlers = curParallelHandlers;

                await Task.Delay(100);
                counter.Decrement();
                Interlocked.Decrement(ref parallelHandlers);
            });

            var bus = starter.Start();

            await Task.WhenAll(Enumerable.Range(0, messageCount)
                .Select(n => bus.SendLocal($"THIS IS MESSAGE {n}")));

            counter.WaitForResetEvent(timeoutSeconds: 15);
            Assert.That(maxParallelHandlers, Is.GreaterThan(1));
        }

elpht avatar Nov 19 '21 09:11 elpht

This is because, when using TPL-based workers, the "number of workers" IS the parallelism 🙂

While it would probably make sense to separate the two, unfortunately that's now how it works right now.

mookid8000 avatar Nov 19 '21 13:11 mookid8000

Thanks for the quick answer!

Ok, understood. Then I'd just like to say, if I'm allowed to make a suggestion (or two 🙂), that it would be nice to:

  • Mention in the documentation that that configuration only applies to ThreadPool-based ones.
  • Get an exception (NotSupportedException thrown by maybe TplOptionsExtensions.UseTplToReceiveMessages) or, at least, a warning log message when the bus is configured to use TPL-based workers and MaxParallelism > 1.

In any case, thanks for your time and effort invested in this library.

elpht avatar Nov 19 '21 13:11 elpht

I've given the wiki page linked above an overhaul, e.g. by adding a section about TPL-based workers

mookid8000 avatar Nov 14 '23 16:11 mookid8000