
sarama does not correctly use s.SaramaConfig.Net.MaxOpenRequests

Open baoxc-shopee opened this issue 3 years ago • 1 comments

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama: Version 1.30.0 (2021-09-29)
Kafka: 2.6
Go: 1.16

Configuration

What configuration values are you using for Sarama and Kafka?

	s.SaramaConfig = sarama.NewConfig()
	s.SaramaConfig.Version = sarama.V1_1_0_0
	s.SaramaConfig.Metadata.Timeout = time.Second * 30

	s.SaramaConfig.Producer.RequiredAcks = sarama.WaitForAll

	s.SaramaConfig.Producer.Return.Successes = true
	s.SaramaConfig.Producer.Return.Errors = true

	s.SaramaConfig.Producer.Retry.Max = 0 // disable retry, use application level retry
	s.SaramaConfig.Producer.Flush.Bytes = 1024 * 1024
	s.SaramaConfig.Producer.Flush.Frequency = time.Millisecond
	s.SaramaConfig.Net.MaxOpenRequests = 5

	s.SaramaConfig.Producer.Flush.MaxMessages = 10
	s.SaramaConfig.Producer.Flush.Messages = 10
	s.SaramaConfig.Producer.MaxMessageBytes = 10 * 1024 * 1024

	bytes := RandStringBytes(1024)
	go func() {
		for i := 0; i < 1000*80; i++ {
			s.Send(&Message{
				Meta: map[string]interface{}{
					"flag": strconv.Itoa(i),
				},
				Data: bytes,
			})
		}
	}()

	<-time.After(10 * time.Second)

	fmt.Println("qps", len(s.Done))


Problem Description

As in the Java client, there should be a setting equivalent to "max.in.flight.requests.per.connection", which defaults to 5 in the Java client. Sarama exposes Net.MaxOpenRequests, but the async producer does not honor it: all produce requests to a broker are sent strictly one at a time.

In "async_producer.go", line 694:


	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

The "Produce" function ultimately calls "sendAndReceive" in "broker.go", line 853, which blocks until the response arrives:

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	responseHeaderVersion := int16(-1)
	if res != nil {
		responseHeaderVersion = res.headerVersion()
	}

	promise, err := b.send(req, res != nil, responseHeaderVersion)
	if err != nil {
		return err
	}

	if promise == nil {
		return nil
	}

	select {
	case buf := <-promise.packets:
		return versionedDecode(buf, res, req.version())
	case err = <-promise.errors:
		return err
	}
}

With this design, sending is strictly serial: send one request, wait for its response, send the next request, wait for its response. When the network has noticeable latency, this greatly reduces client throughput.

I recommend allowing multiple requests to be sent concurrently.

baoxc-shopee, Oct 22 '21 07:10

This was fixed in Version 1.31.0 (2022-01-18) https://github.com/Shopify/sarama/blob/main/CHANGELOG.md#version-1310-2022-01-18

feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker by @xujianhai666 in https://github.com/Shopify/sarama/pull/1686

rtreffer-fita, Oct 27 '22 11:10

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot], Aug 19 '23 16:08

Closing as fixed in Version 1.31.0

dnwe, Aug 19 '23 19:08