[Bug] aggregation options are not verified when executing DDL
Purpose
Linked issue: close #5079
validate aggregation functions that doesn't exists before creating sink and source, instead of throw exception when insert and reading data
Tests
PreAggregationITCase.NotExistAggregationFunctionITCase PartiallUpdateITCase.testSequenceGroupWithNotExistAgg
API and Format
Documentation
@liming30 could you help review this pr?
The error message may be more clear, e.g. "The not_exist aggregation function is not existing" Or "We can't find function 'not_exist' in current class path.
The error message may be more clear, e.g. "The not_exist aggregation function is not existing" Or "We can't find function 'not_exist' in current class path.
@leaves12138 The full stack error message will be something like this. In the root trace, there will be something like " Could not find any factory for identifier 'not_exist' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath" ,do you think it's clear enough?
java.lang.RuntimeException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'PAIMON.default.test_not_exist'.
Table options are:
'fields.f0.aggregate-function'='not_exist'
'merge-engine'='aggregation'
'scan.infer-parallelism'='false'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:338)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:450)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:177)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:177)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at org.apache.paimon.flink.CatalogITCaseBase.sql(CatalogITCaseBase.java:147)
... 68 more
Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'not_exist' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.
Available factory identifiers are:
custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
at org.apache.paimon.table.PrimaryKeyTableUtils.validateMergeFunctionFactory(PrimaryKeyTableUtils.java:88)
at org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable(AbstractFlinkTableFactory.java:229)
at org.apache.paimon.flink.AbstractFlinkTableFactory.createDynamicTableSink(AbstractFlinkTableFactory.java:127)
at org.apache.paimon.flink.FlinkTableFactory.createDynamicTableSink(FlinkTableFactory.java:70)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:335)
... 87 more
+1