
producer write broken pipe

Open chowyu08 opened this issue 5 years ago • 21 comments

Versions

Sarama: 1.24.1
Kafka: 2.1.1
Go: 1.13.1

Configuration

What configuration values are you using for Sarama and Kafka?

conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.Producer.RequiredAcks = sarama.WaitForLocal
conf.ChannelBufferSize = 1024
conf.Net.KeepAlive = 30 * time.Second
Logs

time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [closing] because write tcp xxxxx:59382->xxxx:9092: write: broken pipe\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [closing] because write tcp xxxx:33336->xxxx:9092: write: broken pipe\n" time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 abandoning broker 3\n" time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 input chan closed\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 shut down\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 abandoning broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 abandoning broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 input chan closed\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 shut down\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.transfer] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [msg.republish] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.hit] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific." time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 starting up\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [open] on metric.transfer/0\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific." 
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 selected broker 3\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [normal]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 starting up\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on metric.ruleengine.hit/0\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 selected broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 selected broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on msg.republish/1\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [normal]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [normal]\n" time="2019-12-25T09:15:52Z" level=info msg="Connected to broker at xxxx:9092 (registered as #2)\n" time="2019-12-25T09:15:53Z" level=info msg="Connected to broker at xxxx:9092 (registered as #3)\n"

Problem Description

After 10 minutes, writes on the producer socket fail with a broken pipe error.

chowyu08 avatar Dec 25 '19 09:12 chowyu08

πŸ‘ experiencing similar behaviour with kafka : 0.10.2.1 go : 1.13 sarma : 1.24.1 SyncProducer

talbspx avatar Jan 28 '20 11:01 talbspx

update: this comes from Kafka's connections.max.idle.ms broker config, which defaults to 10 minutes. Kafka closes producer connections that have been idle for longer than connections.max.idle.ms. Two possible ways to overcome this:

  1. Feature request to sarama: since Kafka 0.11 it is possible to send a metadata request, which mimics a heartbeat from the producer to the broker so the connection remains alive.
  2. Set connections.max.idle.ms on the broker side to a value comfortably larger than the longest gap you expect between produced messages, so the connection stays alive the whole time (see the snippet below).
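
For option 2, the relevant broker setting lives in server.properties (the value below is only an illustration and should exceed the longest gap you expect between produces):

# broker side, server.properties; the default is 600000 (10 minutes)
connections.max.idle.ms=3600000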

talbspx avatar Apr 13 '20 05:04 talbspx

So the client does already run a periodic metadata refresh (the backgroundMetadata goroutine in client.go), which ticks on client.conf.Metadata.RefreshFrequency. That defaults to 10m, so you could reduce it to make the refresh happen more frequently.

https://github.com/Shopify/sarama/blob/6159078aacb29adb9ecbd2a83ab9c2732c80add2/client.go#L768

However, that request is sent to a random broker each time, so it won't necessarily keep your idle connections to all of the brokers in the cluster alive.
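
For example, something along these lines (a sketch; the 1-minute value is just illustrative):

conf := sarama.NewConfig()
// Default is 10 * time.Minute; keep this below the broker's connections.max.idle.ms
// so at least one broker connection is exercised regularly.
conf.Metadata.RefreshFrequency = 1 * time.Minute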

dnwe avatar Apr 13 '20 08:04 dnwe

thanks for the quick reply 👍

so I have seen this configuration and set it to 5 min (the Kafka broker remains at 10 min for idle connections) on a local single Kafka broker, and this is still happening.

LOGS {"level":"info","version":"dev","time":"2020-04-13T14:15:59+03:00","message":"Scout service starting"} {"level":"info","time":"2020-04-13T14:15:59+03:00","message":"starting to init server config"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Initializing new client]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092] (unregistered)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/brokers registered new broker #[1001 %!d(string=localhost:9092)] at %!s(MISSING)"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Successfully initialized new client]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001] starting up\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}

{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:16:18+03:00","message":"received entity update"} <-- trigger produce to kafka

{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:16:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:17:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:18:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}

<--------- waited 3 minutes so connection is now idle and disconnected ------------------------>

{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:19:12+03:00","message":"received entity update"} <-- trigger produce to kafka

{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 824634384640] state change to [closing] because %!s(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Closed connection to broker [localhost:9092]\n"} {"level":"error","error":"EOF","entity_name":"serverconfigurations","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/perimeterx/[email protected]!r!c7/scout/entity_producer.go:71","time":"2020-04-13T14:19:12+03:00","message":"failed to produce entity"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) abandoning broker %!d(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] input chan closed\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] shut down\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"client/metadata fetching metadata for [[serverconfigurations] localhost:9092] from broker %!s(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:19:12+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] starting up\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) selected broker %!d(MISSING)\n"} {"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}

@dnwe updated with logs

talbspx avatar Apr 13 '20 09:04 talbspx

@talbspx interesting. If it's just a local single-broker instance, can you change the broker's idle-connection config to 2m, set Sarama to a 60s refresh, and enable debug logs on the broker side? I'll also try to reproduce this during the week if I can.

dnwe avatar Apr 13 '20 09:04 dnwe

@dnwe yeah np :) will try to have it later on today.

talbspx avatar Apr 13 '20 10:04 talbspx

Interested, we've seen lots of these lately.

d1egoaz avatar Apr 13 '20 16:04 d1egoaz

@d1egoaz @dnwe to me it seems like there is an issue with the way the channel keys are handled in the Kafka broker on v0.10.x (the code looks a bit different under trunk).

This is where the server socket polls the Processor channels and updates their state using the processor.Selector: https://github.com/apache/kafka/blob/0.10.2.2/core/src/main/scala/kafka/network/SocketServer.scala#L494

Then the Selector iterates over the channels and tries to update each channel's state: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L312-L314

It then updates the idle manager for each channel key: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L339

and eventually releases the channel if needed: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L324

So I believe the issue is with the SelectedKeys that are retrieved (managed) using the java.nio.Selector: https://docs.oracle.com/javase/7/docs/api/java/nio/channels/Selector.html

talbspx avatar Apr 16 '20 12:04 talbspx

I am also interested in this issue. I am experiencing it with:
Sarama version v1.26.3
Kafka version 2.6.0
Go version 1.12

This is quite a mission critical application, so I do need to fix this.

sshahar1 avatar Feb 11 '21 17:02 sshahar1

Interested in this issue. I'm experiencing it with:

Sarama version v1.27.2
Go version 1.14

DasTushar avatar Apr 06 '21 12:04 DasTushar

Interested in this issue. I'm experiencing it with:

Sarama version v1.28.0
Go version 1.15

caiofralmeida avatar May 22 '21 21:05 caiofralmeida

Experiencing the same issue after the recent sarama version upgrade to v1.29.1, with go 1.15.6. Any help in figuring out the cause is appreciated. The producer client is running in a k8s container with 1 core and 2 GiB of memory. Does this have anything to do with the producer client end, given that the only change made was the sarama version upgrade? Thanks in advance :)

mster429 avatar Aug 31 '21 16:08 mster429

Yes this is a bug. ~I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.~

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation
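
In the meantime, retries are what normally smooth over the reconnect; a minimal sketch of the relevant (standard) producer config fields, with illustrative values:

conf := sarama.NewConfig()
conf.Producer.Retry.Max = 5 // default is 3; retries drive the [retrying-N] -> [flushing-N] -> [normal] recovery seen in the logs above
conf.Producer.Retry.Backoff = 250 * time.Millisecond // default is 100ms
conf.Producer.Return.Errors = true // default; with an AsyncProducer, drain producer.Errors() so messages that exhaust their retries are not dropped silently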

dnwe avatar Sep 08 '21 09:09 dnwe

Yes this is a bug. ~I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.~

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

any update?

DesmonPong avatar Oct 29 '21 03:10 DesmonPong

update: this comes from Kafka's connections.max.idle.ms broker config, which defaults to 10 minutes. Kafka closes producer connections that have been idle for longer than connections.max.idle.ms. Two possible ways to overcome this:

  1. Feature request to sarama: since Kafka 0.11 it is possible to send a metadata request, which mimics a heartbeat from the producer to the broker so the connection remains alive.
  2. Set connections.max.idle.ms on the broker side to a value comfortably larger than the longest gap you expect between produced messages, so the connection stays alive the whole time.

Hi, I've hit the same issue. Could you share how I can send a metadata refresh as you described? Thanks a lot!

baoxc-shopee avatar Jan 25 '22 07:01 baoxc-shopee

Yes this is a bug. ~I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.~

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

any update

king-freshket avatar Jun 27 '22 04:06 king-freshket

Yes this is a bug. ~I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.~

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

we are facing the same issue, any update?

ghost avatar Oct 14 '22 13:10 ghost

Interested in this issue. I'm experiencing it with:

Sarama version v1.37.2
Go version 1.18
Confluent Cloud Kafka

david-bergman avatar Nov 03 '22 05:11 david-bergman

Any update on this?

Sarama version v1.37.2
Go version 1.19
Confluent Cloud Kafka

boscar avatar Jan 30 '23 14:01 boscar

Any update on this?

Sarama version v1.36.0
Go version 1.19
Kafka 3.x

3AceShowHand avatar Feb 10 '23 09:02 3AceShowHand

A possible workaround is sending a heartbeat message from the client side that just looks up topic metadata. This workaround works in our production environment.
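
Roughly, the workaround looks like this (a sketch only, assuming you keep a sarama.Client next to the producer; the interval just needs to stay below the broker's connections.max.idle.ms):

import (
	"time"

	"github.com/Shopify/sarama"
)

// keepAlive periodically refreshes topic metadata so the broker does not
// idle-close the connection. Call it once after creating the client and
// invoke the returned func to stop the heartbeat.
func keepAlive(client sarama.Client, interval time.Duration) (stop func()) {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := client.RefreshMetadata(); err != nil {
					sarama.Logger.Printf("heartbeat metadata refresh failed: %v\n", err)
				}
			case <-done:
				return
			}
		}
	}()
	return func() { close(done) }
}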

HY1310 avatar Feb 10 '23 13:02 HY1310

I am still periodically experiencing this issue with the latest sarama (v1.38.2-0.20230327141928-9127f1c432c0). Roughly once a week, when Confluent Kafka seems to do rolling upgrades on the cluster, I get this error and then the worker hangs indefinitely. Any plans to address this bug in future releases?

david-bergman avatar Jun 23 '23 11:06 david-bergman

With connections.max.idle.ms enabled server-side, this should have been fixed now that https://github.com/IBM/sarama/pull/2197 and https://github.com/IBM/sarama/pull/2234 have both been merged.

@david-bergman it sounds like you are still seeing issues, so I wonder if you were inadvertently using SASL v0 in v1.38.1 and missing out on the re-auth. Since https://github.com/IBM/sarama/pull/2572 the default was (put back) to SASL v1, and we also have much better protocol support. Can you re-test with the latest release?
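
If anyone wants to rule that out explicitly rather than relying on the default, the handshake version can be pinned in the client config (a sketch; the mechanism shown is only an example):

conf := sarama.NewConfig()
conf.Net.SASL.Enable = true
conf.Net.SASL.Mechanism = sarama.SASLTypePlaintext // example mechanism only
conf.Net.SASL.Version = sarama.SASLHandshakeV1     // v1 handshake, which the re-auth mentioned above relies on
// set conf.Net.SASL.User / conf.Net.SASL.Password as appropriate for your cluster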

dnwe avatar Aug 31 '23 02:08 dnwe

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Dec 12 '23 12:12 github-actions[bot]