kafka-go
kafka-go copied to clipboard
batch.ReadMessage/Close fail with ErrUnexpectedEOF
Describe the bug
When setting a read deadline on a connection batch.ReadMessage sometime fails with a ErrUnexpectedEOF.
I based my code on reader.read, my goal is to make the equivalent of maxWait
work. You can see the code here.
Here's a sample output:
$ ./kafkago-bug
2019/05/08 18:13:50 msg: 400
2019/05/08 18:13:50 msg: 400
2019/05/08 18:13:50 msg: 400
2019/05/08 18:13:50 msg: 400
2019/05/08 18:13:50 msg: 400
2019/05/08 18:13:50 read nothing
2019/05/08 18:13:50 read nothing
2019/05/08 18:13:50 read nothing
2019/05/08 18:13:50 read nothing
2019/05/08 18:13:50 read nothing
2019/05/08 18:13:51 unable to close batch: unexpected EOF
I tried to tweak the deadlines, using 10s everywhere but I always end up getting a ErrUnexpectedEOF.
I managed to track the source of the error to this code. And looking at checkTimeoutErr it seems to fail because the deadline wasn't reached yet, but somehow there was still a errShortRead before. That EOF error is then replaced with dontExpectEOF
.
Also note that if I use Reader
with a Logger defined they're logged too:
kafka 2019/05/08 18:16:05 initializing kafka reader for partition 0 of foobar starting at offset -2
kafka 2019/05/08 18:16:05 the kafka reader for partition 0 of foobar is seeking to offset 0
kafka 2019/05/08 18:16:05 looking up offset of kafka reader for partition 0 of foobar: -2
kafka 2019/05/08 18:16:05 the kafka reader got an unknown error reading partition 0 of foobar at offset 4: EOF
2019/05/08 18:16:05 msg: 400
2019/05/08 18:16:05 msg: 400
2019/05/08 18:16:05 msg: 400
2019/05/08 18:16:05 msg: 400
kafka 2019/05/08 18:16:05 initializing kafka reader for partition 0 of foobar starting at offset 4
kafka 2019/05/08 18:16:05 the kafka reader for partition 0 of foobar is seeking to offset 4
kafka 2019/05/08 18:16:06 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
2019/05/08 18:16:06 msg: 400
kafka 2019/05/08 18:16:06 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:06 the kafka reader got an unknown error reading partition 0 of foobar at offset 5: unexpected EOF
kafka 2019/05/08 18:16:06 initializing kafka reader for partition 0 of foobar starting at offset 5
kafka 2019/05/08 18:16:06 the kafka reader for partition 0 of foobar is seeking to offset 5
kafka 2019/05/08 18:16:06 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:06 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:06 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:07 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:07 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:07 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:07 no messages received from kafka within the allocated time for partition 0 of foobar at offset 5
kafka 2019/05/08 18:16:07 the kafka reader got an unknown error reading partition 0 of foobar at offset 5: unexpected EOF
kafka 2019/05/08 18:16:07 initializing kafka reader for partition 0 of foobar starting at offset 5
kafka 2019/05/08 18:16:07 the kafka reader for partition 0 of foobar is seeking to offset 5
kafka 2019/05/08 18:16:07 the kafka reader got an unknown error reading partition 0 of foobar at offset 5: unexpected EOF
Kafka Version
Kafka 2.1.0, Kafka 2.2.0, Kafka 1.1.1 and Kafka 0.11.0.3, all fail in the same way. All are running on localhost.
To Reproduce
See above
Expected behavior
No ErrUnexpectedEOF returned.
@vrischmann thanks for reporting.
Could you confirm which version of kafka-go you are using?
I tested with 7c6cb19885b3302c6320871f91f1949b8249bec0
Hi. Just to note, I tested again with v0.2.5 just now and I still get these errors.
This time around I only tested using the Reader, not my code above to reduce the possibility of errors. Here's the code to reproduce:
package main
import (
"context"
"log"
"os"
"time"
"github.com/segmentio/kafka-go"
)
const (
kb = 1024
)
func readPartition(ctx context.Context, partition int) error {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
MinBytes: 1 * kb,
MaxBytes: 2 * kb,
MaxWait: 10000 * time.Millisecond,
Topic: "foobar",
Partition: 0,
Logger: log.New(os.Stderr, "kafka ", log.LstdFlags),
})
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
return err
}
log.Printf("msg: %d", len(msg.Value))
}
}
func main() {
ctx := context.Background()
if err := readPartition(ctx, 0); err != nil {
log.Fatal(err)
}
}
I produce a stream of messages with this:
./bin/kafka-producer-perf-test.sh --topic foobar --throughput 1000 --num-records 2000000 --producer-props bootstrap.servers=localhost:9092 --print-metrics --record-size 400
Then start the program above and I get this in my logs:
afka 2019/06/25 21:12:50 initializing kafka reader for partition 0 of foobar starting at offset -2
kafka 2019/06/25 21:12:50 the kafka reader for partition 0 of foobar is seeking to offset 5
kafka 2019/06/25 21:12:50 looking up offset of kafka reader for partition 0 of foobar: -2
kafka 2019/06/25 21:12:50 the kafka reader got an unknown error reading partition 0 of foobar at offset 30: EOF
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
kafka 2019/06/25 21:12:50 initializing kafka reader for partition 0 of foobar starting at offset 30
kafka 2019/06/25 21:12:50 the kafka reader for partition 0 of foobar is seeking to offset 30
kafka 2019/06/25 21:12:50 the kafka reader got an unknown error reading partition 0 of foobar at offset 34: EOF
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
kafka 2019/06/25 21:12:50 initializing kafka reader for partition 0 of foobar starting at offset 34
kafka 2019/06/25 21:12:50 the kafka reader for partition 0 of foobar is seeking to offset 34
kafka 2019/06/25 21:12:50 the kafka reader got an unknown error reading partition 0 of foobar at offset 48: EOF
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
2019/06/25 21:12:50 msg: 400
The EOF never stop as far as I can tell. I played around with MinBytes
, MaxBytes
and MaxWait
and I still regularly get EOF errors.
It fails against Kafka 0.11.0.3, 1.1.1, 2.1.0 and 2.2.0 still.
i have the same error : kafka 2019/06/25 21:12:50 the kafka reader got an unknown error reading partition 0 of foobar at offset 48: EOF, any help ?
may be is it this function issue ?
batch.go
Any updates here? I am using latest kafka-go and all I seem to get is "EOF" errors. ReadPartitions and ReadMessage appear to be the culprits.
Just wanted to leave an update that this is still being tracked, thanks everyone for all the context you've shared!
Same problem with reader here, e.g.
1:55PM DBG no messages received from kafka within the allocated time for partition 1 of test-entry-changed at offset 2533029
1:55PM DBG the kafka reader got an unknown error reading partition 9 of test-item-changed at offset 297481: unexpected EOF
1:55PM DBG initializing kafka reader for partition 9 of test-item-changed starting at offset 297481
1:55PM DBG the kafka reader for partition 9 of test-item-changed is seeking to offset 297481
1:55PM DBG no messages received from kafka within the allocated time for partition 9 of test-entry-changed at offset 2545200
1:55PM DBG no messages received from kafka within the allocated time for partition 4 of test-item-changed at offset 302591
This continues on and on. Any update on this would be very welcome.
The work in https://github.com/segmentio/kafka-go/pull/846 is likely going to help identify the conditions that trigger the issue reported here.