ruby-kafka
Is max_wait_time the same as max.poll.interval.ms?
A few partitions of the topic intermittently become very slow: lag on 2-3 partitions keeps increasing and never decreases. Once we restart the service, everything looks normal and the lag drains quickly.
If this is a bug report, please fill out the following:
Version of Ruby: ruby 2.3.6p384
Version of Kafka: 2.4.1.1
Version of ruby-kafka: 1.3.0 (recently upgraded from 0.7). We see the issue described below in both versions, and we have also tested with master.
Steps to reproduce
The code below runs as a rake task.
kafka_consumer = Kafka.new(...)
# Subscribe to every topic in the comma-separated list.
topics.split(',').collect(&:strip).each do |topic|
  kafka_consumer.subscribe(topic, start_from_beginning: start_from_beginning)
end
# Stop the consumer cleanly on SIGINT/SIGTERM.
stop_consumer_proc = proc do
  Rails.logger.info('Stopping kafka consumer...')
  kafka_consumer.stop
  Rails.logger.info('Stopped kafka consumer...')
  exit
end
trap('SIGINT', &stop_consumer_proc)
trap('SIGTERM', &stop_consumer_proc)
begin
  kafka_consumer.each_message(max_wait_time: 10) do |message|
    # consume_messages either returns true or raises an exception
    consume_messages(message)
  end
rescue Kafka::Error => e
  Rails.logger.error("Kafka processing error", error_message: e.message, backtrace: e.backtrace)
  raise
rescue StandardError => e
  Rails.logger.error("Error processing messages", error_message: e.message, backtrace: e.backtrace)
  raise
end
Expected outcome
Expected outcome
When a Kafka connection error occurs, such as an EOF error or a timeout, ruby-kafka should raise an error. Once ruby-kafka raises an error, the consumer should shut down.
Actual outcome
We can see timeouts and EOF errors occurring during fetch and offset API calls. We found this from the ruby-kafka notification events:
ActiveSupport::Notifications.subscribe(/.*\.kafka$/) do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  if ["heartbeat", "fetch", "offset_commit"].include?(event.payload[:api].to_s)
    Rails.logger.info('[rb-kafka] kafka notification', event.payload)
  end
end
However, the ruby-kafka client raises no exception. The impact is that whenever these timeouts happen, a few partitions (2-3) get stuck, and the combined lag keeps increasing as a result. This is currently happening in our production environment.
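One way to make stuck partitions visible from inside the consumer process is to record when each partition last yielded a message and flag the ones that have gone quiet. The following is only a minimal sketch: `PartitionWatchdog` and its `stall_after` parameter are hypothetical names for illustration and are not part of ruby-kafka.

```ruby
# Hypothetical helper, not part of ruby-kafka: remembers when each
# topic/partition last yielded a message and reports the quiet ones.
class PartitionWatchdog
  def initialize(stall_after:)
    @stall_after = stall_after # seconds without a message before flagging
    @last_seen = {}            # [topic, partition] => Time of last message
  end

  # Call once per processed message.
  def record(topic, partition, now: Time.now)
    @last_seen[[topic, partition]] = now
  end

  # Partitions seen at least once, but not within the last `stall_after` seconds.
  def stalled(now: Time.now)
    @last_seen.select { |_, seen_at| now - seen_at > @stall_after }.keys
  end
end
```

Inside the `each_message` block this could be fed with `watchdog.record(message.topic, message.partition)`. A quiet partition may simply have no new messages, so the result would still need to be checked against the lag reported by the broker before treating it as stuck.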
We have other Kafka consumers written in Python that keep working as expected during these incidents.
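Since the timeouts and EOF errors show up in the notification events but never reach the `rescue` blocks, a possible stopgap is to count consecutive failures per API from the event payloads and shut the consumer down once a threshold is crossed. This is a sketch under assumptions: `ApiFailureTracker` and `threshold` are hypothetical names, and it assumes failed calls carry an `:exception` key in the payload, which is the ActiveSupport::Notifications convention when an instrumented block raises.

```ruby
# Hypothetical helper, not part of ruby-kafka: counts consecutive failed
# API calls per API name, based on notification payloads.
class ApiFailureTracker
  def initialize(threshold:)
    @threshold = threshold
    @consecutive = Hash.new(0) # api name => failures in a row
  end

  # `payload` is the event payload hash. Assumption: :exception is present
  # only when the instrumented call raised.
  def observe(payload)
    api = payload[:api].to_s
    if payload.key?(:exception)
      @consecutive[api] += 1
    else
      @consecutive[api] = 0 # a successful call resets the streak
    end
    self
  end

  # APIs that have failed at least `threshold` times in a row.
  def tripped
    @consecutive.select { |_, count| count >= @threshold }.keys
  end
end
```

In the subscriber shown earlier, one could call `tracker.observe(event.payload)` and, when `tracker.tripped` is non-empty, log the failing APIs and call `kafka_consumer.stop`. Whether that is safe to do from inside a notification callback has not been verified here.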
We are still trying to figure out whether max_wait_time is the same as Kafka's max.poll.interval.ms setting. If not, which parameter defines the poll time here? Knowing this would help us determine whether the problem lies in our configuration or in the Kafka instance itself.
@dasch Can you please help me with this? It is urgent.
Issue has been marked as stale due to a lack of activity.