librdkafka
Calling DeleteConsumerGroupOffsets causes assertion error on Kafka 2.3
Description
I'm updating our Ruby bindings to librdkafka and was testing Kafka 2.3. It seems like librdkafka isn't handling the response from Kafka that the API isn't supported but I don't know enough about the internals to know for sure.
Client logs
%7|1641073760.952|DELETECONSUMERGROUPOFFSETS|rdkafka#producer-1| [thrd:main]: DELETECONSUMERGROUPOFFSETS worker called in state initializing: Success
%7|1641073760.952|SEND|rdkafka#producer-1| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1001: Sent FindCoordinatorRequest (v2, 71 bytes @ 0, CorrId 5)
%7|1641073760.953|RECV|rdkafka#producer-1| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/1001: Received FindCoordinatorResponse (v2, 31 bytes, CorrId 5, rtt 0.54ms)
%7|1641073760.953|ADMINFAIL|rdkafka#producer-1| [thrd:main]: Admin DELETECONSUMERGROUPOFFSETS result error: DELETECONSUMERGROUPOFFSETS worker failed to send request: OffsetDelete API (KIP-496) not supported by broker, requires broker version >= 2.4.0
ruby: rdkafka_queue.h:1027: rd_kafka_enq_once_del_source_return: Assertion `eonce->refcnt > 0' failed.
Broker logs
[2022-01-01 21:55:21,110] INFO Creating topic ed27278e-62e5-444c-9169-ca6b7bb9592e with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2022-01-01 21:55:21,133] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(ed27278e-62e5-444c-9169-ca6b7bb9592e-0) (kafka.server.ReplicaFetcherManager)
[2022-01-01 21:55:21,134] INFO [Log partition=ed27278e-62e5-444c-9169-ca6b7bb9592e-0, dir=/kafka/kafka-logs-ed7676d639d0] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2022-01-01 21:55:21,134] INFO [Log partition=ed27278e-62e5-444c-9169-ca6b7bb9592e-0, dir=/kafka/kafka-logs-ed7676d639d0] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 0 ms (kafka.log.Log)
[2022-01-01 21:55:21,134] INFO Created log for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 in /kafka/kafka-logs-ed7676d639d0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.3-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2022-01-01 21:55:21,135] INFO [Partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 broker=1001] No checkpointed highwatermark is found for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 (kafka.cluster.Partition)
[2022-01-01 21:55:21,135] INFO Replica loaded for partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 with initial high watermark 0 (kafka.cluster.Replica)
[2022-01-01 21:55:21,135] INFO [Partition ed27278e-62e5-444c-9169-ca6b7bb9592e-0 broker=1001] ed27278e-62e5-444c-9169-ca6b7bb9592e-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2022-01-01 21:55:21,404] INFO [GroupCoordinator 1001]: Preparing to rebalance group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb in state PreparingRebalance with old generation 0 (__consumer_offsets-6) (reason: Adding new member rdkafka-beccc0c0-42a0-47bb-8569-0ce8aa9be8d5 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2022-01-01 21:55:21,404] INFO [GroupCoordinator 1001]: Stabilized group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb generation 1 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2022-01-01 21:55:21,405] INFO [GroupCoordinator 1001]: Assignment received from leader for group kafka-ruby-77894d5c-1842-4938-a3d0-062258f94ceb for generation 1 (kafka.coordinator.group.GroupCoordinator)
How to reproduce
I've created test.rb below, which (mostly) uses direct calls to librdkafka and should be generally translatable to C. At a high level it creates a single-partition topic, produces some messages, creates a consumer group to read them, then tries to delete the offsets for that consumer group.
The assertion error comes from the call to rd_kafka_DeleteConsumerGroupOffsets. Our test suite does read from the queue that is returned and still exhibits the issue.
git clone https://github.com/deadmanssnitch/kafka.git kafka-ruby
cd kafka-ruby
bundle
ruby test.rb
test.rb
$LOAD_PATH.unshift File.join(File.expand_path(__dir__), "lib")
require "kafka"
require "securerandom"
topic = SecureRandom.uuid
group = "kafka-ruby-#{SecureRandom.uuid}"
config = {
  "bootstrap.servers" => "127.0.0.1:9092",
  "api.version.request" => true,
}
# Setup
admin = Kafka::Admin.new(Kafka::Config.new(config.merge("debug" => "all")))
admin.create_topic(topic, 1, 1)
# Produce a few messages so we have offsets defined.
begin
  producer = Kafka::Producer.new(Kafka::Config.new(config))
  5.times { |i| producer.produce(topic, i.to_s) }
  producer.close
end
# Consume messages on the topic so the consumer group state exists.
begin
  consumer = Kafka::Consumer.new(Kafka::Config.new(config.merge({
    "group.id" => group,
    "enable.partition.eof" => true,
  })))
  consumer.subscribe(topic)
  consumer.poll(timeout: 1000) { |msg| puts msg; consumer.commit(msg, async: true) }
  consumer.close
rescue
  # ignore
end
# Reuse the Kafka::FFI::Producer that is initialized inside Kafka::Admin
client = admin.instance_variable_get(:@client)
tpl = Kafka::FFI::TopicPartitionList.new
tpl.add(topic, 0)
delete = Kafka::FFI::Admin::DeleteConsumerGroupOffsets.new(group, tpl)
# Trickery to get an array pointer for passing to DeleteConsumerGroupOffsets
list = ::FFI::MemoryPointer.new(:pointer, 1)
list.write_array_of_pointer([delete.pointer])
queue = ::Kafka::FFI::Queue.new(client)
# client, request, count_requests, options, queue
::Kafka::FFI.rd_kafka_DeleteConsumerGroupOffsets(client, list, 1, nil, queue)
sleep 5
Checklist
- [x] librdkafka version (release number or git tag): 1.8.2
- [x] Apache Kafka version: 2.3.1
- [x] librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
- [x] Operating system: Gentoo Linux
- [x] Provide logs (with debug=.. as necessary) from librdkafka
- [x] Provide broker log excerpts
- [ ] Critical issue
I assume you already figured that out, but that happened because the broker did not implement the required APIs. Your log already says: requires broker version >= 2.4.0
It's been a while but I believe the issue is that the assertion caused a crash making it seem like librdkafka wasn't handling the response correctly. I didn't expect DeleteConsumerGroupOffsets to work against Kafka 2.3 but I did expect calling it not to cause a crash.
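Until the crash itself is fixed, one possible workaround on the caller's side is to refuse the call when the broker is known to be too old. The sketch below assumes the caller already knows the broker version as a string (for example from test configuration); `offset_delete_supported?` and `with_offset_delete` are hypothetical helper names, not part of librdkafka or the Ruby bindings. The 2.4.0 threshold is taken from the ADMINFAIL log line above.

```ruby
require "rubygems" # provides Gem::Version (loaded by default in modern Ruby)

# Minimum broker version for the OffsetDelete API (KIP-496), as reported
# in the librdkafka ADMINFAIL log line.
OFFSET_DELETE_MIN_BROKER = Gem::Version.new("2.4.0")

# Hypothetical helper: broker_version is a string supplied by the caller.
def offset_delete_supported?(broker_version)
  Gem::Version.new(broker_version) >= OFFSET_DELETE_MIN_BROKER
end

# Hypothetical wrapper: runs the block only when the broker is new enough,
# otherwise raises a Ruby exception instead of reaching the native call.
def with_offset_delete(broker_version)
  unless offset_delete_supported?(broker_version)
    raise NotImplementedError,
      "OffsetDelete (KIP-496) requires broker >= #{OFFSET_DELETE_MIN_BROKER}, got #{broker_version}"
  end
  yield
end
```

With this in place, the `rd_kafka_DeleteConsumerGroupOffsets` call from test.rb could be wrapped in `with_offset_delete("2.3.1") { ... }`, which would raise a rescuable Ruby error rather than aborting the process. It only sidesteps the assertion; it does not address the underlying bug.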
Same happens in confluent-kafka-go when using AdminClient's ListConsumerGroupOffsets method.
If the cluster configuration changes (e.g. a node restart), the app crashes with:
Assertion failed: (eonce->refcnt > 0), function rd_kafka_enq_once_del_source_return, file rdkafka_queue.h, line 1052.
The bad thing about Go is that cgo errors cannot be handled, so the whole app just crashes.
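For readers unfamiliar with this kind of assertion: both reports fail a check that a reference count is still positive when a source is removed. The following is a toy model of that invariant only. It is not librdkafka's implementation, the class and method names are made up for illustration, and it makes no claim about which code path in librdkafka releases the extra reference.

```ruby
# Toy model of a reference-counted object whose sources must each be
# released exactly once. NOT librdkafka code; names are hypothetical.
class ToyEnqOnce
  attr_reader :refcnt

  def initialize
    @refcnt = 0
  end

  # Register one more source that may trigger the object.
  def add_source
    @refcnt += 1
  end

  # Release a source. Mirrors the shape of the failing check: the count
  # must still be positive before it is decremented.
  def del_source
    raise "Assertion `refcnt > 0' failed" unless @refcnt > 0
    @refcnt -= 1
  end
end

eonce = ToyEnqOnce.new
eonce.add_source
eonce.del_source      # balanced release: fine
begin
  eonce.del_source    # second release of the same source: invariant violated
rescue RuntimeError => e
  puts e.message
end
```

In Ruby the violation surfaces as an exception that can be rescued; in C an `assert` failure aborts the process, which is why both the Ruby and Go bindings see a hard crash.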