kafkaflow icon indicating copy to clipboard operation
kafkaflow copied to clipboard

Kafka configuration from DI

Open SonicGD opened this issue 2 years ago • 4 comments

Hello.

In our project we have external configration settings storage. We access this storage via service injected by DI container. But to use KafkaFlow we must provide all settings - broker addresses, timeouts, etc on DI container configuration step.

serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(/*WE DON'T KNOW THEM YET*/)));

It would be great to have ability to fill this settings later, when we can inject settings storage service. Maybe something like this:

serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(resolver => resolver.GetRequiredService<ISettingsService>().GetKafkaBrokers())));

Or by registering in DI implementations of interface like

public interface IKafkaConsumerConfigurator {
    void Conifgure(IRuntimeClusterConfigurationBuilder builder);
}

where IRuntimeClusterConfigurationBuilder is subset of IClusterConfigurationBuilder which allows only to configure stuff that doesn't need to be added to DI. And then using it in KafkaConfigurationBuilder

  public void Build()
        {
            this.dependencyConfigurator.AddSingleton(
                resolver =>
                {
                    var configuration = new KafkaConfiguration();
                    var configurators = resolver.ResolveAll(typeof(IKafkaConsumerConfigurator)).Select(x => x as IKafkaConsumerConfigurator);
                    configuration.AddClusters(
                        this.clusters.Select(
                            x =>
                            {
                                foreach (var configurator in configurators)
                                {
                                    configurator?.Conifgure(x);
                                }

                                return x.Build(configuration);
                            }
                        )
                    );

                    return configuration;
                }
            );

            this.dependencyConfigurator.AddSingleton<IProducerAccessor>(
                resolver =>
                {
                    var configuration = resolver.Resolve(typeof(KafkaConfiguration)) as KafkaConfiguration!;
                    return new ProducerAccessor(
                        configuration!.Clusters
                            .SelectMany(x => x.Producers)
                            .Select(
                                producer => new MessageProducer(
                                    resolver,
                                    producer
                                )
                            )
                    );
                }
            );

            this.dependencyConfigurator
                .AddTransient(typeof(ILogHandler), this.logHandlerType)
                .AddSingleton<IDateTimeProvider, DateTimeProvider>()
                .AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
                .AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory());
        }

and also on bus creation:

 public IKafkaBus CreateBus(IDependencyResolver resolver)
        {
            var scope = resolver.CreateScope();
            var configuration = resolver.Resolve(typeof(KafkaConfiguration)) as KafkaConfiguration;
            return new KafkaBus(
                scope.Resolver,
                configuration,
                scope.Resolver.Resolve<IConsumerManagerFactory>(),
                scope.Resolver.Resolve<IConsumerAccessor>(),
                scope.Resolver.Resolve<IProducerAccessor>());
        }

I would like to hear your thoughts on this issue. Thanks.

SonicGD avatar Jun 03 '22 06:06 SonicGD

Hi @SonicGD ,

this option looks good to me. Can you open a pull request?

serviceCollection.AddKafka(builder => builder.AddCluster(cluster => cluster.WithBrokers(resolver => resolver.GetRequiredService<ISettingsService>().GetKafkaBrokers())));

filipeesch avatar Jul 21 '22 13:07 filipeesch

@SonicGD @filipeesch has there been any progress made on this? We too need the ability to fill in broker, consumer, producer information at a later time.

@SonicGD was there a workaround for this since there is not a pull request?

kjsulls avatar Feb 24 '23 14:02 kjsulls

@kjsulls Hello. I did started working on this, but then switched to other tasks. Current workaround we are using is to build small temporary DI-provider with configuration services and their dependencies, get all data from them, configure KafkaFlow on main Service Collecton and then dispose provider.

I still hope to work on proper solution when i have time.

SonicGD avatar Feb 24 '23 14:02 SonicGD

@SonicGD I completely understand. Your workaround in the meantime seems like a good solution. I'll play around with it. Thanks!

kjsulls avatar Feb 25 '23 18:02 kjsulls