[Feature][Txn] Support for commit idempotency.
PIP: https://github.com/apache/pulsar/issues/19744
Motivation
- Message queues such as Kafka and Pulsar can guarantee the exactly-once semantics of their transaction feature only in the `consume-transform-produce` pattern, in which a transaction contains both production and consumption. The transaction covers the production on the sink side and the offset commit on the source side. Because the transaction is atomic, either both operations take effect or neither does. The client does not need to know whether the commit succeeded, because the end-to-end state is consistent either way. Therefore, the transaction feature implemented by Kafka and Pulsar supports committing or aborting only once, and repeating a commit or abort request afterwards is illegal; that is, the commit operation is not idempotent.
- In many other use cases that differ from the `consume-transform-produce` pattern, we need to know the exact state of the transaction after the commit request is submitted. For example:
  - In the `produce-only` case, the transaction contains only production and no offset commit, which is similar to RocketMQ.
  - The exactly-once semantics guaranteed by Flink is based on the `Two-Phase Commit` protocol implemented by Flink itself. When connecting to an external system, Flink requires that system to:
    - provide transaction functionality;
    - make the transaction commit operation idempotent.

    The details can be found at the following link: https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
- Although Kafka does not support idempotent commit operations, the Flink-Kafka-Connector uses some tricks to make the commit of the last transaction idempotent, so Flink + Kafka can guarantee exactly-once semantics in most cases, though some risk remains.
- For Pulsar, however, it is currently impossible to make the commit operation idempotent, because the transaction implementation in Pulsar is quite different from the one in Kafka. I have posted a blog analyzing the differences between Pulsar and Kafka: https://blog.csdn.net/m0_43406494/article/details/130344399
Modifications
- Provide idempotence of commit operations for transactions in Pulsar. The feature is disabled by default.
  - We will introduce a `TransactionMetadataPreserver`, a component of the TC (transaction coordinator), to store the metadata of terminated transactions.
  - Once we catch the `TransactionNotFound` exception, we will query the `TransactionMetadataPreserver` to learn the state of the transaction.
  - The client will attach its `clientName` to the transaction, and the `TransactionMetadataPreserver` will preserve `TransactionMetaPersistCount` transaction metadata entries for each client.
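The lookup path described above can be illustrated with a minimal in-memory sketch. It is not the implementation in this PR: the class name `TerminatedTxnPreserverSketch`, its methods, and the use of a single `long` as the transaction id are assumptions made for illustration, and persistence to a store is left out entirely. It only shows the intended behavior: keep the last N terminated transactions per `clientName`, and answer a repeated end-transaction request from that record when the live transaction is no longer found.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; names and structure are NOT taken from the PR's code.
final class TerminatedTxnPreserverSketch {
    enum TxnState { COMMITTED, ABORTED }

    // Plays the role of TransactionMetaPersistCount; 0 disables the feature.
    private final int persistCountPerClient;
    // clientName -> (txnId -> final state), insertion-ordered so the oldest is evicted first.
    private final Map<String, LinkedHashMap<Long, TxnState>> byClient = new HashMap<>();

    TerminatedTxnPreserverSketch(int persistCountPerClient) {
        this.persistCountPerClient = persistCountPerClient;
    }

    // Called when a transaction reaches a final state.
    void recordTerminated(String clientName, long txnId, TxnState state) {
        if (persistCountPerClient <= 0 || clientName == null) {
            return; // feature disabled, or the client did not attach a name
        }
        LinkedHashMap<Long, TxnState> txns =
                byClient.computeIfAbsent(clientName, k -> new LinkedHashMap<>());
        txns.put(txnId, state);
        // Drop the oldest entries until the per-client bound holds again.
        Iterator<Long> oldestFirst = txns.keySet().iterator();
        while (txns.size() > persistCountPerClient) {
            oldestFirst.next();
            oldestFirst.remove();
        }
    }

    // Final state of a terminated transaction, if it is still preserved.
    Optional<TxnState> lookup(String clientName, long txnId) {
        Map<Long, TxnState> txns = byClient.get(clientName);
        return txns == null ? Optional.empty() : Optional.ofNullable(txns.get(txnId));
    }

    // Fallback after TransactionNotFound: a repeated request is treated as a
    // success only if the preserved final state matches the requested action.
    boolean isRepeatOfSameOutcome(String clientName, long txnId, TxnState requested) {
        return lookup(clientName, txnId).map(s -> s == requested).orElse(false);
    }
}
```

With a bound of 2, recording transactions 1, 2 and 3 for the same client evicts transaction 1, so a repeated commit of transaction 3 is recognized while a repeated commit of transaction 1 can no longer be answered from the preserved metadata.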
API Changes
- wire protocol change
message CommandNewTxn {
required uint64 request_id = 1;
optional uint64 txn_ttl_seconds = 2 [default = 0];
optional uint64 tc_id = 3 [default = 0];
**optional string client_name = 4;**
}
message CommandEndTxn {
required uint64 request_id = 1;
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
optional TxnAction txn_action = 4;
**optional string client_name = 5;**
}
message TransactionMetadataEntry {
...
**optional string clientName = 13;**
}
enum ServerError {
...
**TransactionPreserverClosed = 26; // Transaction metadata preserver is closed**
}
- client configuration change
@ApiModelProperty(
name = "clientName",
value = "Client name that is used to save transaction metadata."
)
private String clientName;
- broker configuration change
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Max number of txnMeta of aborted transaction to persist in each TC. "
+ "If the number of terminated transactions is greater than this value, the oldest terminated transaction will be "
+ "removed from the cache and persisted in the store. "
+ "Default value is 0, which disables persistence of terminated transactions."
)
private int TransactionMetaPersistCount = 0;
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaPersistTimeInHour = 72;
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Interval in seconds to check the expired transaction metadata in TransactionMetadataPreserver."
)
private long TransactionMetaExpireCheckIntervalInSecond = 300;
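How the two time-based settings might interact can be sketched as follows. This is an assumption-laden illustration, not code from this PR: the class `TxnMetaExpirySketch` and its methods are hypothetical, timestamps are passed in explicitly so the logic is easy to test, and treating an entry as expired once its age is at least the retention time is a guess about the intended boundary.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; names are NOT taken from the PR's code.
final class TxnMetaExpirySketch {
    // Derived from a value such as TransactionMetaPersistTimeInHour.
    private final long retentionMillis;
    // txnId -> time (epoch millis) at which the transaction terminated.
    private final Map<Long, Long> terminatedAtMillis = new LinkedHashMap<>();

    TxnMetaExpirySketch(long persistTimeInHour) {
        this.retentionMillis = Duration.ofHours(persistTimeInHour).toMillis();
    }

    void recordTerminated(long txnId, long nowMillis) {
        terminatedAtMillis.put(txnId, nowMillis);
    }

    boolean contains(long txnId) {
        return terminatedAtMillis.containsKey(txnId);
    }

    // One pass of the periodic check; returns how many entries were removed.
    int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = terminatedAtMillis.entrySet().iterator();
        while (it.hasNext()) {
            long age = nowMillis - it.next().getValue();
            if (age >= retentionMillis) { // assumed boundary: expired at exactly the retention time
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

In a broker, a pass like `expire(System.currentTimeMillis())` could be scheduled every `TransactionMetaExpireCheckIntervalInSecond` seconds, for example with `ScheduledExecutorService.scheduleAtFixedRate`; with the defaults above, that means a check every 300 seconds against a 72-hour retention window.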
Verifying this change
- [x] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [x] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [x] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
Matching PR in forked repository
PR in forked repository: https://github.com/thetumbled/pulsar/pull/21
Please keep this PR separate so that it can help committers review it. thanks :)
Fixed. This is the sole PR related to PIP-255. https://github.com/apache/pulsar/issues/19744
The pr had no activity for 30 days, mark with Stale label.