producer write broken pipe
Versions
| Sarama | Kafka | Go     |
|--------|-------|--------|
| 1.24.1 | 2.1.1 | 1.13.1 |
Configuration
What configuration values are you using for Sarama and Kafka?
```go
conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.Producer.RequiredAcks = sarama.WaitForLocal
conf.ChannelBufferSize = 1024
conf.Net.KeepAlive = 30 * time.Second
```
Logs
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [closing] because write tcp xxxxx:59382->xxxx:9092: write: broken pipe\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [closing] because write tcp xxxx:33336->xxxx:9092: write: broken pipe\n" time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 abandoning broker 3\n" time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 input chan closed\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 shut down\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 abandoning broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [retrying-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 abandoning broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 input chan closed\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 shut down\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.transfer] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [msg.republish] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.hit] from broker xxxx:9092\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific." time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 starting up\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [open] on metric.transfer/0\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific." 
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 selected broker 3\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [normal]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 starting up\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on metric.ruleengine.hit/0\n" time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 selected broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 selected broker 2\n" time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on msg.republish/1\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [normal]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [flushing-1]\n" time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [normal]\n" time="2019-12-25T09:15:52Z" level=info msg="Connected to broker at xxxx:9092 (registered as #2)\n" time="2019-12-25T09:15:53Z" level=info msg="Connected to broker at xxxx:9092 (registered as #3)\n"
Problem Description
After 10 minutes, the producer's socket write fails with a "broken pipe" error.
I'm experiencing similar behaviour with Kafka 0.10.2.1, Go 1.13, Sarama 1.24.1, using a SyncProducer.
update:
This comes from the Kafka connections.max.idle.ms broker config, which defaults to 10 minutes. Kafka closes producer connections that stay idle for longer than connections.max.idle.ms.
Two possible ways to overcome this:
- Feature request to Sarama: since Kafka 0.11 it is possible to send a metadata request that mimics a heartbeat from the producer to the broker, so the connection remains alive (a rough client-side sketch of this idea follows below).
- Set the connections.max.idle.ms config on the broker side to a value high enough for the expected time between produced messages, thus keeping the connection alive all the time.
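For what it's worth, until such a feature exists something similar can already be mimicked from application code with a periodic metadata refresh. Below is a minimal, untested sketch; the package name, function name, interval, and topic handling are mine and not part of Sarama. Note that a metadata request only goes to one broker per call, so it may not keep every broker connection warm (see the next comment).

```go
package kafka // hypothetical package name

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// startMetadataHeartbeat periodically refreshes metadata for the given topics
// so the broker sees activity on the connection before connections.max.idle.ms
// (10 minutes by default) expires. It returns a function that stops the loop.
func startMetadataHeartbeat(client sarama.Client, interval time.Duration, topics ...string) (stop func()) {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := client.RefreshMetadata(topics...); err != nil {
					log.Printf("metadata heartbeat failed: %v", err)
				}
			case <-done:
				return
			}
		}
	}()
	return func() { close(done) }
}
```

Usage would be something like `stop := startMetadataHeartbeat(client, 4*time.Minute, "metric.transfer"); defer stop()`, with the interval chosen comfortably below the broker's idle timeout.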
So the client does already run a periodic metadata refresh: the backgroundMetadata goroutine in client.go ticks on client.conf.Metadata.RefreshFrequency, which defaults to 10m, so you could reduce that to make it happen more frequently.
https://github.com/Shopify/sarama/blob/6159078aacb29adb9ecbd2a83ab9c2732c80add2/client.go#L768
However, that request is sent to a random broker each time, so it won't necessarily keep alive your idle connections to all of the brokers in the cluster.
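For reference, a minimal sketch of that client-side tweak (the package and function name are hypothetical, and the 5-minute value is only illustrative; anything comfortably below the broker's connections.max.idle.ms should do):

```go
package kafka // hypothetical package name

import (
	"time"

	"github.com/Shopify/sarama"
)

// newConfig returns a Sarama config whose background metadata refresh fires
// more often than the broker's connections.max.idle.ms (10 minutes by default).
func newConfig() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Metadata.RefreshFrequency = 5 * time.Minute // Sarama's default is 10m
	return conf
}
```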
Thanks for the quick reply!
So I have seen this configuration and set it to 5 minutes (the Kafka broker stays at 10 minutes for idle connections) on a local single-broker Kafka, and this is still happening.
LOGS
{"level":"info","version":"dev","time":"2020-04-13T14:15:59+03:00","message":"Scout service starting"}
{"level":"info","time":"2020-04-13T14:15:59+03:00","message":"starting to init server config"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Initializing new client]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092] (unregistered)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/brokers registered new broker #[1001 %!d(string=localhost:9092)] at %!s(MISSING)"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Successfully initialized new client]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001] starting up\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}
{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:16:18+03:00","message":"received entity update"} <-- trigger produce to kafka
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:16:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:17:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:18:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
<--------- waited 3 minutes so connection is now idle and disconnected ------------------------>
{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:19:12+03:00","message":"received entity update"} <-- trigger produce to kafka
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 824634384640] state change to [closing] because %!s(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Closed connection to broker [localhost:9092]\n"}
{"level":"error","error":"EOF","entity_name":"serverconfigurations","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/perimeterx/[email protected]!r!c7/scout/entity_producer.go:71","time":"2020-04-13T14:19:12+03:00","message":"failed to produce entity"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) abandoning broker %!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] input chan closed\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] shut down\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"client/metadata fetching metadata for [[serverconfigurations] localhost:9092] from broker %!s(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:19:12+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] starting up\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) selected broker %!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}
@dnwe updated with logs
@talbspx interesting. If it's just a local single-broker instance, can you change the idle broker config to 2m, set Sarama to a 60s refresh, and enable debug logs on the broker side? I'll also try to reproduce during the week if I can.
@dnwe yeah np :) will try to have it later on today.
Interested, we've seen lots of these lately.
@d1egoaz @dnwe to me it seems like there is an issue with the way the channel keys are handled in the Kafka broker on v0.10.x (the code looks a bit different under trunk).
This is where the server socket polls the Processor channels and updates its state using the processor.Selector: https://github.com/apache/kafka/blob/0.10.2.2/core/src/main/scala/kafka/network/SocketServer.scala#L494
Then the Selector iterates over the channels and tries to update the channel state: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L312-L314
It updates the idle manager for each channel key: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L339
and eventually releases the channel if needed: https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L324
So I believe the issue is with the SelectedKeys that are retrieved (managed) using the java.nio.Selector: https://docs.oracle.com/javase/7/docs/api/java/nio/channels/Selector.html
I am also interested in this issue. I am experiencing it with: Sarama v1.26.3, Kafka 2.6.0, Go 1.12.
This is quite a mission critical application, so I do need to fix this.
Interested in this issue. I'm experiencing it with:
Sarama v1.27.2, Go 1.14.
Interested in this issue. I'm experiencing it with:
Sarama v1.28.0, Go 1.15.
Experiencing the same issue after a recent Sarama version upgrade to v1.29.1, with Go 1.15.6. Any help in figuring out the cause is appreciated. The producer client is running in a k8s container with 1 core and 2 GiB of memory. Does it have anything to do with the producer client end, since the only change made was the Sarama version upgrade? Thanks in advance :)
Yes this is a bug. ~I believe the issue will be that the producer dispatcher func simply iterates over the Input channel, blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the meantime, the producer currently won't realise until it next tries to send a message.~
Update: hmm, after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation.
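Not a fix for the underlying behaviour, but while this is being investigated it may be worth double-checking the producer retry settings, which influence whether a message caught by a dropped connection is retried or surfaced to the caller as an error such as EOF. A hedged sketch with purely illustrative values (the package and function name are hypothetical):

```go
package kafka // hypothetical package name

import (
	"time"

	"github.com/Shopify/sarama"
)

// newProducerConfig shows the retry-related knobs involved when a broker
// connection is closed underneath the producer. Values are illustrative only.
func newProducerConfig() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Producer.RequiredAcks = sarama.WaitForLocal
	conf.Producer.Retry.Max = 5                          // send attempts before giving up
	conf.Producer.Retry.Backoff = 500 * time.Millisecond // wait between attempts
	conf.Producer.Return.Successes = true                // required when used with a SyncProducer
	conf.Producer.Return.Errors = true                   // surface failures on the AsyncProducer Errors() channel
	return conf
}
```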
any update?
Hi, I met the same issue. Could you share how I can send a metadata refresh as you suggested? Thanks a lot!
any update
we are facing the same issue, any update?
Interested in this issue. I'm experiencing it with:
Sarama v1.37.2, Go 1.18, Confluent Cloud Kafka.
Any update on this?
Sarama v1.37.2, Go 1.19, Confluent Cloud Kafka.
Any update on this?
Sarama v1.36.0, Go 1.19, Kafka 3.x.
A possible workaround is sending a heartbeat message from the client side that just looks up topic metadata. This workaround works in our production environment.
I am still experiencing this issue periodically with the latest Sarama (v1.38.2-0.20230327141928-9127f1c432c0), roughly once a week, when Confluent Kafka appears to do rolling upgrades on the cluster: I get this error and then the worker hangs indefinitely. Any plans to address this bug in future releases?
With connections.max.idle.ms enabled server side, this should have been fixed now that https://github.com/IBM/sarama/pull/2197 and https://github.com/IBM/sarama/pull/2234 have both been merged.
@david-bergman it sounds like you are still seeing issues. I wonder if you were inadvertently using SASL v0 in v1.38.1 and missing out on the re-auth. Since https://github.com/IBM/sarama/pull/2572 the default was (put back) to SASL v1 and we also have much better protocol support. Can you re-test with the latest release?
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.