kafka-go
kafka-go copied to clipboard
Reading from a compacted topic is blocking seems not working
Hi Guys,
Describe the bug I have a secure cluster with mTLS, with a compacted topic, actually it is a Debesium CDC topic. with a config like this:
cleanup.policy=compact,
min.compaction.lag.ms=900000,
max.compaction.lag.ms=3600000,
min.cleanable.dirty.ratio=0.25,
delete.retention.ms=90000,
segment.bytes=52428800,
segment.ms=3600000
After some time, consuming is stopping, it always reaches the ctx.Deadline. I have run some new tests, and before I thought it stops at the same offset, but it is non-deterministic unfortunately. Maybe where Kafka compacted the log, that offset confuse something in my client. I do not know :( - I hope you guys can help me to solve this.
offset:139381, key: {"autoid":150137}
offset:139382, key: {"autoid":150138}
offset:139385, key: {"autoid":150141}
context deadline exceeded
duration: 30.6454485s
context deadline exceeded
duration: 30.0008544s
context deadline exceeded
duration: 30.0001268s
context deadline exceeded
duration: 30.0005493s
like I said KafkaCat during this time can read and continuously consume from the same topic.
108559
216610
191967
% Reached end of topic live.scgps.dbapp.basedata.autok [0] at offset 2626860
174974
129638
129074
105552
% Reached end of topic live.scgps.dbapp.basedata.autok [0] at offset 2626864
with this command:
kafkacat -b broker:31090 -X security.protocol=ssl -X ssl.key.location=./certs/broker-key.pem -X ssl.certificate.location=./certs/broker-crt.pem -X ssl.ca.location=./certs/kafka-ca-crt.pem
-G "test-$(date +%s)" -t "live.scgps.dbapp.basedata.autok" -C -v -v -v
Kafka Version What version(s) of Kafka are you testing against? Server: Confluent Kafka 5.4.1 Client: github.com/segmentio/kafka-go v0.3.5
To Reproduce Steps to reproduce the behavior. Bonus points for a code sample.
It's very easy to reporoduce on my side. Just start consiming with a new group id (because the last one is always stuck at the given offset) with this code:
reader := kafka.NewReader(
kafka.ReaderConfig{
Brokers: []string{"broker:31090", "broker:31091", "broker:31092"},
GroupID: "test-consumer-group-02",
Topic: "live.scgps.dbapp.basedata.autok",
Dialer: &kafka.Dialer{
ClientID: "test-client",
Timeout: 3 * time.Second,
KeepAlive: 10 * time.Second,
TLS: tlsConfig,
TransactionalID: "TransactionalID",
},
QueueCapacity: 100,
MinBytes: 1,
MaxBytes: 1024 * 1024,
MaxWait: 15 * time.Second,
ReadLagInterval: 0,
HeartbeatInterval: 3 * time.Second,
CommitInterval: 5 * time.Second,
PartitionWatchInterval: 0,
WatchPartitionChanges: false,
SessionTimeout: 60 * time.Second,
RebalanceTimeout: 30 * time.Second,
JoinGroupBackoff: 3 * time.Second,
StartOffset: kafka.FirstOffset,
ReadBackoffMin: 250 * time.Millisecond,
ReadBackoffMax: 1000 * time.Millisecond,
IsolationLevel: kafka.ReadCommitted,
MaxAttempts: 5,
})
for {
startTime := time.Now()
for i := 0; i < 100; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
m, err := reader.ReadMessage(ctx) // Same behavior happens with Fetch too
cancel()
if err != nil {
fmt.Printf("%v\n", err)
break
}
fmt.Printf("offset:%v, key: %s\n", m.Offset, m.Key)
}
fmt.Printf("duration: %v\n", time.Since(startTime))
}
Expected behavior A clear and concise description of what you expected to happen.
So i would like to continuously consume this topic and process the CDC events and if I reach the end of the topic, I would like to know that like with kafkacat...io.EOF...but this should be the documented behavior, but it is not happening in my case.
Additional context Guys, can you please help me to solve this? Thanks, Darvi
Hi,
One more thing. With this code, where I skip the strange offsets, can work, but for me this seems not a good solution.
var offset = int64(139380)
err = reader.SetOffset(offset)
if err != nil {
panic(err)
}
for {
startTime := time.Now()
for i := 0; i < 100; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
m, err := reader.ReadMessage(ctx)
cancel()
if err != nil {
fmt.Printf("offset: err: %v\n", err)
offset ++ // skipping the "bad / strange offsets"
err = reader.SetOffset(offset)
if err != nil {
panic(err)
}
break
}
fmt.Printf("offset:%v, key: %s\n", m.Offset, m.Key)
offset = m.Offset
}
fmt.Printf("duration: %v\n", time.Since(startTime))
}
offset:2786543, key: {"autoid":183521}
offset:2786547, key: {"autoid":118212}
offset:2786556, key: {"autoid":204573}
context deadline exceeded
duration: 20.0761017s
context deadline exceeded
duration: 20.0001072s
context deadline exceeded
duration: 20.0010273s
offset:2786566, key: {"autoid":57161}
offset:2786567, key: {"autoid":160263}
So what do you think? What is the problem? Topic is corrupted or client is confused? ;) Maybe I miss configured something (I think this has the highest probability :D but I do not know which parameter is that :))
Any help, suggestion is appreciated!
Thanks, Darvi
Hello @fireking77!
It does look like the client is having trouble dealing with the missing offsets of the compacted topic. We're probably missing a test to validate the behavior when compaction occurs (kafka doesn't make it simple to construct this sort of tests unfortunately).
Out of curiosity, how have you handled the problem?
I encountered this issue in my project and did a little digging around to find out what was happening,
Say a topic-partition has the following offsets:
101 102 105 109 151 153 160 162
The batch connects to Kafka and requests the first batch and Kafka returns [ 101, 102, 105, 109 ]
We process these messages and the offset is now 110, but it's not there so we reconnect and send a batch request for 110 and Kafka sends us [ 101, 102, 105, 109 ], we iterate through them and 110 is not there, so... we reconnect and send a batch request for 110...
Basically I think that if the end offset id for a log segment isn't actually the last offset (because it's been compacted) the library gets stuck. In my code I have a very naive solution of just advancing by 1 each loop until we get to an offset from the next log segment and then it continues on it's merry way.
@achille-roussel do you require further details?
Thanks for all the details you've provided, I just wanted to follow up to say this is still being tracked, we will provide updates as early as we can.
Hello @norganna, after a long investigation by @nlsun we have landed a fix in the latest kafka-go version (v0.4.28) that appears to be related to the issue you were facing https://github.com/segmentio/kafka-go/pull/813
Would you be able to let us know whether it has addressed you problem as well?
I had exactly the same issue running Debezium and topic compaction causing the consumer group to hang. Updating to v0.4.28 fixed it. Thanks!
Hi, I've also encountered a situation where the client stops reading messages from a compacted topic (v0.4.35). I'm not using a consumer group (as I have no need to - the service when restarted will want to read from the first message).
If I add a GroupID to the reader then it seems to fix things, but don't think this should be necessary - so this issue still exists. I've not done any particular investigation but happy to if pointed in the right direction! @nlsun
I could potentially work around it by creating consumer groups on the fly, but am then left with cleaning up old ones, which isn't ideal. Or perhaps the consumer group offset can be reset - I'm not a Kafka expert. (edit https://pkg.go.dev/github.com/segmentio/kafka-go#Client.OffsetDelete seems promising)
Even with a consumer group I'm still experiencing the client locking up (or timing out if I set a deadline). Here is the topic configs (non-default):
cleanup.policy = compact
max.compaction.lag.ms = 1000012
retention.ms = -1
segment.bytes = 1000012
segment.ms = 300000
There are 24 partitions. (Was the fix tested on multiple partitions?)
Happy to help diagnose the issue, but in the meantime I'm going to have to look at using a different library I think - I can't see a sensible work around or a way that is not a lot of work.