kafkaflow
[Bug Report]: Scope Middleware is Disposed of in OnConsumeCompleted or OnConsumeError Global Events
Prerequisites
- [X] I have searched issues to ensure it has not already been reported
Description
Using the setup below, I am able to resolve services in MessageConsumeStarted such as
using (var scope = context.DependencyResolver.CreateScope()) { //Code Here }
If I use the same code in OnConsumeCompleted or OnConsumeError, I get the following error:
System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)
In my message handlers, I'm injecting my DbContext for EF, so it's important that I'm able to register these as scoped. I can't use Transient because I set parameters that need to be available via scoped DI further down the pipeline.
services.AddKafkaFlowHostedService(kafka => kafka
    .UseMicrosoftLog()
    .SubscribeGlobalEvents(observers =>
    {
        observers.MessageConsumeStarted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
        observers.MessageConsumeCompleted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));
        observers.MessageConsumeError.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));
    })
    .AddCluster(cluster => cluster
        .WithBrokers(brokers)
        .CreateTopicIfNotExists("sample-topic", 1, 1)
        .AddConsumer(consumer => consumer
            .Topic("sample-topic")
            .WithGroupId("test-group")
            .WithWorkersCount(1)
            .WithAutoOffsetReset(AutoOffsetReset.Earliest)
            .WithBufferSize(200)
            .WithAutoCommitIntervalMs(2000)
            .WithMaxPollIntervalMs(null)
            .WithPartitionsRevokedHandler((resolver, partitions) =>
            {
                var logger = resolver.Resolve<ILogHandler>();
                partitions.ForEach(partition =>
                {
                    logger?.Warning($"Partition is being revoked: [{partition}]", null);
                });
            })
            .AddMiddlewares(middlewares => middlewares
                .AddSingleTypeDeserializer<NewtonsoftJsonDeserializer>(typeof(BaseEntity))
                .AddTypedHandlers(h => h.AddHandler<UpdateDbForLanguage>().WithHandlerLifetime(InstanceLifetime.Scoped))
            ))
        .EnableAdminMessages("kafkaflow.admin")
    ));
});
Steps to reproduce
- Create a consumer whose handler uses the Scoped lifetime
- Subscribe to three global events: MessageConsumeStarted, MessageConsumeCompleted, and MessageConsumeError
- Observe that dependencies can be resolved in MessageConsumeStarted but not in MessageConsumeCompleted or MessageConsumeError
Expected behavior
All Global Events can resolve dependencies
Actual behavior
In the global event handlers OnConsumeCompleted and OnConsumeError, dependencies cannot be resolved.
Using var scope = context.DependencyResolver.CreateScope() or context.DependencyResolver.Resolve<BaseEntity>() results in the following error:
System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)
KafkaFlow version
3.0.10
I have a similar problem. I get this error:
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
on MessageConsumeError
@c5racing @tomaszprasolek I had the same problem, but I found out that for MessageConsumeCompleted calling
context.MessageContext.ConsumerContext.ConsumerDependencyResolver.Resolve() instead of
context.MessageContext.DependencyResolver.Resolve()
worked. Not sure if that's the intended way, but it was a fine workaround for me.
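A minimal sketch of that workaround inside the global event subscriptions, assuming KafkaFlow 3.x and reusing the ILogHandler resolution from the original setup. The class and method names (GlobalEventsWorkaround, Subscribe) are hypothetical. The thread only reports this working for MessageConsumeCompleted; applying it to MessageConsumeError as well is an untested assumption.

```csharp
using System.Threading.Tasks;
using KafkaFlow;

// Hypothetical helper; pass its Subscribe method to SubscribeGlobalEvents(...).
public static class GlobalEventsWorkaround
{
    public static void Subscribe(IGlobalEvents observers)
    {
        observers.MessageConsumeCompleted.Subscribe(eventContext =>
        {
            // Use the consumer-level resolver instead of
            // eventContext.MessageContext.DependencyResolver, which is
            // reported above as already disposed at this point.
            var resolver = eventContext.MessageContext.ConsumerContext.ConsumerDependencyResolver;
            var logger = resolver.Resolve<ILogHandler>();
            logger?.Warning("Message consume completed", null);
            return Task.CompletedTask;
        });

        // Assumption: the same resolver is still usable in the error event.
        observers.MessageConsumeError.Subscribe(eventContext =>
        {
            var resolver = eventContext.MessageContext.ConsumerContext.ConsumerDependencyResolver;
            var logger = resolver.Resolve<ILogHandler>();
            logger?.Warning($"Message consume failed: {eventContext.Exception.Message}", null);
            return Task.CompletedTask;
        });
    }
}
```

With this helper, the setup would call .SubscribeGlobalEvents(GlobalEventsWorkaround.Subscribe). Note that the consumer-level resolver is not the per-message scope, so services registered as scoped (such as an EF DbContext) may not be the same instances the handler used.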