[api] Add support for delimited identifiers
Purpose
Linked issue: close #6039
As explained in the issue, Flink procedures don't parse delimited identifiers correctly when they contain a dot.
This change makes the Identifier.fromString method correctly extract the database and table name from such identifiers.
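For illustration, here is a minimal sketch of the kind of backtick-aware splitting involved; the class name, method name, and error message below are hypothetical and are not the actual Paimon implementation:

// Hypothetical sketch, not the actual Paimon code: split a possibly
// backtick-delimited "database.table" string into its two parts.
public class DelimitedIdentifierSketch {

    // Treat backtick-quoted segments as opaque, so `my.database`.`my.table`
    // yields database "my.database" and table "my.table", while a plain
    // my_database.my_table string still splits at its single dot.
    static String[] splitDatabaseAndTable(String fullName) {
        boolean inQuotes = false;
        int splitAt = -1;
        for (int i = 0; i < fullName.length(); i++) {
            char c = fullName.charAt(i);
            if (c == '`') {
                inQuotes = !inQuotes;
            } else if (c == '.' && !inQuotes) {
                splitAt = i; // remember the last unquoted dot
            }
        }
        if (splitAt < 0) {
            throw new IllegalArgumentException(
                    "Cannot extract database and table from '" + fullName + "'");
        }
        String database = fullName.substring(0, splitAt).replace("`", "");
        String table = fullName.substring(splitAt + 1).replace("`", "");
        return new String[] {database, table};
    }

    public static void main(String[] args) {
        String[] parts = splitDatabaseAndTable("`my.database`.`my.table`");
        System.out.println(parts[0] + " | " + parts[1]); // my.database | my.table
    }
}

Undelimited database.table strings keep working as before, since the split still happens at the single unquoted dot.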
This is the list of all the places I found that use the method:
$ grep -r "Identifier\.fromString" --include="*.java" --exclude="*Test*.java" --exclude-dir="test" .
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java: Identifier identifier = Identifier.fromString(targetTableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java: Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java: return Identifier.fromString(node.asText());
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java: Identifier identifier = Identifier.fromString(record.getValue().f1);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java: Identifier.fromString(new String(serializedKey, StandardCharsets.UTF_8));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)))
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java: Identifier identifier = Identifier.fromString(view);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java: Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java: Identifier identifier = Identifier.fromString(table);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java: Identifier identifier = Identifier.fromString(targetTableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java: Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java: Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTable);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java: Identifier.fromString(
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java: return catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java: ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java: Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java: (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java: Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java: Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java: Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java: return ((FileStoreTable) catalog.getTable(Identifier.fromString(key)))
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java: return pathOfTable(catalog.getTable(Identifier.fromString(key)));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java: Identifier sourceIdentifier = Identifier.fromString(copyFileInfo.getSourceIdentifier());
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/SnapshotHintOperator.java: (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java: Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java: Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java: identifier = org.apache.paimon.catalog.Identifier.fromString(tableId);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java: org.apache.paimon.catalog.Identifier.fromString(
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java: org.apache.paimon.catalog.Identifier.fromString(
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: Identifier sourceTableId = Identifier.fromString(sourceTable);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: ? Identifier.fromString(sourceTable + TMP_TBL_SUFFIX)
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java: : Identifier.fromString(targetTable);
AFAICT, this will affect:
- Flink procedures
- The Flink CDC OggRecordParser parser
- The Flink MultiTablesReadOperator source operator
- The Flink WrappedManifestCommittableSerializer
- Flink copy-related methods
- Spark procedures
Both Flink and Spark use backticks for delimited identifiers.
Relates to #5390.
Tests
I have added unit tests for the Identifier.fromString method.
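As a rough sketch of the behavior those tests cover (the actual test class and assertions in the PR may be named differently and cover more cases):

// Rough sketch of what the added tests assert; not the PR's actual test code.
import org.apache.paimon.catalog.Identifier;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class IdentifierFromStringSketchTest {

    @Test
    void parsesBacktickDelimitedDatabaseAndTable() {
        Identifier identifier = Identifier.fromString("`my.database`.`my.table`");
        assertThat(identifier.getDatabaseName()).isEqualTo("my.database");
        assertThat(identifier.getObjectName()).isEqualTo("my.table");
    }
}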
Manual test
Setup: using the Flink 1.20.2 sql-client:
CREATE CATALOG paimon WITH (
'type'='paimon',
'warehouse'='file:<some_path>'
);
USE CATALOG paimon;
CREATE DATABASE `my.database`;
Flink SQL> CREATE TABLE `my.database`.`my.table` (id int, val int, PRIMARY KEY (id) NOT ENFORCED);
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO `my.database`.`my.table` VALUES (0, 0);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c44d2080ba9515a581d492ffda49fed1
Flink SQL> INSERT INTO `my.database`.`my.table` VALUES (0, 1);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 2099de090e8f0c9ecf93a8929608ec8b
Two data files exist prior to the compaction:
$ ls my.database.db/my.table/bucket-0/
data-5f35bc70-65f1-4b6d-a20a-4aee1814aa3c-0.parquet data-82f9e2fb-92bd-4564-a715-b5204eea4f7c-0.parquet
Before:
Flink SQL> CALL sys.compact('`my.database`.`my.table`');
[ERROR] Could not execute SQL statement. Reason:
org.apache.paimon.catalog.Catalog$TableNotExistException: Table `my.database`.`my.table` does not exist.
After:
Flink SQL> CALL sys.compact('`my.database`.`my.table`');
+----------------------------------------+
| result |
+----------------------------------------+
| JobID=a7e46b7d80ec5b7b8af2b2024e8d26b4 |
+----------------------------------------+
1 row in set
A third data file was created after the compaction, showing that the procedure ran:
$ ls my.database.db/my.table/bucket-0/
data-5a6b72a7-53ce-4e8b-982b-a5fca5780e6e-0.parquet data-5f35bc70-65f1-4b6d-a20a-4aee1814aa3c-0.parquet data-82f9e2fb-92bd-4564-a715-b5204eea4f7c-0.parquet
API and Format
Documentation
Related PR: https://github.com/apache/paimon/pull/5390
@LinMingQiang is there anything we can do to have this PR reviewed / merged?
@JingsongLi would you be able to have a look please?