kafka-go
Reader does not consume all the messages [data lost]
Describe the bug: A reader with a group ID specified, consuming from a topic with multiple partitions, does not receive all the messages. Occasionally a few records are lost.
Kafka version: 1.0.2
To reproduce: I'm not sure whether this is easy to reproduce. In our environment, however, it reproduces consistently after the reader has been running for several minutes (e.g. 10). I usually run a second Kafka consumer (Java) against the same topic and compare the output of the two consumers to check for differences.
Here is how I use the reader:
conf := kafka.ReaderConfig{
	Brokers:          brokers,
	GroupID:          Groupid,
	Topic:            Topic,
	StartOffset:      startOffset,
	MaxWait:          200 * time.Millisecond,
	RebalanceTimeout: 5 * time.Second,
	QueueCapacity:    3000,
}
reader := kafka.NewReader(conf)
defer reader.Close()

// Each fetch is bounded by a 1.5s timeout.
ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
defer cancel()
m, err := reader.FetchMessage(ctx)
Expected behavior: The reader should fetch all the records.
Additional context: The Kafka topic I used has 6 partitions and receives about 120 msg/s. In one of my tests we identified data loss around timestamp 1570603004139 (Unix time in ms). Below are the logs we captured around that time:
1570603004010 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799371] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004012 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799371] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004055 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004075 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261481] of %!s(MISSING) at offset %!d(MISSING)
1570603004122 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511105] of %!s(MISSING) at offset %!d(MISSING)
1570603004146 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874093] of %!s(MISSING) at offset %!d(MISSING)
1570603004204 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004248 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881242] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004250 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881242] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004262 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799399] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004265 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799399] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004270 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511105] of %!s(MISSING) at offset %!d(MISSING)
1570603004296 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874093] of %!s(MISSING) at offset %!d(MISSING)
1570603004323 initializing kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261482] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004325 the kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261482] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004354 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004399 KafkaSource try to peek a barrier
1570603004499 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881272] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004501 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881272] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004502 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004514 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799426] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004516 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799426] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004520 initializing kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511106] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004521 the kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511106] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004546 initializing kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874101] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004550 the kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874101] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004576 initializing kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261483] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004578 the kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261483] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004652 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004671 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511106] of %!s(MISSING) at offset %!d(MISSING)
1570603004751 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881292] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004753 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881292] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004766 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799455] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004768 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799455] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004801 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603004802 initializing kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874108] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004806 the kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874108] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004827 initializing kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261486] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004829 the kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261486] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004920 initializing kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511107] of %!s(MISSING) starting at offset %!d(MISSING)
1570603004922 the kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511107] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603004950 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005002 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881310] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005005 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881310] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005018 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799474] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005020 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799474] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005057 initializing kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005061 the kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005078 initializing kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005080 the kafka reader for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005100 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005172 initializing kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511108] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005174 the kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511108] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005212 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) at offset %!d(MISSING)
1570603005230 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005249 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005255 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881336] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005257 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881336] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005270 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799513] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005272 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799513] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005324 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511108] of %!s(MISSING) at offset %!d(MISSING)
1570603005362 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) at offset %!d(MISSING)
1570603005379 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005399 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005507 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881351] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005509 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881351] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005512 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) at offset %!d(MISSING)
1570603005522 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799546] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005524 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799546] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005529 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005547 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005574 initializing kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005576 the kafka reader for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005661 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) at offset %!d(MISSING)
1570603005679 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005697 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005726 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) at offset %!d(MISSING)
1570603005758 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881373] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005760 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881373] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005774 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799581] of %!s(MISSING) starting at offset %!d(MISSING)
1570603005776 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799581] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603005811 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874121] of %!s(MISSING) at offset %!d(MISSING)
1570603005828 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005845 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603005876 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) at offset %!d(MISSING)
1570603005977 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603005994 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603006010 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881384] of %!s(MISSING) starting at offset %!d(MISSING)
1570603006012 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881384] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603006025 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) at offset %!d(MISSING)
1570603006026 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799604] of %!s(MISSING) starting at offset %!d(MISSING)
1570603006028 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799604] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603006061 initializing kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874127] of %!s(MISSING) starting at offset %!d(MISSING)
1570603006065 the kafka reader for partition [3 %!d(string=connect2_topic_telemetry) 624874127] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603006126 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603006144 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603006174 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) at offset %!d(MISSING)
1570603006216 no messages received from kafka within the allocated time for partition [3 %!d(string=connect2_topic_telemetry) 624874127] of %!s(MISSING) at offset %!d(MISSING)
1570603006262 initializing kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881412] of %!s(MISSING) starting at offset %!d(MISSING)
1570603006264 the kafka reader for partition [4 %!d(string=connect2_topic_telemetry) 1311881412] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603006276 no messages received from kafka within the allocated time for partition [0 %!d(string=connect2_topic_telemetry) 207261487] of %!s(MISSING) at offset %!d(MISSING)
1570603006277 initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799637] of %!s(MISSING) starting at offset %!d(MISSING)
1570603006289 the kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799637] of %!s(MISSING) is seeking to offset %!d(MISSING)
1570603006293 no messages received from kafka within the allocated time for partition [5 %!d(string=connect2_topic_telemetry) 2863189] of %!s(MISSING) at offset %!d(MISSING)
1570603006323 no messages received from kafka within the allocated time for partition [1 %!d(string=connect2_topic_telemetry) 17511109] of %!s(MISSING) at offset %!d(MISSING)
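The `%!d(string=...)`, `%!s(MISSING)` and `%!d(MISSING)` markers in these logs are Go `fmt` error markers, not corruption of the data itself. They are exactly what `fmt` prints when a printf-style logger adapter forwards its arguments as one slice (`args`) instead of spreading them (`args...`), so the values are still recoverable: the first line reads as partition 2 of connect2_topic_telemetry starting at offset 2995799371. The sketch below reproduces the effect with the standard library only; the format string is reconstructed from the log text, and `formatBroken`/`formatFixed` are hypothetical helpers.

```go
package main

import "fmt"

// logFormat mirrors the message text seen in the logs (reconstructed from the
// output, not copied from kafka-go's source).
const logFormat = "initializing kafka reader for partition %d of %s starting at offset %d"

// formatBroken hands the whole argument slice to Sprintf as ONE value, which
// is what an adapter does when it forwards `args` instead of `args...`.
// The first verb is applied to every slice element; the rest are MISSING.
func formatBroken(format string, args []interface{}) string {
	return fmt.Sprintf(format, args)
}

// formatFixed spreads the slice so each verb receives its own argument.
func formatFixed(format string, args ...interface{}) string {
	return fmt.Sprintf(format, args...)
}

func main() {
	args := []interface{}{2, "connect2_topic_telemetry", int64(2995799371)}
	fmt.Println(formatBroken(logFormat, args))
	// initializing kafka reader for partition [2 %!d(string=connect2_topic_telemetry) 2995799371] of %!s(MISSING) starting at offset %!d(MISSING)
	fmt.Println(formatFixed(logFormat, args...))
	// initializing kafka reader for partition 2 of connect2_topic_telemetry starting at offset 2995799371
}
```

If the logger passed to the reader is a wrapper around another logging library, checking that it forwards `args...` should make these lines readable.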
Thanks for the report.
Would you have more information on the issue, for example:
Which version of the package is your program depending on?
Is the producer configured to compress messages? If so, which compression algorithm is used?
How many partitions exist in the topic? And how many consumers?
I’d love to try to reproduce it so I want to get a setup that is as close to yours as possible.
Version: v0.3.3 and latest master
Compression: none
Partitions: 6 (each with 2 replicas)
Consumers: quite a few (around 7) on that topic, but under different group IDs. The kafka-go client was using a consumer group, with only one consumer instance.
Any news on this? I'm interested to know whether this library is safe to use :) Thanks!
I still haven't been able to reproduce this, but it could be related to https://twitter.com/abdullin/status/1224618978779119618?s=21
Any updates on this issue? Does the latest release resolve it?
I'm also interested in more information. With the latest kafka-go I am seeing "EOF" in my logs and receiving no messages. This occurs in both ReadPartition and ReadMessage, and I'm left wondering what the hang-up is.
I also had a similar problem with lost messages. I suspect the context expired while we were committing messages: we committed the message and got back an error because the context had expired. Problem lines: https://github.com/segmentio/kafka-go/blob/f0d3749a707d1a63b089082f8a1aa9980566427d/reader.go#L857-L876
@achille-roussel any updates on this? I have the same problem.
@ubyyj any update? I have the same problem. Were you able to solve it?
Hi @jcr1984,
Are you experiencing this issue with the latest version of kafka-go?
If so are you able to provide a runnable reproduction?
Thanks!
Hi, after debugging I found the problem and fixed it by removing the timeout context from my code. The essence of the problem is that the read timed out after the reader had already advanced the offset; on the next loop iteration we read the next message, advanced the offset again, and committed it, so the message from the timed-out read was skipped.
Old (buggy) implementation, which loses messages:
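func (c *worker) listen(ctx context.Context) error {
	for {
		// A fresh 1s timeout for every read.
		kctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		m, err := c.fromKafka(kctx, topic)
		cancel() // release this iteration's timer
		if err == ErrMsgNotFound {
			// BUG: a read that timed out after advancing the reader also lands
			// here, so that message is skipped and the next commit covers it.
			continue
		}
		if err != nil {
			return err
		}
		c.process(m)
	}
}

func (c *worker) fromKafka(ctx context.Context, topic string) ([]byte, error) {
	b, err := c.kafkaReader.TopicRead(ctx, topic)
	// BUG: a deadline error is treated the same as "no message available".
	if errors.Is(err, io.EOF) || errors.Is(err, context.DeadlineExceeded) {
		return b, ErrMsgNotFound
	}
	if err != nil {
		return b, err
	}
	return b, nil
}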
New implementation, which fixes the loss:
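func (c *worker) listen(ctx context.Context) {
	for {
		// No per-read timeout: the read is bounded only by the caller's ctx.
		m, err := c.fromKafka(ctx, topic)
		if err == ErrMsgNotFound {
			continue
		}
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			return // the caller's context ended; stop consuming
		}
		if err != nil {
			continue // other read errors: retry (consider logging or backing off)
		}
		c.process(m)
	}
}

func (c *worker) fromKafka(ctx context.Context, topic string) ([]byte, error) {
	b, err := c.kafkaReader.TopicRead(ctx, topic)
	if errors.Is(err, io.EOF) {
		return nil, ErrMsgNotFound
	}
	if err != nil {
		return nil, err
	}
	return b, nil
}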
Any update on this issue?