kafka-go

Reader with ReaderGroup can lose the first messages sent to a new topic

Open · datacompboy opened this issue 1 year ago · 1 comment

Describe the bug

Consider the following scenario ([P] = publisher, [R] = reader):

1. [P] testtopic is created (empty).
2. [R] A new Reader is created with StartOffset: LastOffset.
   => a new consumer group is initialized with map[{topic:testtopic partition:0}:-1]
   => the new reader starts, seeks to offset 0, and starts polling.
3. [P] A message is published to testtopic (the message with offset 0 becomes available).
4. [R] reader.read tries to read the message and gets a connection error; it retries and fails N times in a row.
5. [R] The connection is re-established.
6. [R] It gets the offset from the consumer group: -1, which means "LastOffset".
7. [R] It starts a reader, which seeks to offset 1.
8. [R] It polls from offset 1.

At this point, the message with offset 0 is lost. More generally, any number of messages published before the first successful CommitMessages call are lost.
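The mechanism can be illustrated with a toy model that does not depend on kafka-go at all. This is a hypothetical simulation, not kafka-go code: `partition`, `resolve`, `simulate`, and the constant `lastOffset` are illustrative names (the value -1 mirrors the sentinel shown in the `subscribed to topics and partitions` log lines). It shows that resolving the group's stored -1 again after a reconnect skips every message published in between.

```go
package main

import "fmt"

// lastOffset mirrors the -1 sentinel the consumer group reports before any
// offset has been committed (illustrative constant, not imported from kafka-go).
const lastOffset int64 = -1

// partition is a toy, in-memory stand-in for a single Kafka partition log.
type partition struct {
	messages []string
}

// resolve turns a possibly relative start offset into an absolute one.
// lastOffset means "the next offset to be written", i.e. the current log end.
func resolve(p *partition, start int64) int64 {
	if start == lastOffset {
		return int64(len(p.messages))
	}
	return start
}

// simulate replays the scenario above and returns the offsets the reader skips.
func simulate() []int64 {
	p := &partition{}
	committed := lastOffset // new group: nothing committed yet

	// First session: -1 is resolved against the empty topic.
	first := resolve(p, committed) // 0

	// A message is published while every fetch is failing.
	p.messages = append(p.messages, "msg-0")

	// Reconnect: the group still reports -1, so it is resolved again.
	second := resolve(p, committed) // 1

	var lost []int64
	for off := first; off < second; off++ {
		lost = append(lost, off)
	}
	return lost
}

func main() {
	fmt.Println("lost offsets:", simulate())
}
```

Because nothing pins the first resolution, each reconnect moves the start position to the log end at that moment, which matches the "seeking to offset 2" and "seeking to offset 3" lines in the log below.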

Kafka Version

kafka-go v0.4.47

To Reproduce

The issue was caught by running a large system under a deterministic distributed simulator. I am not sure how to construct a hermetic test for it, as I am not familiar with the codebase.

Expected Behavior

The message with offset 0 is read and processed at least once.

Observed Behavior

The first few messages are lost:

...
00:00:53.23     info    kafka-go@v0.4.47/consumergroup.go:952   joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.23     info    kafka-go@v0.4.47/consumergroup.go:1012  selected as leader for group, READERCONSUMERGROUP
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1040  using 'range' balancer to assign group, READERCONSUMERGROUP
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1042  found member: READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/[]byte(nil)
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1045  found topic/partition: TOPICNAME/0
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:966   assigned member/topic/partitions READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/TOPICNAME/[0]
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:973   joinGroup succeeded for response, READERCONSUMERGROUP.  generationID=3, memberID=READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:806   Joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1139  Syncing 1 assignments for generation 3 as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.29     info    kafka-go@v0.4.47/consumergroup.go:1104  sync group finished for group, READERCONSUMERGROUP
00:00:53.31     info    kafka-go@v0.4.47/reader.go:141  subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
00:00:53.31     info    kafka-go@v0.4.47/consumergroup.go:467   started heartbeat for group, READERCONSUMERGROUP [3s]
00:00:53.31     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:00:53.31     info    kafka-go@v0.4.47/reader.go:274  started commit for group READERCONSUMERGROUP
00:00:53.43     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:00:53.57     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
...
00:03:00.06     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:36119->11.0.0.5:9093: i/o timeout
00:03:00.16     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:00.28     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:00.42     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:03:00.52     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:47346->11.0.0.5:9093: i/o timeout
00:03:00.62     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:07.74     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:07.88     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:27880->11.0.0.5:9093: i/o timeout
00:03:07.98     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:09.1      info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:09.24     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:31747->11.0.0.5:9093: i/o timeout
00:03:09.34     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:13.48     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:17.64     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:51137->11.0.0.5:9093: i/o timeout
00:03:17.74     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:24.86     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:28        info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:49546->11.0.0.5:9093: i/o timeout
00:03:28.1      info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:31.24     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:31.31     info    kafka-go@v0.4.47/consumergroup.go:470   stopped heartbeat for group READERCONSUMERGROUP
00:03:31.31     info    kafka-go@v0.4.47/reader.go:277  stopped commit for group READERCONSUMERGROUP
00:03:38.18     info    kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:58618->11.0.0.5:9093: i/o timeout
00:03:40.3      info    kafka-go@v0.4.47/consumergroup.go:952   joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 4
...
00:03:46.42     info    kafka-go@v0.4.47/reader.go:141  subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
^^^ still -1 in the ConsumerGroup
00:03:46.42     info    kafka-go@v0.4.47/consumergroup.go:467   started heartbeat for group, READERCONSUMERGROUP [3s]
00:03:46.42     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:46.42     info    kafka-go@v0.4.47/reader.go:274  started commit for group READERCONSUMERGROUP
00:03:54.59     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 2
^^^ At this point, messages at offsets 0 and 1 are lost
00:03:57.53     info    kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:48613->11.0.0.5:9093: i/o timeout
00:03:57.63     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:58.75     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
^^^ At this point, the message at offset 2 is also lost
00:03:58.89     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 3: read tcp 11.0.0.4:60875->11.0.0.5:9093: i/o timeout
00:03:58.99     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 3
00:04:00.13     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
00:04:00.27     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 4: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:04:00.27     info    reader/reader.go:143    FetchMessage result: {Topic:TOPICNAME Partition:0 Offset:3 HighWaterMark:4 Key:[] Value:....}
^^^ the first message delivered to the app is at offset 3
00:04:00.37     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 4: read tcp 11.0.0.4:22278->11.0.0.5:9093: i/o timeout
...

datacompboy, Aug 13 '24 16:08

I'm not sure how good this approach is, but pushing the initial offset up to the parent Reader when it differs from the requested one, and committing it from FetchMessage itself, seems to fix the behaviour:

diff -u ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go ./reader.go
--- ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go	1979-12-31 00:00:00.000000000 +0100
+++ ./reader.go	2024-08-14 13:09:29.231441428 +0200
@@ -835,6 +835,16 @@
 				return Message{}, io.EOF
 			}
 
+            // "initial offset" notification
+            if m.error == nil && m.version == version && m.message.HighWaterMark==-1 && m.watermark == -1 {
+                if r.useConsumerGroup() {
+                    if err := r.CommitMessages(ctx, m.message); err != nil {
+                        return Message{}, err
+                    }
+                }
+                continue
+            }
+
 			if m.version >= version {
 				r.mutex.Lock()
 
@@ -1312,6 +1322,14 @@
 
 		// Now we're sure to have an absolute offset number, may anything happen
 		// to the connection we know we'll want to restart from this offset.
+        if offset != start {
+            // If we had to seek from non-absolute offset, inform Reader about initial point
+            // So it get remembered in case of reconnection.
+            if err = r.sendMessage(ctx, Message{Topic:r.topic, Partition:r.partition, Offset:start-1, HighWaterMark:-1}, -1); err != nil {
+                conn.Close()
+                return
+            }
+        }
 		offset = start
 
 		errcount := 0

This is just an idea for a fix. I've verified that it fixes the issue, but I believe the code can and should be improved and tested :), so I don't have a ready-to-use patch here.
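The idea behind the patch can be modeled the same way. In this hypothetical sketch (not kafka-go code; `partition`, `group`, `startSession`, and `simulateFixed` are illustrative names), the first time a relative start offset is resolved to an absolute one, that absolute offset is stored as the group's committed position, so a later reconnect resumes from it instead of resolving -1 again. The patch aims for the same effect by sending a synthetic message with `Offset: start-1` and committing it from FetchMessage; since kafka-go commits a message's `Offset+1`, that records `start`.

```go
package main

import "fmt"

// lastOffset mirrors the -1 "nothing committed yet" sentinel (illustrative).
const lastOffset int64 = -1

// partition is a toy, in-memory stand-in for a single Kafka partition log.
type partition struct{ messages []string }

// group is a toy stand-in for the consumer group's committed offset.
type group struct{ committed int64 }

// startSession resolves the group's start offset and, if it was relative,
// commits the resolved absolute offset immediately, so a reconnect cannot
// re-resolve it to a later position.
func startSession(p *partition, g *group) int64 {
	start := g.committed
	if start == lastOffset {
		start = int64(len(p.messages))
		g.committed = start // pin the initial position
	}
	return start
}

// simulateFixed replays the failing scenario with the pinning in place and
// returns what the reader sees after reconnecting.
func simulateFixed() []string {
	p := &partition{}
	g := &group{committed: lastOffset}

	startSession(p, g) // first session on the empty topic pins offset 0
	p.messages = append(p.messages, "msg-0")

	// Reconnect: the pinned offset 0 is reused, so msg-0 is still delivered.
	start := startSession(p, g)
	return p.messages[start:]
}

func main() {
	fmt.Println("delivered after reconnect:", simulateFixed())
}
```

The trade-off is an extra commit when a group first joins, in exchange for making the initial position durable across reconnects.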

datacompboy, Aug 14 '24 11:08