[Bug] The partition expire was not correctly triggered during the commit execution.
Search before asking
- [X] I searched in the issues and found nothing similar.
Paimon version
0.8
Compute Engine
JavaAPI
Minimal reproduce step
According to the examples in the documentation, this is how we perform the commit. Taking BatchWrite as an example, after writing the data, we need to create a new instance of the BatchTableCommit class each time to execute the commit.
// 3. Collect all CommitMessages to a global node and commit
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
Based on the following call chain, we can infer that each time a BatchTableCommit instance is created, a TableCommitImpl instance is ultimately created within it. In its constructor, a PartitionExpire instance is passed as a parameter.
- org.apache.paimon.table.sink.BatchWriteBuilderImpl#newCommit
- org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String)
- org.apache.paimon.table.AbstractFileStoreTable#newCommit(java.lang.String, java.lang.String)
Based on the following code, we can infer that each time a TableCommitImpl instance is created, the org.apache.paimon.AbstractFileStore#newPartitionExpire method is called to create a new PartitionExpire instance.
return new TableCommitImpl(
        store().newCommit(commitUser, branchName),
        createCommitCallbacks(),
        snapshotExpire,
        // creates a new PartitionExpire instance
        options.writeOnly() ? null : store().newPartitionExpire(commitUser),
        options.writeOnly() ? null : store().newTagCreationManager(),
        catalogEnvironment.lockFactory().create(),
        CoreOptions.fromMap(options()).consumerExpireTime(),
        new ConsumerManager(fileIO, path),
        coreOptions().snapshotExpireExecutionMode(),
        name(),
        coreOptions().forceCreatingSnapshot());
Based on the constructor of the PartitionExpire class, we can infer that when the instance is initialized, lastCheck is set to the current time.
public PartitionExpire(
        RowType partitionType,
        Duration expirationTime,
        Duration checkInterval,
        String timePattern,
        String timeFormatter,
        FileStoreScan scan,
        FileStoreCommit commit) {
    this.partitionKeys = partitionType.getFieldNames();
    this.toObjectArrayConverter = new RowDataToObjectArrayConverter(partitionType);
    this.expirationTime = expirationTime;
    this.checkInterval = checkInterval;
    this.timeExtractor = new PartitionTimeExtractor(timePattern, timeFormatter);
    this.scan = scan;
    this.commit = commit;
    // Initialize lastCheck to the current time.
    this.lastCheck = LocalDateTime.now();
}
After BatchTableCommit is created, based on the example, we immediately start the commit. When the commit is completed, the org.apache.paimon.operation.PartitionExpire#expire(long) method of the PartitionExpire instance is called, as shown in the following code, to check for partition expiration.
public void expire(long commitIdentifier) {
    expire(LocalDateTime.now(), commitIdentifier);
}

@VisibleForTesting
void expire(LocalDateTime now, long commitIdentifier) {
    if (now.isAfter(lastCheck.plus(checkInterval))) {
        doExpire(now.minus(expirationTime), commitIdentifier);
        lastCheck = now;
    }
}
At that point, however, lastCheck still holds a timestamp from moments earlier, because the instance was only just created. With the default checkInterval=1h, lastCheck.plus(checkInterval) lies about one hour in the future, so now.isAfter(lastCheck.plus(checkInterval)) evaluates to false and the partition expiration is skipped. Because a BatchTableCommit can only perform a single commit, the next commit uses a brand-new PartitionExpire instance, and the same thing happens again. As a result, our commits never trigger the partition expiration check.
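The reasoning above can be reproduced without Paimon. The following is a minimal sketch, not Paimon code: ExpireGate is a hypothetical stand-in that keeps only the lastCheck/checkInterval gate from PartitionExpire, and the clock values are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class FreshExpirePerCommit {

    /** Hypothetical stand-in for PartitionExpire: keeps only the lastCheck gate. */
    static final class ExpireGate {
        private final Duration checkInterval;
        private LocalDateTime lastCheck;
        private int runs = 0;

        ExpireGate(Duration checkInterval, LocalDateTime createdAt) {
            this.checkInterval = checkInterval;
            this.lastCheck = createdAt; // mirrors lastCheck = LocalDateTime.now()
        }

        void expire(LocalDateTime now) {
            if (now.isAfter(lastCheck.plus(checkInterval))) {
                runs++; // stands in for doExpire(...)
                lastCheck = now;
            }
        }

        int runs() {
            return runs;
        }
    }

    /** Counts how often expiration runs when every commit builds its own gate. */
    static int runsWithFreshGate(int commits, Duration commitDuration, Duration checkInterval) {
        LocalDateTime clock = LocalDateTime.of(2024, 1, 1, 0, 0);
        int total = 0;
        for (int i = 0; i < commits; i++) {
            ExpireGate gate = new ExpireGate(checkInterval, clock); // like newCommit()
            clock = clock.plus(commitDuration);                     // the commit itself takes a while
            gate.expire(clock);                                     // check performed after the commit
            total += gate.runs();
            clock = clock.plusDays(1);                              // next batch job, one day later
        }
        return total;
    }

    public static void main(String[] args) {
        // Five daily commits of 30 seconds each, checkInterval = 1h.
        System.out.println(runsWithFreshGate(5, Duration.ofSeconds(30), Duration.ofHours(1)));
    }
}
```

In this model the check only passes when a single commit takes longer than checkInterval, no matter how much time separates one commit from the next.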
Please help me check if my logic is correct or if there is an issue with my usage.
What doesn't meet your expectations?
The partition expiration parameters set on the table did not take effect because they were not correctly triggered during the commit.
Anything else?
No response
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
If my logic is correct, then I believe this issue is caused by the PartitionExpire instance not being reused. It can be resolved by recording the instance object in the upper layer AbstractFileStoreTable and reusing this instance object each time a TableCommitImpl instance is created.
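A rough sketch of the reuse idea, under the same simplification: ExpireGate and TableLike are hypothetical names, not Paimon classes, and TableLike only models "the upper layer keeps one instance and hands it to every commit".

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class ReusedExpireSketch {

    /** Hypothetical stand-in for PartitionExpire: keeps only the lastCheck gate. */
    static final class ExpireGate {
        private final Duration checkInterval;
        private LocalDateTime lastCheck;
        private int runs = 0;

        ExpireGate(Duration checkInterval, LocalDateTime createdAt) {
            this.checkInterval = checkInterval;
            this.lastCheck = createdAt;
        }

        void expire(LocalDateTime now) {
            if (now.isAfter(lastCheck.plus(checkInterval))) {
                runs++; // stands in for doExpire(...)
                lastCheck = now;
            }
        }

        int runs() {
            return runs;
        }
    }

    /** Hypothetical table-level holder: creates the gate once, then reuses it. */
    static final class TableLike {
        private final Duration checkInterval;
        private ExpireGate cached;

        TableLike(Duration checkInterval) {
            this.checkInterval = checkInterval;
        }

        synchronized ExpireGate partitionExpire(LocalDateTime now) {
            if (cached == null) {
                cached = new ExpireGate(checkInterval, now);
            }
            return cached;
        }
    }

    /** Counts expiration runs when all commits share the gate held by the table. */
    static int runsWithReusedGate(int commits, Duration gapBetweenCommits, Duration checkInterval) {
        TableLike table = new TableLike(checkInterval);
        LocalDateTime clock = LocalDateTime.of(2024, 1, 1, 0, 0);
        for (int i = 0; i < commits; i++) {
            ExpireGate gate = table.partitionExpire(clock); // same instance on every commit
            gate.expire(clock);
            clock = clock.plus(gapBetweenCommits);
        }
        return table.partitionExpire(clock).runs();
    }

    public static void main(String[] args) {
        // Three commits, two hours apart, checkInterval = 1h.
        System.out.println(runsWithReusedGate(3, Duration.ofHours(2), Duration.ofHours(1)));
    }
}
```

Note that this only helps while the same table object stays alive across commits; the very first commit still skips the check, and a job that starts a new process for every commit would see no difference.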
I look forward to your response and feedback.
In a streaming environment, StoreCommitter holds the TableCommitImpl, so partition expiration works as expected. In a batch environment, however, if you create a TableCommitImpl every time, there will be a new PartitionExpire every time, and PartitionExpire#expire() will never actually run the expiration. In both environments Paimon uses TableCommitImpl to commit changes, and TableCommitImpl has no way to tell whether it is running in batch or streaming mode. There is an isStreamingMode flag, but it is used in table.newWrite() and not in table.newCommit(); see withExecutionMode:
# BatchWriteBuilderImpl
@Override
public BatchTableWrite newWrite() {
    return table.newWrite(commitUser)
            .withIgnorePreviousFiles(staticPartition != null)
            .withExecutionMode(false);
}

@Override
public BatchTableCommit newCommit() {
    InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
    commit.ignoreEmptyCommit(true);
    return commit;
}
Maybe isStreamingMode should be set when creating a TableCommitImpl, so that a TableCommitImpl can tell streaming and batch environments apart and handle partition expiration differently in each.
Overall, Paimon is a stream-first system, and some of its designs do not take batch processing into consideration. Or maybe you are expected to create a separate PartitionExpire and run the expiration yourself, as PartitionExpireTest does. Maybe there should also be a PartitionExpirationProcedure for the Spark engine.
We just need to add an isEndInput parameter so that the committer can distinguish batch mode from streaming mode.
My issue is not that the committer cannot distinguish between batch and streaming modes, but that partition expiration is never triggered at all with the BatchWrite I am using. Your commit did not resolve my problem.