
Calling DeleteConsumerGroupOffsets causes assertion error on Kafka 2.3

Open gaffneyc opened this issue 3 years ago • 3 comments

Description

I'm updating our Ruby bindings for librdkafka and was testing against Kafka 2.3. It looks like librdkafka isn't handling the broker's response indicating that the API isn't supported, but I don't know enough about the internals to say for sure.

Client logs

%7|1641073760.952|DELETECONSUMERGROUPOFFSETS|rdkafka#producer-1| [thrd:main]: DELETECONSUMERGROUPOFFSETS worker called in state initializing: Success
%7|1641073760.952|SEND|rdkafka#producer-1| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1001: Sent FindCoordinatorRequest (v2, 71 bytes @ 0, CorrId 5)
%7|1641073760.953|RECV|rdkafka#producer-1| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1001: Received FindCoordinatorResponse (v2, 31 bytes, CorrId 5, rtt 0.54ms)
%7|1641073760.953|ADMINFAIL|rdkafka#producer-1| [thrd:main]: Admin DELETECONSUMERGROUPOFFSETS result error: DELETECONSUMERGROUPOFFSETS worker failed to send request: OffsetDelete API (KIP-496) not supported by broker, requires broker version >= 2.4.0
ruby: rdkafka_queue.h:1027: rd_kafka_enq_once_del_source_return: Assertion `eonce->refcnt > 0' failed.

Broker logs

[2022-01-01 21:55:21,110] INFO Creating topic ed27278e-62e5-444c-9169-ca6b7bb9592e with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2022-01-01 21:55:21,133] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(ed27278e-62e5-444c-9169-ca6b7bb9592e-0) (kafka.server.ReplicaFetcherManager)
[2022-01-01 21:55:21,134] INFO [Log partition=ed27278e-62e5-444c-9169-ca6b7bb9592e-0, dir=/kafka/kafka-logs-ed7676d639d0] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2022-01-01 21:55:21,134] INFO [Log partition=ed27278e-62e5-444c-9169-ca6b7bb9592e-0, dir=/kafka/kafka-logs-ed7676d639d0] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 0 ms (kafka.log.Log)
[2022-01-01 21:55:21,134] INFO Created log for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 in /kafka/kafka-logs-ed7676d639d0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.3-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2022-01-01 21:55:21,135] INFO [Partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 broker=1001] No checkpointed highwatermark is found for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 (kafka.cluster.Partition)
[2022-01-01 21:55:21,135] INFO Replica loaded for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 with initial high watermark 0 (kafka.cluster.Replica)
[2022-01-01 21:55:21,135] INFO [Partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 broker=1001] ed27278e-62e5-444c-9169-ca6b7bb9592e-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2022-01-01 21:55:21,404] INFO [GroupCoordinator 1001]: Preparing to rebalance group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb in state PreparingRebalance with old generation 0 (__consumer_offsets-6) (reason: Adding new member rdkafka-beccc0c0-42a0-47bb-8569-0ce8aa9be8d5 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2022-01-01 21:55:21,404] INFO [GroupCoordinator 1001]: Stabilized group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb generation 1 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2022-01-01 21:55:21,405] INFO [GroupCoordinator 1001]: Assignment received from leader for group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb for generation 1 (kafka.coordinator.group.GroupCoordinator)

How to reproduce

I've created the test.rb below, which (mostly) uses direct calls to librdkafka and should translate fairly directly to C. At a high level, it creates a single-partition topic, produces some messages, creates a consumer group to read them, then deletes the offsets for that consumer group.

The assertion error comes from the call to rd_kafka_DeleteConsumerGroupOffsets. Our test suite does read from the queue that is returned and still exhibits the issue.

git clone https://github.com/deadmanssnitch/kafka.git kafka-ruby
cd kafka-ruby
bundle
ruby test.rb

test.rb

$LOAD_PATH.unshift File.join(File.expand_path(__dir__), "lib")

require "kafka"
require "securerandom"

topic = SecureRandom.uuid
group = "kafka-ruby-#{SecureRandom.uuid}"

config = {
  "bootstrap.servers" => "127.0.0.1:9092",
  "api.version.request" => true,
}

# Setup
admin = Kafka::Admin.new(Kafka::Config.new(config.merge("debug" => "all")))
admin.create_topic(topic, 1, 1)

# Produce a couple of messages so we have offsets defined.
begin
  producer = Kafka::Producer.new(Kafka::Config.new(config))
  5.times { |i| producer.produce(topic, i.to_s) }
  producer.close
end

# Consume messages on the topic so the consumer group state exists.
begin
  consumer = Kafka::Consumer.new(Kafka::Config.new(config.merge({
    "group.id" => group,
    "enable.partition.eof" => true,
  })))

  consumer.subscribe(topic)
  consumer.poll(timeout: 1000) { |msg| puts msg; consumer.commit(msg, async: true) }
  consumer.close
rescue
  # ignore
end

# Reuse the Kafka::FFI::Producer that is initialized inside Kafka::Admin
client = admin.instance_variable_get(:@client)

tpl = Kafka::FFI::TopicPartitionList.new
tpl.add(topic, 0)

delete = Kafka::FFI::Admin::DeleteConsumerGroupOffsets.new(group, tpl)

# Trickery to get an array pointer for passing to DeleteConsumerGroupOffsets
list = ::FFI::MemoryPointer.new(:pointer, 1)
list.write_array_of_pointer([delete.pointer])

queue = ::Kafka::FFI::Queue.new(client)

# client, request, count_requests, options, queue
::Kafka::FFI.rd_kafka_DeleteConsumerGroupOffsets(client, list, 1, nil, queue)
sleep 5
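
Rather than sleeping, the test suite polls the returned queue for the admin result event and checks its error. A minimal pure-Ruby sketch of that poll-and-check pattern, with `StubQueue` and `Event` as hypothetical stand-ins for `Kafka::FFI::Queue` and the admin result event (the real call needs a running broker):

```ruby
# Stand-ins for the FFI objects: Event mimics an admin result event,
# StubQueue mimics rd_kafka_queue_poll's behavior of returning the
# next event or nil on timeout.
Event = Struct.new(:error, :error_string)

class StubQueue
  def initialize(events)
    @events = events
  end

  def poll(_timeout_ms)
    @events.shift
  end
end

def wait_for_admin_result(queue, timeout_ms: 5000)
  event = queue.poll(timeout_ms)
  raise "timed out waiting for admin result" if event.nil?
  event
end

# Against a pre-2.4.0 broker, the expected (non-crashing) outcome is an
# error event such as UNSUPPORTED_FEATURE, not an assertion failure.
queue = StubQueue.new([
  Event.new(:unsupported_feature,
            "OffsetDelete API (KIP-496) not supported by broker"),
])

result = wait_for_admin_result(queue)
puts result.error_string
```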

Checklist

  • [x] librdkafka version (release number or git tag): 1.8.2
  • [X] Apache Kafka version: 2.3.1
  • [X] librdkafka client configuration: bootstrap.servers=127.0.0.1:9092, api.version.request=true (see test.rb above)
  • [X] Operating system: Gentoo Linux
  • [X] Provide logs (with debug=.. as necessary) from librdkafka
  • [X] Provide broker log excerpts
  • [ ] Critical issue

gaffneyc · Jan 01 '22 22:01

I assume you already figured this out, but it happened because the broker does not implement the required API. Your log already says: requires broker version >= 2.4.0

aratz-lasa · Aug 25 '23 11:08

It's been a while, but I believe the issue is that the assertion caused a crash, making it seem like librdkafka wasn't handling the response correctly. I didn't expect DeleteConsumerGroupOffsets to work against Kafka 2.3, but I did expect calling it not to crash the process.
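
Until the assertion itself is fixed, the graceful failure mode can at least be approximated at the application level by treating a pre-2.4.0 broker as an ordinary error path before issuing the request. A minimal pure-Ruby sketch, assuming the caller can learn the broker version by some out-of-band means (the helper name and version source here are hypothetical):

```ruby
require "rubygems" # for Gem::Version

# OffsetDelete (KIP-496) first shipped in broker 2.4.0.
MIN_OFFSET_DELETE_VERSION = Gem::Version.new("2.4.0")

def delete_consumer_group_offsets!(broker_version, group, tpl)
  if Gem::Version.new(broker_version) < MIN_OFFSET_DELETE_VERSION
    raise NotImplementedError,
          "OffsetDelete API (KIP-496) requires broker >= 2.4.0, " \
          "got #{broker_version}"
  end
  # ... here the real code would call rd_kafka_DeleteConsumerGroupOffsets
  # and poll the returned queue for the result event ...
  :submitted
end

begin
  delete_consumer_group_offsets!("2.3.1", "kafka-ruby-test", nil)
rescue NotImplementedError => e
  puts "handled gracefully: #{e.message}"
end
```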

gaffneyc · Aug 25 '23 12:08

The same happens in confluent-kafka-go when using the AdminClient's ListConsumerGroupOffsets method. If the cluster configuration changes (e.g. a node restart), the app crashes with:

Assertion failed: (eonce->refcnt > 0), function rd_kafka_enq_once_del_source_return, file rdkafka_queue.h, line 1052.

The bad thing about Go is that CGO assertion failures cannot be recovered, so the whole app crashes.

absorbb · Mar 23 '24 08:03