Is max_wait_time the same as max.poll.interval.ms?

Open Ravinderbaid opened this issue 4 years ago • 1 comments

A few partitions of the topic intermittently become very slow. Lag for 2-3 partitions keeps increasing and never recovers. Once we restart the service, everything returns to normal and the lag drops quickly.

If this is a bug report, please fill out the following:

Version of Ruby: ruby 2.3.6p384
Version of Kafka: 2.4.1.1
Version of ruby-kafka: 1.3.0 (recently upgraded from 0.7 to 1.3.0). We see the issues below in both versions, and also when testing against master.

Steps to reproduce

The below code is running as a rake task.

kafka_consumer = Kafka.new(...)
topics.split(',').collect(&:strip).each do |topic|
  kafka_consumer.subscribe(topic, start_from_beginning: start_from_beginning)
end
# Stop the consumer cleanly on SIGINT/SIGTERM.
stop_consumer_proc = proc do
  Rails.logger.info('Stopping kafka consumer...')
  kafka_consumer.stop
  Rails.logger.info('Stopped kafka consumer...')
  exit
end
trap('SIGINT', &stop_consumer_proc)
trap('SIGTERM', &stop_consumer_proc)
begin
  kafka_consumer.each_message(max_wait_time: 10) do |message|
    # raises an exception or returns true
    consume_messages(message)
  end
rescue Kafka::Error => e
  Rails.logger.error("Kafka processing error", error_message: e.message, backtrace: e.backtrace)
  raise
rescue StandardError => e
  Rails.logger.error("Error processing messages", error_message: e.message, backtrace: e.backtrace)
  raise
end

Expected outcome

When there is a Kafka connection problem, such as an EOF error or a timeout, ruby-kafka should raise an error. Once ruby-kafka raises an error, the consumer should shut down.

Actual outcome

We can see timeouts and EOF errors occurring on fetch and offset API requests. We found this through ruby-kafka's notification events.

ActiveSupport::Notifications.subscribe(/.*\.kafka$/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  if ["heartbeat", "fetch", "offset_commit"].include?(event.payload[:api].to_s)
    Rails.logger.info('[rb-kafka] kafka notification', event.payload)
  end
end

But the ruby-kafka client raises no exception. The impact is that whenever these timeouts happen, a few partitions (2-3) get stuck, and as a result the combined lag keeps increasing. This is currently happening in our production environment.
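Since no exception surfaces, one way to notice a stuck partition from inside the consumer process is to record when each partition last delivered a message. The following is only a minimal sketch of that idea: `PartitionWatchdog` and its methods are hypothetical names and are not part of ruby-kafka.

```ruby
# Hypothetical helper (not a ruby-kafka API): remembers when each
# topic/partition pair last delivered a message.
class PartitionWatchdog
  # stall_after: seconds without progress before a partition counts as stalled
  def initialize(stall_after:)
    @stall_after = stall_after
    @last_seen = {}
  end

  # Call once per processed message.
  def record(topic, partition, now = Time.now)
    @last_seen[[topic, partition]] = now
  end

  # Returns the [topic, partition] pairs with no progress for longer
  # than stall_after seconds.
  def stalled(now = Time.now)
    @last_seen.select { |_key, seen_at| now - seen_at > @stall_after }.keys
  end
end
```

Inside the `each_message` block this would be called as `watchdog.record(message.topic, message.partition)`, with a separate thread or periodic check logging `watchdog.stalled`. A partition that simply has no new messages also shows up as stalled, so the result only means something when it is compared against broker-side lag.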

We have other Kafka consumers written in Python, and they work as expected during these incidents.

We are still trying to figure out whether max_wait_time is the same as the max.poll.interval.ms configuration in Kafka. If not, which parameter defines the poll time here? Knowing this would help us determine whether the issue is in our configuration or in the Kafka instance itself.
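For comparing settings, the sketch below lists the nearest Java-consumer names for some ruby-kafka options. It assumes the documented meaning of `max_wait_time` (the longest a fetch request waits on the broker, in seconds), which corresponds to fetch.max.wait.ms rather than max.poll.interval.ms. As far as we can tell, ruby-kafka has no direct max.poll.interval.ms option. The constant and helper names are hypothetical and exist only for illustration.

```ruby
# Nearest Java-consumer config names for some ruby-kafka options.
# ruby-kafka takes durations in seconds; the Java configs use milliseconds.
# NEAREST_JAVA_CONFIG and as_java_config are illustrative names, not a library API.
NEAREST_JAVA_CONFIG = {
  max_wait_time: "fetch.max.wait.ms",
  min_bytes: "fetch.min.bytes",
  session_timeout: "session.timeout.ms",
  heartbeat_interval: "heartbeat.interval.ms"
}.freeze

# Translates a hash of ruby-kafka options into the Java-style names,
# converting seconds to milliseconds for the *.ms settings.
def as_java_config(ruby_options)
  ruby_options.each_with_object({}) do |(name, value), out|
    java_name = NEAREST_JAVA_CONFIG[name]
    next unless java_name # skip options with no listed equivalent
    out[java_name] = java_name.end_with?(".ms") ? (value * 1000).to_i : value
  end
end
```

With this mapping, the `max_wait_time: 10` used above corresponds to a fetch wait of 10000 ms. Group liveness is instead governed by `session_timeout` and `heartbeat_interval`.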

Ravinderbaid avatar Aug 09 '21 15:08 Ravinderbaid

@dasch Can you please help me with this? It is urgent.

Ravinderbaid avatar Aug 10 '21 17:08 Ravinderbaid

Issue has been marked as stale due to a lack of activity.

github-actions[bot] avatar Jun 17 '23 00:06 github-actions[bot]