Provide example of creating Kafka topics

Open CeeJayCee opened this issue 2 years ago • 6 comments

I have just started investigating Silverback to use with Kafka. Apologies for the simple question:

What is the recommended pattern for creating and configuring topics in Kafka when using Silverback?

I have a basic .NET Core project using a simple Controller. The Controller takes an IEventPublisher in the constructor and uses this to publish messages when an HTTP endpoint is called.

The topic is created automatically, but I would like control over the creation options.

I have been able to take an IConfluentAdminClientBuilder in the constructor of my controller, and use this to build an IAdminClient which can then call CreateTopicsAsync(). This doesn't feel right!

Is there a better recommended way?

Thanks :-)

CeeJayCee, Aug 19 '21 08:08

We use a BrokerCallbackHandler to do this.

The following two classes show how to get the configured topics from Silverback.

internal class ConsumedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IBrokerCollection _brokers;

    public ConsumedKafkaTopicsProvider(IBrokerCollection brokers)
    {
        _brokers = brokers;
    }

    public IList<string> GetKafkaTopics()
    {
        var kafkaBroker = _brokers.OfType<KafkaBroker>().FirstOrDefault();

        if (kafkaBroker == null)
        {
            return Array.Empty<string>();
        }

        return kafkaBroker.Consumers.OfType<KafkaConsumer>().Select(c => c.Endpoint.Name).ToList();
    }
}

internal class ProducedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IServiceProvider _serviceProvider;

    private readonly IOutboundRoutingConfiguration _routingConfiguration;

    public ProducedKafkaTopicsProvider(IServiceProvider serviceProvider, IOutboundRoutingConfiguration routingConfiguration)
    {
        _serviceProvider = serviceProvider;
        _routingConfiguration = routingConfiguration;
    }

    public IList<string> GetKafkaTopics()
    {
        return _routingConfiguration.Routes
            .SelectMany(r => r.GetOutboundRouter(_serviceProvider).Endpoints)
            .OfType<KafkaProducerEndpoint>()
            .Select(e => e.Name)
            .ToList();
    }
}

The class KafkaTopicsCreator gets the topics from all registered IKafkaTopicsProvider implementations and uses the IAdminClient to create them.

internal class KafkaTopicsCreator : IKafkaTopicsCreator
{
    private readonly IEnumerable<IKafkaTopicsProvider> _topicsProviders;
    private readonly IAdminClient _adminClient;
    private readonly ILogger<KafkaTopicsCreator> _logger;

    public KafkaTopicsCreator(IEnumerable<IKafkaTopicsProvider> topicsProviders, IAdminClient adminClient, ILogger<KafkaTopicsCreator> logger)
    {
        _topicsProviders = topicsProviders;
        _adminClient = adminClient;
        _logger = logger;
    }

    public async Task CreateTopicsAsync(KafkaTopicCreationSettings settings)
    {
        IList<string> topics = _topicsProviders
            .SelectMany(p => p.GetKafkaTopics())
            .Distinct(StringComparer.InvariantCulture)
            .ToList();

        if (!topics.Any())
        {
            return;
        }

        _logger.ExecutingCreateTopics(topics);

        try
        {
            await _adminClient.CreateTopicsAsync(topics.Select(t => new TopicSpecification { Name = t, NumPartitions = settings.Partitions, ReplicationFactor = settings.ReplicationFactor }));
            _logger.ExecutedCreateTopics();
        }
        catch (CreateTopicsException ex)
        {
            if (ex.Results.All(r =>
                (r.Error.IsError && r.Error.Code == ErrorCode.TopicAlreadyExists) || (!r.Error.IsError)))
            {
                _logger.AllTopicsAlreadyExist();
                return;
            }

            throw;
        }
    }
}
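
The snippets above reference IKafkaTopicsProvider, IKafkaTopicsCreator and KafkaTopicCreationSettings without showing them. A minimal sketch of what they might look like, inferred only from how they are used in this thread (the member types and default values are assumptions, not the actual Swiss Post code):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical reconstruction: returns the topic names known to one source
// (consumers or producers), as used by KafkaTopicsCreator above.
public interface IKafkaTopicsProvider
{
    IList<string> GetKafkaTopics();
}

// Hypothetical reconstruction: creates all topics reported by the providers.
public interface IKafkaTopicsCreator
{
    Task CreateTopicsAsync(KafkaTopicCreationSettings settings);
}

// Hypothetical settings class. The property names come from the usages in the
// thread; the types match TopicSpecification.NumPartitions (int) and
// TopicSpecification.ReplicationFactor (short). The defaults are assumptions.
public class KafkaTopicCreationSettings
{
    // Off by default, so topics are only created where explicitly enabled.
    public bool CreateKafkaTopics { get; set; }

    public int Partitions { get; set; } = 1;

    public short ReplicationFactor { get; set; } = 1;
}
```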

We provide this functionality as a library, so we define the necessary registrations in an extension method.

public static ISilverbackBuilder CreateConfiguredKafkaTopicsIfEnabled(this ISilverbackBuilder builder, IConfiguration configuration)
{
    var kafkaConnectionSettings = new KafkaConnectionSettings();
    configuration.Bind(KafkaConnectionSettings.Name, kafkaConnectionSettings);

    if (kafkaConnectionSettings.KafkaTopicCreation.CreateKafkaTopics)
    {
        builder.Services.AddSingleton(_ =>
        {
            var clientConfig = KafkaConfigFactory.CreateKafkaClientConfig(kafkaConnectionSettings);
            return new AdminClientBuilder(clientConfig).Build();
        });

        builder.Services.AddSingleton<IKafkaTopicsProvider, ConsumedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsProvider, ProducedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsCreator, KafkaTopicsCreator>();
        builder.AddSingletonBrokerCallbackHandler<TopicsCreationCallbackHandler>();
    }

    return builder;
}
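
The TopicsCreationCallbackHandler registered above is not shown in the thread. A rough sketch of how such a handler could look, assuming Silverback's IEndpointsConfiguredCallback interface with its OnEndpointsConfiguredAsync method, and assuming that KafkaTopicCreationSettings is resolvable from the container (the extension method above does not register it, so that part is hypothetical):

```csharp
using System.Threading.Tasks;
using Silverback.Messaging.Broker.Callbacks;

// Hypothetical handler: invoked by Silverback once the endpoints are configured,
// at which point the topic providers can enumerate the configured topics.
internal class TopicsCreationCallbackHandler : IEndpointsConfiguredCallback
{
    private readonly IKafkaTopicsCreator _topicsCreator;
    private readonly KafkaTopicCreationSettings _settings;

    // Assumption: the settings instance has been registered in the DI container.
    public TopicsCreationCallbackHandler(
        IKafkaTopicsCreator topicsCreator,
        KafkaTopicCreationSettings settings)
    {
        _topicsCreator = topicsCreator;
        _settings = settings;
    }

    // Delegates to the creator shown earlier in the thread.
    public Task OnEndpointsConfiguredAsync() =>
        _topicsCreator.CreateTopicsAsync(_settings);
}
```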

You can use it like this:

services.AddSilverback()
    .WithConnectionToMessageBroker(builder => builder.AddMockedKafka())
    .CreateConfiguredKafkaTopicsIfEnabled(Configuration)
    .AddEndpointsConfigurator<EndpointsConfigurator>();

@BEagle1984 @meggima, I see no reason to keep this in our own library. Should we move it to Silverback?

Disclaimer: This code is sponsored by Swiss Post :)

msallin, Aug 19 '21 09:08

The admin API is not covered (not abstracted, not proxied) by Silverback. Internally I use it to get the topic metadata, and the IConfluentAdminClientBuilder was added explicitly for that and originally intended for internal use only. Its main purpose is to allow the tests to replace the actual AdminClient with a mock. I currently only mock GetMetadata though. All other methods will throw a NotSupportedException, which probably isn't ideal in your case.

That being said, I see nothing wrong with using the AdminClient to create the topics, and I honestly don't see the point in abstracting or wrapping it in Silverback, since it is very Kafka-specific. Plus, it wouldn't add any value beyond proxying the calls to the underlying AdminClient; it would just add the overhead of keeping up with changes that may occur in the Confluent library.

This kind of initialization should be done at startup though, not in the controller's constructor. You could trigger it from the Startup.Configure method, from within an IHostedService, or by binding to a Silverback callback as suggested by @msallin (even though that might not be relevant or necessary in your case).
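
As a rough illustration of the IHostedService option, here is a minimal sketch that creates a topic with custom options at startup using the Confluent.Kafka admin client directly. The class name, topic name, partition count and retention value are placeholders, and it assumes an IAdminClient has been registered in the container:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service: runs once when the host starts.
public sealed class KafkaTopicsInitializer : IHostedService
{
    private readonly IAdminClient _adminClient;

    public KafkaTopicsInitializer(IAdminClient adminClient)
    {
        _adminClient = adminClient;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var topics = new[]
        {
            new TopicSpecification
            {
                Name = "my-events",      // placeholder topic name
                NumPartitions = 3,       // placeholder value
                ReplicationFactor = 1,   // placeholder value
                Configs = new Dictionary<string, string>
                {
                    ["retention.ms"] = "604800000" // 7 days
                }
            }
        };

        try
        {
            await _adminClient.CreateTopicsAsync(topics);
        }
        catch (CreateTopicsException ex) when (ex.Results.All(r =>
            !r.Error.IsError || r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // Every topic was either created or already existed: nothing to do.
        }
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```

It could be registered with services.AddSingleton<IAdminClient>(_ => new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build()) followed by services.AddHostedService<KafkaTopicsInitializer>(), where the bootstrap address is again a placeholder. Hosted services start in registration order, so it is probably safest to register this one before anything that produces to or consumes from the topics.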

If I understood correctly, you would like to customize the topic parameters, so the approach proposed by @msallin, which automatically loops over the configured topics and creates them all with the same default settings, wouldn't suit you well. Correct?

@msallin, @meggima: we could certainly move that helper into Silverback. It would need slight modification since it relies on KafkaConnectionSettings, which is Swiss Post specific, but as I said, I already deal with the creation of the AdminClient in Silverback, so that wouldn't be an issue. I wonder whether we should offer some hooks to configure the topics (and how). Note that this approach makes some assumptions, for example that the user you authenticate with at runtime actually has the permissions needed to create topics. It probably shouldn't, and some people would prefer to use a different user for this, much as many do for running database migrations.

BEagle1984, Aug 19 '21 10:08

@CeeJayCee, what would you expect from Silverback to help you with a cleaner implementation?

BEagle1984, Aug 19 '21 10:08

@msallin thank you, this works perfectly!

@BEagle1984 I fully understand not wanting to wrap IAdminClient, it could become a maintenance nightmare.

It would be nice if Silverback provided a configuration option to create the topic with options if it doesn't exist. Maybe in the IEndpointsConfigurator?

However, @msallin's example will work for my use-case.

Thanks to you both 👍

CeeJayCee, Aug 19 '21 12:08

> It would be nice if Silverback provided a configuration option to create the topic with options if it doesn't exist. Maybe in the IEndpointsConfigurator?

I'm considering this for a future release. 👍

(BTW contributions are always welcome! 😉)

BEagle1984, Aug 19 '21 12:08

Just take our use case into account, which might be a common one. We need a setting to turn topic creation off, because we only use it locally and in various kinds of tests, not in production.

msallin, Aug 19 '21 13:08