kafkaflow icon indicating copy to clipboard operation
kafkaflow copied to clipboard

[Bug Report]: Scope Middleware is Disposed of in OnConsumeCompleted or OnConsumeError Global Events

Open c5racing opened this issue 1 year ago • 2 comments
trafficstars

Prerequisites

  • [X] I have searched issues to ensure it has not already been reported

Description

Using the setup below, I am able to resolve services in MessageConsumeStarted such as

using (var scope = context.DependencyResolver.CreateScope()) { //Code Here }

If I attempt to use the same code in OnConsumeCompleted or OnConsumeError, I get an error the following error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

In my message handlers, I'm injecting my DBContext for EF, so it's important that I'm able register these as scoped. I can't use Transient as I set different parameters that need to be available vi scoped DI down the pipeline.

services.AddKafkaFlowHostedService(kafka => kafka
                    .UseMicrosoftLog()
                    .SubscribeGlobalEvents(observers =>
                    {
                        observers.MessageConsumeStarted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
                        observers.MessageConsumeCompleted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));
                        observers.MessageConsumeError.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));
                    })
                    .AddCluster(cluster => cluster
                    .WithBrokers(brokers)
                    .CreateTopicIfNotExists("sample-topic", 1, 1)
                    .AddConsumer(consumer => consumer
                    .Topic("sample-topic")
                    .WithGroupId("test-group")
                    .WithWorkersCount(1)
                    .WithAutoOffsetReset(AutoOffsetReset.Earliest)
                    .WithBufferSize(200)
                    .WithAutoCommitIntervalMs(2000)
                    .WithMaxPollIntervalMs(null)
                    .WithPartitionsRevokedHandler((resolver, partitions) =>
                    {
                        var logger = resolver.Resolve<ILogHandler>();

                        partitions.ForEach(partition =>
                        {
                            logger?.Warning($"Partition is being revoked: [{partition}]", null);
                        });
                    })
                    .AddMiddlewares(middlewares => middlewares
                    .AddSingleTypeDeserializer<NewtonsoftJsonDeserializer>(typeof(BaseEntity))
                    .AddTypedHandlers(h => h.AddHandler<UpdateDbForLanguage>().WithHandlerLifetime(InstanceLifetime.Scoped))
                    ))
                    .EnableAdminMessages("kafkaflow.admin")
            ));
        });

Steps to reproduce

  1. Create Scope Consumer
  2. Create 3 Global Events, MessageConsumeStarted, MessageConsumeCompleted, and MessageConsumeError
  3. Observe dependencies can be resolved in MessageConsumeStarted but not in MessageConsumeCompleted or MessageConsumeError

Expected behavior

All Global Events can resolve dependencies

Actual behavior

In the Global Event Handlers, OnConsumeCompleted or OnConsumeError, Dependancies cannot be resolved.

Using var scope = context.DependencyResolver.CreateScope() or context.DependencyResolver.Resolve<BaseEntity>(); results in the folowwing error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

KafkaFlow version

3.0.10

c5racing avatar Aug 05 '24 15:08 c5racing

I have similar problem. I have the error:

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.

on MessageConsumeError

tomaszprasolek avatar Oct 29 '24 09:10 tomaszprasolek

@c5racing @tomaszprasolek I had the same problem, but I found out that for MessageConsumeCompleted calling context.MessageContext.ConsumerContext.ConsumerDependencyResolver.Resolve() instead of context.MessageContext.DependencyResolver.Resolve() worked. Not sure if that the intended way, but it was a fine workaround for me.

jimmyrn avatar Feb 05 '25 12:02 jimmyrn