
HandlerActivator.Register fails to get correct registered instance

Open • hartmark opened this issue 9 months ago • 0 comments

I'm trying to write some tests and have message handlers registered in my DI container, but when I run the tests, the messages fail to find their handler.

I have written a small reproduction case. Test cases 3 and 4 should work, but they fail.

The issue is that when the handler type is not known at compile time, the Register function doesn't work. I need to cast to IHandleMessages to get it to compile, but then it fails to find the message handler when sending messages.
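One possible explanation, stated as an assumption rather than something confirmed in this issue: if the activator stores each factory delegate as-is and later looks it up by delegate type, a factory typed as the non-generic IHandleMessages no longer carries the message type. The sketch below shows only that C# delegate-variance behaviour. The interfaces are hypothetical stand-ins declared locally, and CovarianceDemo and CountMatches are illustrative names, not Rebus APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the marker and generic handler interfaces.
public interface IHandleMessages { }
public interface IHandleMessages<in TMessage> : IHandleMessages { }

public class MyMessage { }
public class MyHandler : IHandleMessages<MyMessage> { }

public static class CovarianceDemo
{
    // Counts the factories whose delegate type is compatible with
    // Func<IHandleMessages<TMessage>>, as a type-keyed lookup might do.
    public static int CountMatches<TMessage>(IEnumerable<object> factories) =>
        factories.OfType<Func<IHandleMessages<TMessage>>>().Count();

    public static void Main()
    {
        // Like cases 1 and 2: the delegate's return type carries the message type.
        Func<IHandleMessages<MyMessage>> typed = () => new MyHandler();
        Func<MyHandler> concrete = () => new MyHandler();

        // Like cases 3 and 4: the cast to the marker interface erases it.
        Func<IHandleMessages> erased = () => (IHandleMessages)new MyHandler();

        Console.WriteLine(CountMatches<MyMessage>(new object[] { typed }));    // 1
        Console.WriteLine(CountMatches<MyMessage>(new object[] { concrete })); // 1 (Func<out T> is covariant)
        Console.WriteLine(CountMatches<MyMessage>(new object[] { erased }));   // 0
    }
}
```

If the real activator behaves like this stub, cases 1 and 2 would be found and cases 3 and 4 would not, which matches the behaviour described above.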

using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Persistence.InMem;
using Rebus.Routing.TypeBased;
using Rebus.Transport.InMem;

namespace Processor.Tests.EndToEndTests;

[TestFixture]
public class RebusTest
{
    [TestCase(1)]
    [TestCase(2)]
    [TestCase(3)]
    [TestCase(4)]
    public async Task Test_Register(int caseNumber)
    {
        var serviceProvider = new ServiceCollection()
            .AddTransient<MyHandler>()
            .BuildServiceProvider();
        
        var handlerActivator = new BuiltinHandlerActivator();

        switch (caseNumber)
        {
            // Case 1: factory cast to the closed generic interface.
            case 1:
                handlerActivator.Register(() => (IHandleMessages<MyMessage>)serviceProvider.GetRequiredService<MyHandler>());
                break;

            // Case 2: factory returns the concrete handler type.
            case 2:
                handlerActivator.Register(() => serviceProvider.GetRequiredService<MyHandler>());
                break;

            // Case 3: handler types discovered via reflection, factory cast to the non-generic IHandleMessages.
            case 3:
                var assembly = Assembly.GetAssembly(typeof(MyHandler));

                // Find all types that implement IHandleMessages<T>
                var handlerTypes = assembly!.GetTypes()
                    .Where(t => t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>)));
                foreach (var handlerType in handlerTypes)
                {
                    Console.WriteLine($"Registering handler: {handlerType.Name}");
                    handlerActivator.Register(() => (IHandleMessages)serviceProvider.GetRequiredService<MyHandler>());
                }

                break;

            // Case 4: same non-generic cast as case 3, without the reflection loop.
            case 4:
                handlerActivator.Register(() => (IHandleMessages)serviceProvider.GetRequiredService<MyHandler>());
                break;
        }
        
        var bus = Configure.With(handlerActivator)
            .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "MyInput"))
            .Routing(r => r.TypeBased()
                    .Map<MyMessage>("MyInput")
            )
            .Timeouts(t => t.StoreInMemory())
            .Start();
        
        
        var myMessage = new MyMessage();
        await bus.Send(myMessage);
        await bus.SendLocal(myMessage);
        
        await Task.Delay(1000);
    }
}

public class MyHandler : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message)
    {
        return Task.CompletedTask;
    }
}

public class MyMessage
{
}

Example output from TestCase 3:


Registering handler: MyHandler
[INF] Rebus.Routing.TypeBased.TypeBasedRouter (NonParallelWorker): Mapped .Processor.Tests.EndToEndTests.MyMessage -> "MyInput"
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (NonParallelWorker): Starting periodic task "CleanupTrackedErrors" with interval 00:00:10
[INF] Rebus.Threading.TaskParallelLibrary.TplAsyncTask (NonParallelWorker): Starting periodic task "DueMessagesSender" with interval 00:00:01
[INF] Rebus.Bus.RebusBus (NonParallelWorker): Bus "Rebus 3" setting number of workers to 1
[DBG] Rebus.Bus.RebusBus (NonParallelWorker): Adding worker "Rebus 3 worker 1"
[INF] Rebus.Bus.RebusBus (NonParallelWorker): Bus "Rebus 3" started
[DBG] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Rebus 3 worker 1): Starting (threadpool-based) worker "Rebus 3 worker 1"
[DBG] Rebus.Pipeline.Send.SendOutgoingMessageStep (NonParallelWorker): Sending Processor.Tests.EndToEndTests.MyMessage -> "MyInput"
[WRN] Rebus.Retry.Simple.DefaultExceptionLogger (Rebus 3 worker 1): Unhandled exception 1 (FINAL) while handling message with ID "99965292-5af7-4533-9981-46a0bcb2bf2e"
Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID 99965292-5af7-4533-9981-46a0bcb2bf2e and type Processor.Tests.EndToEndTests.MyMessage, Processor.Tests could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
   at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
[ERR] Rebus.Retry.PoisonQueues.DeadletterQueueErrorHandler (Rebus 3 worker 1): Moving message with ID "99965292-5af7-4533-9981-46a0bcb2bf2e" to error queue "error" - error details: "1 unhandled exceptions: 2025-03-14 01:22:15 +01:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID 99965292-5af7-4533-9981-46a0bcb2bf2e and type Processor.Tests.EndToEndTests.MyMessage, Processor.Tests could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
   at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
   at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)"
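
A possible direction for case 3, offered as an untested sketch rather than a confirmed fix: instead of casting to IHandleMessages, close a generic helper over each discovered handler type with MethodInfo.MakeGenericMethod, so that the registered delegate is a Func of the concrete handler type. Everything below is a local stand-in: StubActivator, StubProvider, RegisterHandler and RegisterAll are hypothetical names, and whether this carries over to BuiltinHandlerActivator and a real service provider is an assumption that would need checking against Rebus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins so the sketch compiles without Rebus or a DI container.
public interface IHandleMessages { }
public interface IHandleMessages<in TMessage> : IHandleMessages { }
public class MyMessage { }
public class MyHandler : IHandleMessages<MyMessage> { }

// Stub activator: stores factories and looks them up by delegate type.
public class StubActivator
{
    private readonly List<object> _factories = new List<object>();

    public void Register<THandler>(Func<THandler> factory) where THandler : IHandleMessages
        => _factories.Add(factory);

    public int CountHandlersFor<TMessage>()
        => _factories.OfType<Func<IHandleMessages<TMessage>>>().Count();
}

// Stub provider: creates any requested type via its parameterless constructor.
public class StubProvider : IServiceProvider
{
    public object GetService(Type serviceType) => Activator.CreateInstance(serviceType);
}

public static class ReflectionRegistration
{
    // THandler is the concrete handler type here, so the stored delegate
    // is a Func<THandler> and keeps the message type information.
    public static void RegisterHandler<THandler>(StubActivator activator, IServiceProvider provider)
        where THandler : IHandleMessages
        => activator.Register(() => (THandler)provider.GetService(typeof(THandler)));

    // Closes the generic helper over each handler type discovered at runtime.
    public static void RegisterAll(StubActivator activator, IServiceProvider provider, Assembly assembly)
    {
        var helper = typeof(ReflectionRegistration).GetMethod(nameof(RegisterHandler));
        var handlerTypes = assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && !t.IsGenericTypeDefinition)
            .Where(t => t.GetInterfaces().Any(i =>
                i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>)));

        foreach (var handlerType in handlerTypes)
        {
            helper.MakeGenericMethod(handlerType).Invoke(null, new object[] { activator, provider });
        }
    }

    public static void Main()
    {
        var activator = new StubActivator();
        RegisterAll(activator, new StubProvider(), typeof(MyHandler).Assembly);
        Console.WriteLine(activator.CountHandlersFor<MyMessage>()); // 1
    }
}
```

The helper is declared in the caller's own code on purpose, so the reflection lookup does not depend on how many Register overloads the activator exposes.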

hartmark • Mar 14 '25 00:03