sarama
sarama does not correctly use s.SaramaConfig.Net.MaxOpenRequests
Versions
Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama | Kafka | Go |
---|---|---|
Version 1.30.0 (2021-09-29) | 2.6 | 1.16 |
Configuration
What configuration values are you using for Sarama and Kafka?
s.SaramaConfig = sarama.NewConfig()
s.SaramaConfig.Version = sarama.V1_1_0_0
s.SaramaConfig.Metadata.Timeout = time.Second * 30
s.SaramaConfig.Producer.RequiredAcks = sarama.WaitForAll
s.SaramaConfig.Producer.Return.Successes = true
s.SaramaConfig.Producer.Return.Errors = true
s.SaramaConfig.Producer.Retry.Max = 0 // disable retry, use application level retry
s.SaramaConfig.Producer.Flush.Bytes = 1024 * 1024
s.SaramaConfig.Producer.Flush.Frequency = time.Millisecond
s.SaramaConfig.Net.MaxOpenRequests = 5
s.SaramaConfig.Producer.Flush.MaxMessages = 10
s.SaramaConfig.Producer.Flush.Messages = 10
s.SaramaConfig.Producer.MaxMessageBytes = 10 * 1024 * 1024
bytes := RandStringBytes(1024)
go func() {
	for i := 0; i < 1000*80; i++ {
		s.Send(&Message{
			Meta: map[string]interface{}{
				"flag": strconv.Itoa(i),
			},
			Data: bytes,
		})
	}
}()
<-time.After(10 * time.Second)
fmt.Println("qps", len(s.Done))
Problem Description
Like the Java client, Sarama should honor a setting equivalent to "max.in.flight.requests.per.connection", which defaults to 5 in the Java client. However, in Sarama all produce requests are sent strictly one at a time:
in "async_producer.go" line 694:
go withRecover(func() {
	for set := range bridge {
		request := set.buildRequest()
		response, err := broker.Produce(request)
		responses <- &brokerProducerResponse{
			set: set,
			err: err,
			res: response,
		}
	}
	close(responses)
})
The "Produce" function ultimately calls "sendAndReceive" in "broker.go" line 853, which blocks until the response arrives:
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	responseHeaderVersion := int16(-1)
	if res != nil {
		responseHeaderVersion = res.headerVersion()
	}
	promise, err := b.send(req, res != nil, responseHeaderVersion)
	if err != nil {
		return err
	}
	if promise == nil {
		return nil
	}
	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}
With this design, sending is strictly serial: send one request, wait for its response, send the next request, wait again. When the network has some latency, this greatly reduces client throughput.
I would recommend allowing multiple requests to be sent concurrently, up to Net.MaxOpenRequests.
This was fixed in Version 1.31.0 (2022-01-18) https://github.com/Shopify/sarama/blob/main/CHANGELOG.md#version-1310-2022-01-18
feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker by @xujianhai666 in https://github.com/Shopify/sarama/pull/1686
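For anyone landing here on 1.31.0 or later, a minimal configuration sketch follows. It assumes the `github.com/Shopify/sarama` import path used in the links above and only sets the fields already mentioned in this issue. Per the changelog entry, `Net.MaxOpenRequests` should then bound the in-flight produce requests per broker for the AsyncProducer.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	// Upper bound on unanswered requests per broker connection.
	// 5 mirrors the value used in the report above.
	cfg.Net.MaxOpenRequests = 5

	// Validate catches inconsistent settings before a producer is created.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("max open requests:", cfg.Net.MaxOpenRequests)
}
```

Whether message ordering matters for your topic is worth checking before raising this value, since several requests can then be outstanding at once.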
Closing as fixed in Version 1.31.0