
Consumer is stuck

Open RomanOlegovich opened this issue 2 years ago • 9 comments

After some time idle, the consumer stops reading messages from Kafka (without any errors or logs). I need to restart the consumer to start reading again. Do you have any ideas?

RomanOlegovich avatar Apr 06 '22 13:04 RomanOlegovich

It's strange that no log is written at all. Have you tried lowering the log level for the Silverback namespace?

The message broker also has a log and you should be able to see the reason why it got disconnected.

It's hard to help you further without a log or any other hint. What I can say is that this isn't a known issue, and we would have noticed by now if it were a systematic bug. Could you maybe post your configuration code (endpoint configuration)?

BEagle1984 avatar Apr 06 '22 13:04 BEagle1984

Thanks for the quick answer.

  1. I did not change the log level. I'll try it.

  2. I'm not sure whether it got disconnected or not.

  3. I have app events; each event has its own Kafka topic and multiple consumers in different modules (C# projects).

An example from one module's configuration:

            services.ConfigureSilverback()
                .AddEndpointsConfigurator(x =>
                {
                    return new NotificationsEndpointsConfigurator(moduleOptions.BridgeOptions);
                })
                // Add Subscribers
                .AddScopedSubscriber<DrivingAssignedEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingRequestConfirmedQueueReader>(filter)
                .AddScopedSubscriber<DrivingCanceledEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingRequestCanceledEventQueueReader>(filter)
                .AddScopedSubscriber<SurveyCreatedEventQueueReader>(filter)
                .AddScopedSubscriber<DrivingStartingQueueReader>(filter)
                .AddScopedSubscriber<OffsetFailedEventQueueReader>(filter)
                .AddScopedSubscriber<NotifyPaymentEventQueueReader>(filter)
                ;

Note: all consumers connect to a single broker. I use one topic per event to integrate with systems that are not using Silverback. Each module has its own IEndpointsConfigurator with the same BootstrapServers but a different ConsumerGroupId.

    public class NotificationsEndpointsConfigurator : IEndpointsConfigurator
    {
        private readonly BridgeOptions bridgeOptions;

        public NotificationsEndpointsConfigurator(BridgeOptions bridgeOptions)
        {
            this.bridgeOptions = bridgeOptions;
        }

        public void Configure(IEndpointsConfigurationBuilder builder)
        {

            Action<KafkaConsumerConfig> consumersConfig = config =>
            {
                config.BootstrapServers =
                       bridgeOptions.BootstrapServers;
                config.GroupId = bridgeOptions.ConsumerGroupId;
                config.AutoOffsetReset = bridgeOptions.AutoOffsetReset;
            };

            Action<KafkaProducerConfig> producerConfig = config =>
            {
                config.BootstrapServers =
                    bridgeOptions.BootstrapServers;
            };

            builder
            .AddKafkaEndpoints(
                endpoints => endpoints
                        .AddOutbound<AddNotification>(endpoint => endpoint.ProduceTo(nameof(AddNotification)).Configure(producerConfig).SerializeAsJsonUsingNewtonsoft())

                        .AddInbound<DrivingRequestEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingRequestConfirmedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestConfirmedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingAssignedEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingAssignedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingRequestCanceledEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingRequestCanceledEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<SurveyCreatedEvent>(endpoint => endpoint.ConsumeFrom(nameof(SurveyCreatedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<DrivingStartingEvent>(endpoint => endpoint.ConsumeFrom(nameof(DrivingStartingEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<OffsetFailedEvent>(endpoint => endpoint.ConsumeFrom(nameof(OffsetFailedEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                        .AddInbound<NotifyPaymentEvent>(endpoint => endpoint.ConsumeFrom(nameof(NotifyPaymentEvent)).Configure(consumersConfig).DeserializeJsonUsingNewtonsoft())
                    );
        }
    }
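
For context, the queue readers registered via `AddScopedSubscriber` above are plain Silverback subscribers: any public method whose parameter matches the consumed message type gets invoked. A hypothetical sketch of one of them (the event shape and handler body are assumptions, not the actual project code) might look like this:

```csharp
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

// Hypothetical subscriber sketch; Silverback resolves it from the DI
// container per message and calls the method matching DrivingAssignedEvent.
public class DrivingAssignedEventQueueReader
{
    private readonly ILogger<DrivingAssignedEventQueueReader> logger;

    public DrivingAssignedEventQueueReader(ILogger<DrivingAssignedEventQueueReader> logger)
    {
        this.logger = logger;
    }

    public async Task HandleAsync(DrivingAssignedEvent message)
    {
        // Process the consumed event here (assumed property for illustration).
        logger.LogInformation("Driving assigned: {DrivingId}", message.DrivingId);
        await Task.CompletedTask;
    }
}
```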

RomanOlegovich avatar Apr 06 '22 14:04 RomanOlegovich

What logging levels do you recommend https://github.com/BEagle1984/silverback/blob/master/docs/concepts/logging.md ?

  • Core
  • Integration
  • Kafka

RomanOlegovich avatar Apr 06 '22 14:04 RomanOlegovich

Just add this to your appsettings.json to get all possible logs (it's going to be verbose):

{
  "Logging": {
    "LogLevel": {
      "Silverback": "Trace"
    }
  }
}
The configuration is really basic and I don't see anything wrong. If the consumer disconnects you should see the related info entries in the log (consumer disconnected, partitions revoked, etc.). The underlying library notifies about protocol errors and also in that case you should have a warning or error in the log.

When you say "after some time idle", do you mean that you don't produce any messages for a while and, when you start producing again, your consumers don't pull them? Did I get that right?

BEagle1984 avatar Apr 06 '22 14:04 BEagle1984

LogLevel changed, we will check for errors...

Yes you are right.

Also, I have a problem that may be related. I don't have persistent storage for Kafka and ZooKeeper yet, so the topics don't exist before the consumers start. When I run the project on a local PC, all consumers keep trying to read from the topic in an infinite loop (I see an "unknown topic" warning), but after deploying to Kubernetes, the consumers make several attempts and then stop.

Can the build mode (Debug|Release, ASPNETCORE_ENVIRONMENT, etc.) change the behavior?
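
One way to sidestep the "unknown topic" retry loop described above is to pre-create the topics before the consumers start. A sketch (the broker address and topic list are assumptions; the topic names mirror the nameof(...) values used in the endpoint configuration):

```shell
# Pre-create the event topics so consumers never hit "unknown topic".
# Adjust the bootstrap server, partition count, and replication factor
# to your environment; this assumes a single local broker.
for topic in DrivingRequestEvent DrivingRequestConfirmedEvent DrivingAssignedEvent \
             DrivingCanceledEvent DrivingRequestCanceledEvent SurveyCreatedEvent \
             DrivingStartingEvent OffsetFailedEvent NotifyPaymentEvent AddNotification; do
  kafka-topics --bootstrap-server localhost:9092 \
    --create --if-not-exists --topic "$topic" \
    --partitions 1 --replication-factor 1
done
```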

RomanOlegovich avatar Apr 06 '22 15:04 RomanOlegovich

I have events A and B, consumers A and B, and topics A and B. When event A is produced, consumer B stops (and vice versa) with an OperationCanceledException. What do you think about this?

RomanOlegovich avatar Apr 06 '22 16:04 RomanOlegovich

It should work exactly the same on the local machine and in Kubernetes, unless you implemented some Kafka-related health check that causes your pod to be stopped in that situation.

The consumers and producers in Silverback are completely independent of each other, and I have never observed the behavior you describe.

It's very difficult to help you with the information I have. Are you able to provide a sample project that reproduces the issue on a local environment (Kafka started via docker compose)? Or at least the trace logs of your service where I can try to figure out what happened?
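
For the local reproduction, something along these lines could serve as a starting point (a minimal single-broker sketch; the image tags and ports are assumptions, adjust them to your environment):

```yaml
# docker-compose.yml — minimal Kafka + ZooKeeper for local testing
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```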

BEagle1984 avatar Apr 07 '22 06:04 BEagle1984

I was no longer able to reproduce the problem; if it appears again, I will let you know. The only thing I did was remove the call to ConfigureSilverback in the application modules, but I'm not sure whether that had any effect.

RomanOlegovich avatar Apr 27 '22 04:04 RomanOlegovich

OK, keep me posted.

No, calling ConfigureSilverback is fine. At SwissPost we also use a vertical slice architecture in our bigger .NET projects, and they all use Silverback as well, without issues.

BEagle1984 avatar Apr 27 '22 15:04 BEagle1984