Transactional producer: SendMessage always returns ErrOutOfOrderSequenceNumber after brokers reconnect

Open songxinjianqwe opened this issue 1 year ago • 4 comments

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

Sarama | Kafka | Go
1.40.1 | 2.6.2 | 1.19

Configuration

What configuration values are you using for Sarama and Kafka?

	var appName = "tx-id-1"
	var config = sarama.NewConfig()
	config.ClientID = appName
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Transaction.Retry.Max = 3
	config.Producer.Transaction.ID = appName
	config.Net.MaxOpenRequests = 1
	config.Producer.Retry.Max = 1
	config.Metadata.Retry.Max = 0
	config.Net.DialTimeout = time.Second * 3
	config.Net.ReadTimeout = time.Second * 3
	config.Net.WriteTimeout = time.Second * 3
Logs

When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

2023/08/21 15:36:04 begin txn start
[Sarama] 2023/08/21 15:36:47 txnmgr/transition [tx-id-1] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction
2023/08/21 15:36:53 begin txn done
2023/08/21 15:36:57 send msg: 10 start 
[Sarama] 2023/08/21 15:36:58 client/metadata fetching metadata for [slowsql_unitest] from broker alikafka-post-cn-zxu39oke500i-3-vpc.alikafka.aliyuncs.com:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #103 at 10.130.232.87:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #102 at 10.130.232.86:9092
[Sarama] 2023/08/21 15:36:58 producer/broker/102 starting up
[Sarama] 2023/08/21 15:36:58 producer/broker/102 state change to [open] on slowsql_unitest/0
[Sarama] 2023/08/21 15:36:58 producer/leader/slowsql_unitest/0 selected broker 102
[Sarama] 2023/08/21 15:36:58 Connected to broker at 10.130.232.86:9092 (registered as #102)
[Sarama] 2023/08/21 15:36:58 txnmgr/add-partition-to-txn [tx-id-1] successful to add partitions txn &{ThrottleTime:0s Errors:map[slowsql_unitest:[0xc000022310]]}
[Sarama] 2023/08/21 15:36:58 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInTransaction to ProducerTxnStateInError|ProducerTxnStateAbortableError
2023/08/21 15:37:14 send msg: 10 done
2023/08/21 15:37:14 send msg 10 fail, err: kafka server: The broker received an out of order sequence number
2023/08/21 15:37:14 abort txn start
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] aborting transaction
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction
[Sarama] 2023/08/21 15:37:16 txnmgr/endtxn [tx-id-1] successful to end txn &{ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction to ProducerTxnStateReady
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] transaction aborted
2023/08/21 15:37:18 abort txn done

Problem Description

Hi, I am using the transactional API to send multiple messages synchronously within a single transaction, and I hit unexpected behavior when the brokers become unreachable over the network. The code looks like this:

		for {
			select {
			case <-ctx.Done():
				return nil
			default:
				log.Printf("begin txn start")
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					time.Sleep(time.Second)
					continue
				}
				log.Printf("begin txn done")
				msgList := make([]*sarama.ProducerMessage, 0, 10)
				for j := 0; j < 10; j++ {
					message := sarama.ProducerMessage{Topic: topic, Partition: 0, Key: sarama.StringEncoder(strconv.Itoa(i)), Value: sarama.StringEncoder(msgBody)}
					msgList = append(msgList, &message)
					i += 1
				}
				var commit = true
				for _, msg := range msgList {
					log.Printf("send msg: %s start ", msg.Key)
					_, _, err := producer.SendMessage(msg)
					log.Printf("send msg: %s done", msg.Key)
					if err != nil {
						log.Printf("send msg %s fail, err: %+v", msg.Key, err)
						log.Printf("abort txn start")
						if err := producer.AbortTxn(); err != nil {
							log.Printf("could not abort transaction: %+v", err)
							time.Sleep(time.Second)
							commit = false
							break
						}
						log.Printf("abort txn done")
						time.Sleep(time.Second)
						commit = false
						break
					}
					time.Sleep(time.Second)
				}
				if !commit {
					continue
				}
				log.Printf("commit txn start")
				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v\n", err)
					log.Printf("abort txn start")
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						time.Sleep(time.Second)
						continue
					}
					log.Printf("abort txn done")
					time.Sleep(time.Second)
					continue
				}
				log.Printf("commit txn done")
				time.Sleep(time.Second)
			}
		}

When the brokers die, every SendMessage call returns ErrOutOfOrderSequenceNumber (error code 45), and this continues until I recreate the sync producer. This looks like the problem reported in https://github.com/IBM/sarama/issues/1430, which was fixed by https://github.com/IBM/sarama/pull/1661. Looking at the commit https://github.com/IBM/sarama/commit/ba2b4bc7c3b10ef7b14f7db63f77214cb113867d, I think the line marked below may be the cause of the ErrOutOfOrderSequenceNumber errors:

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	if p.IsTransactional() {
		_ = p.maybeTransitionToErrorState(err)
	}
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
	// will never see a message with this number, so we can never continue the sequence.
	// this line!
	if !p.IsTransactional() && msg.hasSequence {
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
		p.bumpIdempotentProducerEpoch()
	}

The condition !p.IsTransactional() && msg.hasSequence bumps the epoch only for a non-transactional producer. However, I wrote equivalent Java code using the Kafka Java SDK, and there ErrOutOfOrderSequenceNumber does not occur after the brokers reconnect. I found this log line printed by the Kafka Java SDK:

2023-08-21 15:51:11.980 [INFO] [kafka-producer-network-thread | producer-1] [org.apache.kafka.clients.producer.internals.TransactionManager] @@@traceId=TID: N/A@@@ [Producer clientId=producer-1, transactionalId=1] ProducerId set to 42002 with epoch 46

I think this log means that the Java client bumps the epoch after a failed send whether or not transactions are enabled, whereas Sarama bumps it only when the producer is idempotent and not transactional. Is this a bug in Sarama? Could we fix it by removing the transactional check from the if !p.IsTransactional() && msg.hasSequence condition?
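
To illustrate why the error persists without an epoch bump, here is a toy model, not Sarama or Kafka code. It assumes a heavily simplified broker that tracks one expected sequence number per producer epoch; toyBroker, toyProducer, and the error values are all hypothetical. Once the producer has assigned a sequence number to a message that never reaches the broker, every later send is rejected until the epoch is bumped and the sequence restarts at 0.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errOutOfOrder = errors.New("out of order sequence number")
	errStaleEpoch = errors.New("stale producer epoch")
	errLost       = errors.New("request never reached the broker")
)

// toyBroker is a hypothetical, simplified view of broker-side state
// for one producer on one partition.
type toyBroker struct {
	epoch   int
	nextSeq int
}

// accept admits a message only if it carries the next expected sequence.
// A newer epoch resets the expected sequence to 0.
func (b *toyBroker) accept(epoch, seq int) error {
	if epoch < b.epoch {
		return errStaleEpoch
	}
	if epoch > b.epoch {
		b.epoch, b.nextSeq = epoch, 0
	}
	if seq != b.nextSeq {
		return errOutOfOrder
	}
	b.nextSeq++
	return nil
}

// toyProducer assigns sequence numbers before sending, so a lost
// request still consumes a sequence number on the client side.
type toyProducer struct {
	epoch   int
	nextSeq int
}

func (p *toyProducer) send(b *toyBroker, delivered bool) error {
	seq := p.nextSeq
	p.nextSeq++
	if !delivered {
		return errLost
	}
	return b.accept(p.epoch, seq)
}

// bumpEpoch models the recovery step: new epoch, sequence back to 0.
func (p *toyProducer) bumpEpoch() {
	p.epoch++
	p.nextSeq = 0
}

func main() {
	b, p := &toyBroker{}, &toyProducer{}
	fmt.Println("send 1:", p.send(b, true))  // accepted
	fmt.Println("send 2:", p.send(b, false)) // lost while brokers are down
	fmt.Println("send 3:", p.send(b, true))  // rejected: broker still expects the lost sequence
	p.bumpEpoch()
	fmt.Println("send 4:", p.send(b, true)) // accepted under the new epoch
}
```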

songxinjianqwe, Aug 21 '23 08:08

Update: I found a way to fix this: set Version >= 2.5.0. Reading the transaction manager, I found this code:

func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
	if errors.Is(err, ErrClusterAuthorizationFailed) ||
		errors.Is(err, ErrProducerFenced) ||
		errors.Is(err, ErrUnsupportedVersion) ||
		errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
		return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
	}
	if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
		p.txnmgr.epochBumpRequired = true
	}
	return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}

and this code:

	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
		req.Version = 3
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
		req.Version = 2
	}

So if we set Version >= 2.5.0, the producer bumps the epoch after a message send fails.
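
As a sketch of that workaround, the configuration from the top of this issue could set Version explicitly. This assumes the brokers actually run Kafka 2.5.0 or newer (the report lists 2.6.2); newTxnConfig is a hypothetical helper name, and the other values are copied from the original configuration.

```go
package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

// newTxnConfig is a hypothetical helper that mirrors the configuration
// from this issue and adds an explicit protocol version.
func newTxnConfig(txnID string) *sarama.Config {
	config := sarama.NewConfig()
	// Assumption: brokers are at least Kafka 2.5.0, so the version check
	// quoted above takes the V2_5_0_0 branch.
	config.Version = sarama.V2_5_0_0
	config.ClientID = txnID
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Transaction.Retry.Max = 3
	config.Producer.Transaction.ID = txnID
	config.Net.MaxOpenRequests = 1
	config.Producer.Retry.Max = 1
	config.Metadata.Retry.Max = 0
	config.Net.DialTimeout = 3 * time.Second
	config.Net.ReadTimeout = 3 * time.Second
	config.Net.WriteTimeout = 3 * time.Second
	return config
}

func main() {
	config := newTxnConfig("tx-id-1")
	// Validate checks the config for internal consistency without
	// connecting to any broker.
	if err := config.Validate(); err != nil {
		log.Fatalf("invalid config: %v", err)
	}
	log.Printf("config is valid, Kafka version set to %s", config.Version)
}
```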

songxinjianqwe, Aug 21 '23 09:08

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot], Jan 26 '24 10:01

@songxinjianqwe yes exactly right, that bit of code is essentially Sarama's client-side implementation of KIP-360: Improve reliability of idempotent/transactional producer / KAFKA-8805: Bump producer epoch following recoverable errors, and it relies upon a broker change (KAFKA-8710) that was made in Kafka 2.5.0

dnwe, Jan 26 '24 10:01
