pulsar-client-go

[Bug] Cannot receive txn message after a previous message is aborted

Open • SmileYun opened this issue 1 year ago • 1 comment

Expected behavior

  1. The consumer receives committed messages.
  2. The consumer never receives a message from an aborted transaction, even if the producer later sends a normal message.

Actual behavior

  1. Start a consumer Output:

  2. Send 3 normal messages (seq: 0, 1, 2).

     Msg received: Normal msg. seq: 0
     Msg received: Normal msg. seq: 1
     Msg received: Normal msg. seq: 2

  3. Send 3 txn messages (seq 0: commit, 1: abort, 2: commit).

     Msg received: Transaction msg with commit. seq: 0

The third txn message (seq: 2, committed) should also be received at this point, but it is not.

The producer log shows the txn status is 4, 5, 4 (commit, abort, commit):

2024/02/27 12:37:56 TxInfo 1 {0 64}
2024/02/27 12:37:56 Send msg 17.589083ms, 862:3:0,
2024/02/27 12:37:56 Txn be acked. 37.285584ms 4 {MostSigBits:0 LeastSigBits:64}
2024/02/27 12:37:56 Flush 4 {0 64}
2024/02/27 12:37:56 LastSequenceID 1709008676
2024/02/27 12:37:57 TxInfo 1 {0 65}
2024/02/27 12:37:57 Send msg 12.656042ms, 862:5:0,
2024/02/27 12:37:57 Txn be acked. 37.830083ms 5 {MostSigBits:0 LeastSigBits:65}
2024/02/27 12:37:57 Flush 5 {0 65}
2024/02/27 12:37:57 LastSequenceID 1709008677
2024/02/27 12:37:58 TxInfo 1 {0 66}
2024/02/27 12:37:58 Send msg 8.361125ms, 862:6:0,
2024/02/27 12:37:58 Txn be acked. 23.730334ms 4 {MostSigBits:0 LeastSigBits:66}
2024/02/27 12:37:58 Flush 4 {0 66}
2024/02/27 12:37:58 LastSequenceID 1709008678
INFO[0003] Closing producer producerID=1 producer_name=p-name topic="persistent://public/default/topic"
INFO[0003] Closed producer producerID=1 producer_name=p-name topic="persistent://public/default/topic"

  4. Send 3 normal messages.

     Msg received: Transaction msg with abort. seq: 1   // <----- should not be received
     Msg received: Transaction msg with commit. seq: 2
     Msg received: Normal msg. seq: 0
     Msg received: Normal msg. seq: 1
     Msg received: Normal msg. seq: 2

Steps to reproduce

// pseudo code

  • Consumer:
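client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650", // assumption: default standalone broker address, not given in the report
		EnableTransaction: true,
	})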
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Name:                              "consumer",
		Topic:                             tp,
		SubscriptionName:                  "consumer-sub-name", // consumeArgs.SubscriptionName,
		EnableBatchIndexAcknowledgment:    true,                // consumeArgs.EnableBatchIndexAck,
		EnableAutoScaledReceiverQueueSize: false,               // consumeArgs.EnableAutoScaledReceiverQueueSize,
		Type:                              pulsar.Exclusive,    // consumeArgs.SubscriptionType,
		SubscriptionMode:                  pulsar.NonDurable,   // consumeArgs.SubscriptionMode,
		SubscriptionInitialPosition:       pulsar.SubscriptionPositionEarliest,
		AckWithResponse:                   true,
	})
for msg := range consumer.Chan() {
	log.Infof("Msg received: %v", string(msg.Payload()))
}
  • Producer:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Name:                    "p-name",
		Topic:                   tp,                     // produceArgs.Topic,
		MaxPendingMessages:      10,                     // produceArgs.ProducerQueueSize,
		BatchingMaxPublishDelay: 500 * time.Millisecond, // time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
		BatchingMaxSize:         uint(1024 * 1024 * 10), // produceArgs.BatchingMaxSize * 1024,
		BatchingMaxMessages:     1024,                   // produceArgs.BatchingNumMessages,
		SendTimeout:             0 * time.Second,
		CompressionType:         pulsar.LZ4,
	})
isInTx := true
ack := "commit" // means: err = tx.Commit(context.TODO())
sendMsg(client, producer, isInTx, ack, "0")

ack = "abort" // means: err = tx.Abort(context.TODO()) 
sendMsg(client, producer, isInTx, ack, "1")

ack = "commit"
sendMsg(client, producer, isInTx, ack, "2")
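
The report does not show `sendMsg` itself. A plausible shape, given the comments above, is sketched below. It is written against small local interfaces (`txn`, `txnClient`, `sender`) so that it compiles without the client library. Those interfaces, the fakes, and the payload format (copied from the consumer output in this report) are assumptions for illustration. In the real client they would presumably map onto the transaction API behind `tx.Commit` and `tx.Abort`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical minimal interfaces standing in for the client library.
type txn interface {
	Commit(ctx context.Context) error
	Abort(ctx context.Context) error
}

type txnClient interface {
	NewTxn() (txn, error)
}

type sender interface {
	// Send publishes payload; t is nil for a non-transactional send.
	Send(ctx context.Context, payload string, t txn) error
}

// sendMsg publishes one message and, when isInTx is set, ends the
// transaction according to ack ("commit" or "abort").
func sendMsg(c txnClient, p sender, isInTx bool, ack, seq string) error {
	ctx := context.TODO()
	if !isInTx {
		return p.Send(ctx, "Normal msg. seq: "+seq, nil)
	}
	t, err := c.NewTxn()
	if err != nil {
		return err
	}
	payload := fmt.Sprintf("Transaction msg with %s. seq: %s", ack, seq)
	if err := p.Send(ctx, payload, t); err != nil {
		return err
	}
	switch ack {
	case "commit":
		return t.Commit(ctx)
	case "abort":
		return t.Abort(ctx)
	default:
		return errors.New("unknown ack: " + ack)
	}
}

// In-memory fakes so the sketch runs without a broker.
type fakeTxn struct{ state string }

func (t *fakeTxn) Commit(context.Context) error { t.state = "committed"; return nil }
func (t *fakeTxn) Abort(context.Context) error  { t.state = "aborted"; return nil }

type fakeBroker struct {
	txns []*fakeTxn
	sent []string
}

func (b *fakeBroker) NewTxn() (txn, error) {
	t := &fakeTxn{state: "open"}
	b.txns = append(b.txns, t)
	return t, nil
}

func (b *fakeBroker) Send(_ context.Context, payload string, _ txn) error {
	b.sent = append(b.sent, payload)
	return nil
}

func main() {
	b := &fakeBroker{}
	for i, ack := range []string{"commit", "abort", "commit"} {
		if err := sendMsg(b, b, true, ack, fmt.Sprint(i)); err != nil {
			fmt.Println("send failed:", err)
		}
	}
	for i, t := range b.txns {
		fmt.Printf("%s -> %s\n", b.sent[i], t.state)
	}
}
```

Each call uses its own transaction, which matches the three distinct transaction IDs ({0 64}, {0 65}, {0 66}) in the producer log.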

System configuration

Pulsar version: 3.2.0 (standalone)

SmileYun • Feb 27 '24 10:02