kafka-go
the kafka reader got an unknown error reading partition
First issue
We're often getting a reader error (~500k/day):
the kafka reader got an unknown error reading partition 9 of SOME_TOPIC at offset 3: read tcp IP_ADDRESS:46406->IP_ADDRESS:9093: i/o timeout
https://github.com/segmentio/kafka-go/blob/a4890bd956b5658ca57e964dd28381c0ee4fd617/reader.go#L1366-L1375
kafka-go 0.4.16
Kafka 2.5.0
Second issue
The errors happen when the Kafka reader commits a message after it has been processed successfully; the message is then re-consumed by another replica:
"msg": "debezium.Consumer: failed to commit message: write tcp IP_ADDRESS:49610->IP_ADDRESS:9093: use of closed network connection"
We receive "Successfully handled message" and a while afterward get "failed to commit message".
Hello @nsomarouthu, thanks for reporting the issue.
Do you know whether these errors are having an adverse effect on your application? Or are they simply transient errors that are handled and recovered by kafka-go automatically?
I/O timeouts are a frequent occurrence in networked systems, so I would not be too concerned about them, but let me know if this is causing any other issues!
Hello @achille-roussel, is there any setting in kafka.ReaderConfig that would reduce the number of errors we are currently experiencing with our configuration? The reader exits its loop when reporting this issue and reconnects. This doesn't impact message reading, but it seems expensive.
@achille-roussel I am on the same team/company as @nsomarouthu; we are working together on the two issues.
For the 1st issue, we have many "the kafka reader got an unknown error reading partition" errors, as well as numerous "no messages received from kafka within the allocated time for partition" messages logged. I believe this is because our applications set ReaderConfig.MaxWait (https://github.com/segmentio/kafka-go/blob/master/reader.go#L395) to 100ms. There are some messages that we want the application to fetch and handle as soon as possible. Do you see any concern with setting a short ReaderConfig.MaxWait? Does the ReaderConfig.MaxWait duration impact the expected message consumption rate? (We only need a short ReaderConfig.MaxWait for some of our apps, not all of them.)
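For completeness, a minimal sketch of the configuration in question, assuming placeholder brokers, topic, and group ID alongside the 100ms MaxWait mentioned above:

package main

import (
	"time"

	"github.com/segmentio/kafka-go"
)

// newLowLatencyReader sketches a Reader tuned for quick fetches:
// a short MaxWait so long polls return as soon as possible.
func newLowLatencyReader() *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"IP_ADDRESS:9093"},
		GroupID:  "example-group",
		Topic:    "SOME_TOPIC",
		MinBytes: 1,                      // don't wait to accumulate a large batch
		MaxBytes: 10e6,                   // 10MB cap per fetch
		MaxWait:  100 * time.Millisecond, // the short long-poll window in question
	})
}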
For the 2nd issue, we see a few errors about 5 seconds after finishing handling a Kafka message and trying to commit it, such as "write tcp 100.96.92.51:52652->100.67.54.193:9093: use of closed network connection". I believe this is related to ConsumerGroupConfig.Timeout (defaults to 5s): https://github.com/segmentio/kafka-go/blob/master/consumergroup.go#L161
Is there a way to set a different (longer) value when creating a NewReader with a ReaderConfig? It does not look like it is configurable here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L704
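As a possible workaround sketch while that field is not exposed via ReaderConfig: kafka.NewConsumerGroup does accept ConsumerGroupConfig.Timeout directly. IDs, addresses, and the 10s value below are placeholders:

package main

import (
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Sketch only: create the consumer group directly to set a longer
	// Timeout than the 5s default linked above.
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",
		Brokers: []string{"IP_ADDRESS:9093"},
		Topics:  []string{"SOME_TOPIC"},
		Timeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	// ... group.Next(ctx) yields generations from which partitions
	// can be consumed; omitted here for brevity ...
}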
A MaxWait of 100ms definitely seems short. Typically, I've seen this value configured to between 250ms and a couple of seconds. Kafka is designed for batch processing: clients use long-polling to fetch batches of messages, and MaxWait is what bounds that long poll. In my experience, Kafka can be complex to configure well for low latency.
For your second question, it appears we are not exposing this configuration option when creating the ConsumerGroup internally in the Reader. Would you be able to contribute a change to add this option?
@achille-roussel thanks for the feedback. We will test tuning MaxWait (likely in combination with MinBytes and MaxBytes) in our applications.
We can also test configuring ConsumerGroupConfig.Timeout on our end to see if that reduces the errors we are seeing. If it works, we can contribute to making this option configurable.
I get the same error message when reading large data. The server-side client reader has no issue, but in my local dev environment the reader cannot read the large data and fails with:
the kafka reader got an unknown error reading partition .... : i/o timeout
Found the solution: the code in reader.go needs updating. There is a safetyTimeout hard-coded to 10 seconds; after updating it to 60 seconds, there is no issue any more.
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)
	// ... rest of the function unchanged ...
And I am not sure about the meaning of the code below:
const safetyTimeout = 60 * time.Second // changed from 10 to 60
deadline := time.Now().Add(safetyTimeout)
conn.SetReadDeadline(deadline)
It overwrites the deadline that was set before, and it cannot be configured with MaxWait in the Reader configuration. Can we remove this code? See the commented-out version below:
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	//const safetyTimeout = 10 * time.Second
	//deadline := time.Now().Add(safetyTimeout)
	//conn.SetReadDeadline(deadline)

	for {
		//if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
		//	deadline = now.Add(safetyTimeout)
		//	conn.SetReadDeadline(deadline)
		//}

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}
Hello @chihweichang, how did your investigation go? Are there any follow ups we should discuss here?
@achille-roussel sorry, I haven't had time to investigate this issue further.
Hello @chihweichang!
I wanted to follow up on this issue and ask if you were able to investigate it further.
@lujiacn thanks for following up with suggested changes.
The 10s safeguard is definitely very opinionated; it would be better for this value to be configurable. Would you be available to send a pull request to support tuning this timeout?
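For illustration only, one hypothetical shape such a change could take; the ReadBatchTimeout field name and the plumbing below are invented for this sketch and are not necessarily what the final PR looks like:

// Hypothetical sketch: expose the batch-read safeguard via ReaderConfig.
type ReaderConfig struct {
	// ... existing fields ...

	// ReadBatchTimeout (hypothetical name) would replace the hard-coded
	// 10s safetyTimeout used while draining a fetched batch.
	// Zero keeps the current default.
	ReadBatchTimeout time.Duration
}

// Inside (*reader).read, instead of the constant:
safetyTimeout := r.readBatchTimeout
if safetyTimeout <= 0 {
	safetyTimeout = 10 * time.Second // preserve today's default
}
deadline := time.Now().Add(safetyTimeout)
conn.SetReadDeadline(deadline)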
One of the users of my project is experiencing the same issue, and I don't know what's causing it.
@nsomarouthu @chihweichang Have you been able to pinpoint the issue here?
@moogacs You seem to have fixed the issue on your fork. Is there anything you can add here?
@mostafa, the issue has been identified, and indeed making the read timeout configurable will fix it, so users of the lib will be able to adjust read timeouts based on their needs.
At the moment it's hard-coded to 10 sec., and increasing that resolves it.
So I am waiting for my PR to be approved and merged; @achille-roussel, I think you can help in prioritizing https://github.com/segmentio/kafka-go/pull/989
@achille-roussel I saw your review on @moogacs's PR, #989. What does it take to merge that PR? Is there anything missing? Can I help in any way?
#989 has been merged; let me know if you are still experiencing the issue on the latest version of kafka-go!
@achille-roussel One of the users of xk6-kafka (and kafka-go) is still experiencing the issue: https://github.com/mostafa/xk6-kafka/issues/185.