pulsar-client-go
[Bug] Cannot receive txn messages after a previous txn message is aborted
Expected behavior
- Receive committed messages
- Never receive an aborted message, even after the producer later sends normal (non-transactional) messages
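The expected behavior above can be written down as a tiny model. This is only a sketch of the delivery rule this report expects, not the broker's or the client's implementation. The names `txnState`, `entry` and `deliverable` are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// txnState is a hypothetical stand-in for the final status of the
// transaction a message was sent in.
type txnState int

const (
	noTxn     txnState = iota // sent outside any transaction
	committed                 // transaction was committed
	aborted                   // transaction was aborted
)

// entry is one message as stored in the topic, in publish order.
type entry struct {
	payload string
	state   txnState
}

// deliverable returns the payloads a consumer is expected to see:
// every normal or committed message, in publish order, and no aborted ones.
func deliverable(topic []entry) []string {
	var out []string
	for _, e := range topic {
		if e.state == aborted {
			continue // aborted messages must never be delivered
		}
		out = append(out, e.payload)
	}
	return out
}

func main() {
	// The transactional part of the sequence from this report,
	// followed by one normal message.
	topic := []entry{
		{"Transaction msg with commit. seq: 0", committed},
		{"Transaction msg with abort. seq: 1", aborted},
		{"Transaction msg with commit. seq: 2", committed},
		{"Normal msg. seq: 0", noTxn},
	}
	for _, p := range deliverable(topic) {
		fmt.Println("Msg received:", p)
	}
}
```

Under this model the line for `seq: 1` never appears, and the committed `seq: 2` message does not depend on a later normal message being sent.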
Actual behavior
- Start a consumer. Output: (empty)
- Send 3 normal messages (seq: 0, 1, 2). Output:
  Msg received: Normal msg. seq: 0
  Msg received: Normal msg. seq: 1
  Msg received: Normal msg. seq: 2
- Send 3 txn messages (seq 0: commit, seq 1: abort, seq 2: commit). Output:
  Msg received: Transaction msg with commit. seq: 0
  The third txn message (seq: 2, committed) should also be received at this point, but it is not.
  The producer log shows the txn states 4, 5, 4 for the three transactions:
  2024/02/27 12:37:56 TxInfo 1 {0 64}
  2024/02/27 12:37:56 Send msg 17.589083ms, 862:3:0,
- Send 3 normal messages. Output:
  Msg received: Transaction msg with abort. seq: 1 // <----- should not be received
  Msg received: Transaction msg with commit. seq: 2
  Msg received: Normal msg. seq: 0
  Msg received: Normal msg. seq: 1
  Msg received: Normal msg. seq: 2
Steps to reproduce
// pseudo code
- Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{EnableTransaction: true}) // broker URL omitted
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Name: "consumer",
Topic: tp,
SubscriptionName: "consumer-sub-name", // consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: true, // consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: false, // consumeArgs.EnableAutoScaledReceiverQueueSize,
Type: pulsar.Exclusive, // consumeArgs.SubscriptionType,
SubscriptionMode: pulsar.NonDurable, // consumeArgs.SubscriptionMode,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
AckWithResponse: true,
})
for msg := range consumer.Chan() {
log.Infof("Msg received: %v", string(msg.Payload()))
}
- Producer:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Name: "p-name",
Topic: tp, // produceArgs.Topic,
MaxPendingMessages: 10, // produceArgs.ProducerQueueSize,
BatchingMaxPublishDelay: 500 * time.Millisecond, // time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
BatchingMaxSize: uint(1024 * 1024 * 10), // produceArgs.BatchingMaxSize * 1024,
BatchingMaxMessages: 1024, // produceArgs.BatchingNumMessages,
SendTimeout: 0 * time.Second,
CompressionType: pulsar.LZ4,
})
isInTx := true
ack := "commit" // means: err = tx.Commit(context.TODO())
sendMsg(client, producer, isInTx, ack, "0")
ack = "abort" // means: err = tx.Abort(context.TODO())
sendMsg(client, producer, isInTx, ack, "1")
ack = "commit"
sendMsg(client, producer, isInTx, ack, "2")
System configuration
Pulsar version: 3.2.0 (standalone)