kafka-go

the kafka reader got an unknown error reading partition

Status: Open. nsomarouthu opened this issue 3 years ago (16 comments).

First issue

We frequently get this reader error, roughly 500k times per day:

the kafka reader got an unknown error reading partition 9 of SOME_TOPIC at offset 3: read tcp IP_ADDRESS:46406->IP_ADDRESS:9093: i/o timeout

https://github.com/segmentio/kafka-go/blob/a4890bd956b5658ca57e964dd28381c0ee4fd617/reader.go#L1366-L1375

kafka-go 0.4.16

Kafka 2.5.0

Second issue

The error occurs when the Kafka reader commits a message after the message has been processed successfully. Another replica then consumes the message again.

"msg": "debezium.Consumer: failed to commit message: write tcp IP_ADDRESS:49610->IP_ADDRESS:9093: use of closed network connection"

We receive "Successfully handled message" and, a while afterward, "failed to commit message".

nsomarouthu, Aug 25 '21

Hello @nsomarouthu, thanks for reporting the issue.

Do you know whether these errors are having an adverse effect on your application? Or are they simply transient errors that kafka-go handles and recovers from automatically?

I/O timeouts seem like frequent problems in network systems, so I would not be too concerned about it, but let me know if this is causing any other issues!

achille-roussel, Aug 31 '21

Hello @achille-roussel, is there any setting in kafka.ReaderConfig that would reduce the number of errors we are currently seeing with our configuration? When the reader reports this error, it exits its read loop and reconnects. This doesn't affect reading messages, but it seems expensive.

nsomarouthu, Sep 01 '21

@achille-roussel I am on the same team/company as @nsomarouthu, and we are working on the 2 issues together.

For the 1st issue, we see two kinds of log messages:
- many "the kafka reader got an unknown error reading partition" errors;
- numerous "no messages received from kafka within the allocated time for partition" messages.

I believe this is because our applications set ReaderConfig.MaxWait (https://github.com/segmentio/kafka-go/blob/master/reader.go#L395) to 100ms. There are some messages that we want the application to fetch and handle as soon as possible.

Do you see any concern with setting a short ReaderConfig.MaxWait? Does the ReaderConfig.MaxWait duration affect the expected message consumption rate? (We only need a short ReaderConfig.MaxWait for some of our apps, not all of them.)

For the 2nd issue, we see a few errors about 5 seconds after we finish handling a Kafka message and try to commit it, such as write tcp 100.96.92.51:52652->100.67.54.193:9093: use of closed network connection.

I believe this is related to ConsumerGroupConfig.Timeout, which defaults to 5s (https://github.com/segmentio/kafka-go/blob/master/consumergroup.go#L161).

Is there a way to set a different (longer) value when creating a NewReader with a ReaderConfig? It does not look configurable here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L704

chihweichang, Sep 02 '21

A MaxWait of 100ms definitely seems short. Typically, I've seen this value configured between 250ms and a couple of seconds. Kafka is designed for batch processing: clients use long-polling to fetch batches of messages, and MaxWait configures how long each poll waits for data. In my experience, Kafka can be complex to configure well for low latency.

For your second question, it appears we are not exposing this configuration option when the Reader creates its ConsumerGroup internally. Would you be able to contribute a change to add this option?

achille-roussel, Sep 07 '21

@achille-roussel thanks for the feedback. We will test tuning MaxWait (likely in combination with MinBytes and MaxBytes) in our applications.

We can also test configuring ConsumerGroupConfig.Timeout on our end to see if that reduces the errors we are seeing. If it works, we can contribute to making this option configurable.

chihweichang, Sep 09 '21

Hello @chihweichang, how did your investigation go? Are there any follow ups we should discuss here?

achille-roussel, Nov 05 '21

I get the same error message when reading large data. A reader running server-side has no issue. A reader in my local dev environment, however, cannot read the large data and fails with: the kafka reader got an unknown error reading partition .... : i/o timeout

I found a fix. In reader.go, a safetyTimeout is set to 10 seconds; after I changed it to 60 seconds, the error no longer occurs:

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)

lujiacn, Nov 11 '21

I'm also not sure what the following code is meant to do:

	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)

It overwrites the deadline that was set earlier from MaxWait, and the Reader configuration cannot change it. Can we remove it? See the commented-out code below:

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	//const safetyTimeout = 10 * time.Second
	//deadline := time.Now().Add(safetyTimeout)
	//conn.SetReadDeadline(deadline)

	for {
		//if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
		//deadline = now.Add(safetyTimeout)
		//conn.SetReadDeadline(deadline)
		//}

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}

lujiacn, Nov 11 '21


@achille-roussel sorry, I haven't had time to investigate further on this issue.

chihweichang, Nov 12 '21

Hello @chihweichang!

I wanted to follow up on this issue and ask if you were able to investigate it further.


@lujiacn thanks for following up with suggested changes.

The 10s safeguard is definitely very opinionated; it would be better for this value to be configurable. Would you be available to send a pull request to support tuning this timeout?

achille-roussel, Feb 15 '22

One of the users of my project is experiencing the same issue, and I don't know what's causing it.

@nsomarouthu @chihweichang Have you been able to pinpoint the issue here?

@moogacs You seem to have fixed the issue on your fork. Is there anything you can add here?

mostafa, Oct 11 '22

@mostafa, the cause has been identified. Allowing the read timeout to be configured will fix it, because users of the library can then adjust the timeout to their needs.

At the moment it's hard-coded to 10 seconds, and increasing it resolves the issue.

So I am waiting for my PR to be approved and merged.

moogacs, Oct 11 '22

@achille-roussel, I think you can help prioritize https://github.com/segmentio/kafka-go/pull/989

moogacs, Oct 11 '22

@achille-roussel I saw your review on @moogacs's PR, #989. What does it take to merge that PR? Is there anything missing? Can I help in any way?

mostafa, Oct 11 '22

#989 has been merged, let me know if you are still experiencing the issue on the latest version of kafka-go!

achille-roussel, Oct 14 '22

@achille-roussel One of the users of xk6-kafka (and kafka-go) is still experiencing the issue: https://github.com/mostafa/xk6-kafka/issues/185.

mostafa, Jan 12 '23