
[api] Add support for delimited identifiers

Open atallahade opened this issue 5 months ago • 3 comments

Purpose

Linked issue: close #6039

As explained in the issue, Flink procedures don't parse delimited identifiers correctly when they contain a dot. This change correctly extracts the database and table names when using the Identifier.fromString method.
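For context, here is a minimal, self-contained sketch of the backtick-aware splitting the change needs: a dot only separates the database and object parts when it appears outside backticks. The class and helper names are hypothetical and this is not the actual patch (escaped backticks are ignored for brevity):

import java.util.ArrayList;
import java.util.List;

public class DelimitedIdentifierSketch {

    /** Splits on '.' only when outside backticks; the backticks themselves are stripped. */
    static List<String> parseDelimited(String fullName) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : fullName.toCharArray()) {
            if (c == '`') {
                inQuotes = !inQuotes;
            } else if (c == '.' && !inQuotes) {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        // Prints [my.database, my.table]
        System.out.println(parseDelimited("`my.database`.`my.table`"));
        // Prints [db, tbl] -- undelimited identifiers still split on the dot
        System.out.println(parseDelimited("db.tbl"));
    }
}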

Here is the list of all the places I found that use the method:

$ grep -r "Identifier\.fromString" --include="*.java" --exclude="*Test*.java" --exclude-dir="test" .
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java:        Identifier identifier = Identifier.fromString(targetTableId);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:        Identifier targetTableId = Identifier.fromString(targetTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:        Identifier targetTableId = Identifier.fromString(targetTablePath);
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java:        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java:        Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java:                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java:        return Identifier.fromString(node.asText());
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java:        Identifier identifier = Identifier.fromString(record.getValue().f1);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java:                    Identifier.fromString(new String(serializedKey, StandardCharsets.UTF_8));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/TriggerTagAutomaticCreationProcedure.java:        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId)))
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterViewDialectProcedure.java:        Identifier identifier = Identifier.fromString(view);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java:        Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java:        Identifier identifier = Identifier.fromString(table);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java:        Identifier identifier = Identifier.fromString(targetTableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java:        Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:        Identifier sourceTableId = Identifier.fromString(sourceTable);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:                Identifier.fromString(
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java:        return catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java:        ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java:                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RewriteFileIndexProcedure.java:        Table table = catalog.getTable(Identifier.fromString(sourceTablePath));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java:                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java:        Identifier identifier = Identifier.fromString(function);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java:        Table table = catalog.getTable(Identifier.fromString(tableId));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java:        Identifier identifier = Identifier.fromString(tableId);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java:                        return ((FileStoreTable) catalog.getTable(Identifier.fromString(key)))
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java:                        return pathOfTable(catalog.getTable(Identifier.fromString(key)));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyManifestFileOperator.java:        Identifier sourceIdentifier = Identifier.fromString(copyFileInfo.getSourceIdentifier());
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/SnapshotHintOperator.java:                    (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java:        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
./paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java:        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java:            identifier = org.apache.paimon.catalog.Identifier.fromString(tableId);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java:                    org.apache.paimon.catalog.Identifier.fromString(
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java:                org.apache.paimon.catalog.Identifier.fromString(
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:        Identifier sourceTableId = Identifier.fromString(sourceTable);
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:                        ? Identifier.fromString(sourceTable + TMP_TBL_SUFFIX)
./paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java:                        : Identifier.fromString(targetTable);

AFAICT, this will affect:

  • Flink procedures
  • The Flink CDC OggRecordParser parser
  • The Flink MultiTablesReadOperator source operator
  • The Flink WrappedManifestCommittableSerializer
  • Flink copy related methods
  • Spark procedures

Both Flink and Spark use backticks for delimited identifiers.

Relates to #5390.

Tests

I have added unit tests for the Identifier.fromString method.
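As a rough illustration of what those tests assert, here is a minimal sketch assuming a JUnit 5 / AssertJ setup and assuming the accessors are named getDatabaseName() and getObjectName() (the test class name is hypothetical; the actual tests in the PR may differ):

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.paimon.catalog.Identifier;
import org.junit.jupiter.api.Test;

class DelimitedIdentifierTest {

    @Test
    void testFromStringWithBacktickDelimitedParts() {
        // Backtick-delimited parts may contain dots.
        Identifier identifier = Identifier.fromString("`my.database`.`my.table`");
        assertThat(identifier.getDatabaseName()).isEqualTo("my.database");
        assertThat(identifier.getObjectName()).isEqualTo("my.table");
    }

    @Test
    void testFromStringWithoutDelimiters() {
        // Undelimited identifiers keep the existing split-on-dot behavior.
        Identifier identifier = Identifier.fromString("db.tbl");
        assertThat(identifier.getDatabaseName()).isEqualTo("db");
        assertThat(identifier.getObjectName()).isEqualTo("tbl");
    }
}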

Manual test setup, using the Flink 1.20.2 SQL client:

CREATE CATALOG paimon WITH (
    'type'='paimon',
    'warehouse'='file:<some_path>'
);
USE CATALOG paimon;

CREATE DATABASE `my.database`;
Flink SQL> CREATE TABLE `my.database`.`my.table` (id int, val int, PRIMARY KEY (id) NOT ENFORCED);
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO `my.database`.`my.table` VALUES (0, 0);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c44d2080ba9515a581d492ffda49fed1


Flink SQL> INSERT INTO `my.database`.`my.table` VALUES (0, 1);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 2099de090e8f0c9ecf93a8929608ec8b

There are two data files prior to the compaction:

$ ls my.database.db/my.table/bucket-0/
data-5f35bc70-65f1-4b6d-a20a-4aee1814aa3c-0.parquet  data-82f9e2fb-92bd-4564-a715-b5204eea4f7c-0.parquet

Before:

Flink SQL> CALL sys.compact('`my.database`.`my.table`');
[ERROR] Could not execute SQL statement. Reason:
org.apache.paimon.catalog.Catalog$TableNotExistException: Table `my.database`.`my.table` does not exist.

After:

Flink SQL> CALL sys.compact('`my.database`.`my.table`');
+----------------------------------------+
|                                 result |
+----------------------------------------+
| JobID=a7e46b7d80ec5b7b8af2b2024e8d26b4 |
+----------------------------------------+
1 row in set

A third data file was created after the compaction, showing that the procedure ran:

$ ls my.database.db/my.table/bucket-0/
data-5a6b72a7-53ce-4e8b-982b-a5fca5780e6e-0.parquet  data-5f35bc70-65f1-4b6d-a20a-4aee1814aa3c-0.parquet  data-82f9e2fb-92bd-4564-a715-b5204eea4f7c-0.parquet

API and Format

Documentation

atallahade avatar Aug 07 '25 11:08 atallahade

Related PR: https://github.com/apache/paimon/pull/5390

LinMingQiang avatar Aug 18 '25 00:08 LinMingQiang

@LinMingQiang is there anything we can do to have this PR reviewed / merged?

atallahade avatar Oct 13 '25 13:10 atallahade

@JingsongLi would you be able to have a look please?

atallahade avatar Nov 11 '25 14:11 atallahade