kafkaflow
Kafka configuration from DI
Hello.
In our project we have an external configuration settings storage, which we access through a service injected by the DI container. But to use KafkaFlow we must provide all settings (broker addresses, timeouts, etc.) at the DI container configuration step:
serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(/*WE DON'T KNOW THEM YET*/)));
It would be great to be able to fill in these settings later, when we can inject the settings storage service. Maybe something like this:
serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(resolver => resolver.GetRequiredService<ISettingsService>().GetKafkaBrokers())));
Or by registering in DI implementations of an interface like this:
public interface IKafkaConsumerConfigurator {
    void Configure(IRuntimeClusterConfigurationBuilder builder);
}
where IRuntimeClusterConfigurationBuilder is a subset of IClusterConfigurationBuilder that only allows configuring things that don't need to be added to DI. It would then be used in KafkaConfigurationBuilder:
public void Build()
{
    this.dependencyConfigurator.AddSingleton(
        resolver =>
        {
            var configuration = new KafkaConfiguration();
            var configurators = resolver.ResolveAll(typeof(IKafkaConsumerConfigurator)).Select(x => x as IKafkaConsumerConfigurator);
            configuration.AddClusters(
                this.clusters.Select(
                    x =>
                    {
                        // Let every registered configurator adjust the cluster before it is built.
                        foreach (var configurator in configurators)
                        {
                            configurator?.Configure(x);
                        }
                        return x.Build(configuration);
                    }
                )
            );
            return configuration;
        }
    );
    this.dependencyConfigurator.AddSingleton<IProducerAccessor>(
        resolver =>
        {
            var configuration = resolver.Resolve(typeof(KafkaConfiguration)) as KafkaConfiguration!;
            return new ProducerAccessor(
                configuration!.Clusters
                    .SelectMany(x => x.Producers)
                    .Select(
                        producer => new MessageProducer(
                            resolver,
                            producer
                        )
                    )
            );
        }
    );
    this.dependencyConfigurator
        .AddTransient(typeof(ILogHandler), this.logHandlerType)
        .AddSingleton<IDateTimeProvider, DateTimeProvider>()
        .AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
        .AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory());
}
and also on bus creation:
public IKafkaBus CreateBus(IDependencyResolver resolver)
{
    var scope = resolver.CreateScope();
    var configuration = resolver.Resolve(typeof(KafkaConfiguration)) as KafkaConfiguration;
    return new KafkaBus(
        scope.Resolver,
        configuration,
        scope.Resolver.Resolve<IConsumerManagerFactory>(),
        scope.Resolver.Resolve<IConsumerAccessor>(),
        scope.Resolver.Resolve<IProducerAccessor>());
}
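To make the second proposal more concrete, here is a rough sketch of what an application-side configurator could look like. Everything in it is an assumption for illustration: IRuntimeClusterConfigurationBuilder does not exist in KafkaFlow and is reduced here to a single WithBrokers method, the configurator method is spelled Configure, ISettingsService is given a guessed shape, and FixedSettingsService and RecordingClusterBuilder are hypothetical stand-ins that only make the sketch runnable on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the proposed runtime builder; KafkaFlow does not ship this type.
public interface IRuntimeClusterConfigurationBuilder
{
    IRuntimeClusterConfigurationBuilder WithBrokers(IEnumerable<string> brokers);
}

// The configurator interface proposed in this issue.
public interface IKafkaConsumerConfigurator
{
    void Configure(IRuntimeClusterConfigurationBuilder builder);
}

// Assumed shape of the application's settings service.
public interface ISettingsService
{
    IEnumerable<string> GetKafkaBrokers();
}

// Application-side configurator: the settings service arrives through DI,
// and the broker list is read only when Configure is called.
public sealed class SettingsBackedConfigurator : IKafkaConsumerConfigurator
{
    private readonly ISettingsService settings;

    public SettingsBackedConfigurator(ISettingsService settings)
    {
        this.settings = settings ?? throw new ArgumentNullException(nameof(settings));
    }

    public void Configure(IRuntimeClusterConfigurationBuilder builder)
    {
        builder.WithBrokers(this.settings.GetKafkaBrokers());
    }
}

// Hypothetical settings service returning a fixed broker list.
public sealed class FixedSettingsService : ISettingsService
{
    private readonly string[] brokers;

    public FixedSettingsService(params string[] brokers) => this.brokers = brokers;

    public IEnumerable<string> GetKafkaBrokers() => this.brokers;
}

// Hypothetical builder that records the brokers it receives.
public sealed class RecordingClusterBuilder : IRuntimeClusterConfigurationBuilder
{
    public List<string> Brokers { get; } = new List<string>();

    public IRuntimeClusterConfigurationBuilder WithBrokers(IEnumerable<string> brokers)
    {
        this.Brokers.AddRange(brokers);
        return this;
    }
}
```

With Microsoft.Extensions.DependencyInjection, such a class would be registered like any other service, for example with services.AddSingleton<IKafkaConsumerConfigurator, SettingsBackedConfigurator>(), so the container supplies ISettingsService when the configurators are resolved.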
I would like to hear your thoughts on this issue. Thanks.
Hi @SonicGD,
this option looks good to me. Can you open a pull request?
serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(resolver => resolver.GetRequiredService<ISettingsService>().GetKafkaBrokers())));
@SonicGD @filipeesch has there been any progress made on this? We too need the ability to fill in broker, consumer, producer information at a later time.
@SonicGD is there a workaround for this, since there is no pull request yet?
@kjsulls Hello. I did start working on this, but then switched to other tasks. The workaround we currently use is to build a small temporary DI provider containing the configuration services and their dependencies, get all the data from them, configure KafkaFlow on the main service collection, and then dispose of the provider.
I still hope to work on a proper solution when I have time.
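For anyone who wants to try the workaround, a minimal sketch of it could look like the following. It assumes Microsoft.Extensions.DependencyInjection together with KafkaFlow's integration package for it (which provides AddKafka), and an ISettingsService whose GetKafkaBrokers returns the broker addresses as strings. KafkaBootstrap, AddKafkaFromSettings, ReadBrokers and InMemorySettingsService are hypothetical names made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using KafkaFlow;
using Microsoft.Extensions.DependencyInjection;

// Assumed shape of the application's settings service.
public interface ISettingsService
{
    IEnumerable<string> GetKafkaBrokers();
}

// Hypothetical in-memory implementation, used only to exercise the sketch.
public sealed class InMemorySettingsService : ISettingsService
{
    private readonly string[] brokers;

    public InMemorySettingsService(params string[] brokers) => this.brokers = brokers;

    public IEnumerable<string> GetKafkaBrokers() => this.brokers;
}

public static class KafkaBootstrap
{
    // Builds a short-lived provider from the given registrations, reads the
    // broker list once, and disposes the provider before returning.
    public static string[] ReadBrokers(IServiceCollection bootstrapServices)
    {
        using var provider = bootstrapServices.BuildServiceProvider();

        // ToArray materializes the list while the provider is still alive.
        return provider.GetRequiredService<ISettingsService>()
            .GetKafkaBrokers()
            .ToArray();
    }

    // registerSettings adds the settings service and its dependencies to the
    // temporary collection; the main collection only receives plain values.
    public static IServiceCollection AddKafkaFromSettings(
        this IServiceCollection services,
        Action<IServiceCollection> registerSettings)
    {
        var bootstrap = new ServiceCollection();
        registerSettings(bootstrap);
        var brokers = ReadBrokers(bootstrap);

        return services.AddKafka(
            kafka => kafka.AddCluster(cluster => cluster.WithBrokers(brokers)));
    }
}
```

The main drawback of this approach is that the settings are read once at startup, and any service registered in the temporary collection is constructed a second time if it is also registered in the main container.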
@SonicGD I completely understand. Your workaround in the meantime seems like a good solution. I'll play around with it. Thanks!