
[Bug] aggregation options are not verified when executing DDL

Open • qingfei1994 opened this pull request 9 months ago • 3 comments

Purpose

Linked issue: close #5079

Validate that the configured aggregation functions exist before creating the sink and source, instead of throwing an exception later when inserting or reading data.
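The eager check described above can be illustrated with a minimal sketch. It assumes the per-field option layout visible in the table options later in this thread (`'fields.<field>.aggregate-function'='<identifier>'`) and takes the set of known identifiers as a parameter; in the PR itself the check goes through factory discovery (`PrimaryKeyTableUtils.validateMergeFunctionFactory` in the trace). `AggregationOptionValidator`, `findUnknownFunctions` and `validate` are hypothetical names, not Paimon's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not Paimon's API: checks aggregate-function options eagerly,
// so an unknown function fails the DDL instead of a later INSERT or SELECT.
class AggregationOptionValidator {

    // Assumed key layout, taken from the table options in the stack trace:
    // 'fields.<field>.aggregate-function' = '<identifier>'
    private static final String PREFIX = "fields.";
    private static final String SUFFIX = ".aggregate-function";

    // Returns one message per unknown function; an empty list means all are known.
    static List<String> findUnknownFunctions(
            Map<String, String> options, Set<String> knownIdentifiers) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> option : options.entrySet()) {
            String key = option.getKey();
            boolean isAggregateOption =
                    key.startsWith(PREFIX)
                            && key.endsWith(SUFFIX)
                            && key.length() > PREFIX.length() + SUFFIX.length();
            if (!isAggregateOption) {
                continue; // e.g. 'merge-engine' is not a per-field aggregate option
            }
            String field = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            String function = option.getValue();
            if (!knownIdentifiers.contains(function)) {
                problems.add(
                        "The aggregation function '" + function
                                + "' configured for field '" + field + "' does not exist");
            }
        }
        return problems;
    }

    // Throws at table-creation time if any configured function is unknown.
    static void validate(Map<String, String> options, Set<String> knownIdentifiers) {
        List<String> problems = findUnknownFunctions(options, knownIdentifiers);
        if (!problems.isEmpty()) {
            throw new IllegalArgumentException(String.join("; ", problems));
        }
    }
}
```

Called on the options from the trace (`'fields.f0.aggregate-function'='not_exist'`) with a known set that lacks `not_exist`, `validate` throws before any sink or source is built, which is the behavior this PR aims for.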

Tests

PreAggregationITCase.NotExistAggregationFunctionITCase
PartialUpdateITCase.testSequenceGroupWithNotExistAgg

API and Format

Documentation

qingfei1994 • Mar 13 '25

@liming30 Could you help review this PR?

qingfei1994 • Mar 17 '25

The error message could be clearer, e.g. "The aggregation function 'not_exist' does not exist" or "We can't find function 'not_exist' in the current classpath."

leaves12138 • Mar 31 '25
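The reviewer's suggestion, a top-level message that names the missing function instead of leaving it only in the root cause, can be sketched with ordinary exception chaining. This is a minimal sketch under assumptions: `AggregateFunctionLookup`, `lookup` and `resolveForField` are hypothetical names, and `lookup` merely stands in for the real factory discovery. The low-level failure is caught and rethrown with a message naming the field and function, and the original exception is kept as the cause so the full trace remains available.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical wrapper, not Paimon's API.
class AggregateFunctionLookup {

    // Stand-in for the low-level lookup (in Paimon this would be factory discovery).
    static String lookup(String identifier, Set<String> available) {
        if (!available.contains(identifier)) {
            throw new IllegalStateException(
                    "Could not find any factory for identifier '" + identifier + "'");
        }
        return identifier;
    }

    // Rethrows with a user-facing message, chaining the original exception as the cause.
    static String resolveForField(String field, String identifier, Set<String> available) {
        try {
            return lookup(identifier, available);
        } catch (IllegalStateException e) {
            throw new IllegalArgumentException(
                    String.format(
                            "The aggregation function '%s' configured for field '%s' does not exist. "
                                    + "Available functions: %s",
                            identifier, field, new TreeSet<>(available)), // TreeSet gives a sorted listing
                    e);
        }
    }
}
```

Chaining rather than replacing the exception keeps both views: the first line a user sees names the bad option, while the root cause still carries the discovery details.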


@leaves12138 The full stack trace will look something like the one below. The root cause contains "Could not find any factory for identifier 'not_exist' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath". Do you think that is clear enough?

java.lang.RuntimeException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'PAIMON.default.test_not_exist'.
Table options are:

'fields.f0.aggregate-function'='not_exist'
'merge-engine'='aggregation'
'scan.infer-parallelism'='false'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:338)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:450)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:177)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:177)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at org.apache.paimon.flink.CatalogITCaseBase.sql(CatalogITCaseBase.java:147)
	... 68 more
Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'not_exist' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.

Available factory identifiers are:

custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
	at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
	at org.apache.paimon.table.PrimaryKeyTableUtils.validateMergeFunctionFactory(PrimaryKeyTableUtils.java:88)
	at org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable(AbstractFlinkTableFactory.java:229)
	at org.apache.paimon.flink.AbstractFlinkTableFactory.createDynamicTableSink(AbstractFlinkTableFactory.java:127)
	at org.apache.paimon.flink.FlinkTableFactory.createDynamicTableSink(FlinkTableFactory.java:70)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:335)
	... 87 more

qingfei1994 • Apr 01 '25
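The root cause in the trace above comes from identifier-based factory discovery (`org.apache.paimon.factories.FactoryUtil.discoverFactory`). The general pattern can be sketched as follows. `NamedFactory` and `FactoryDiscovery` are hypothetical names rather than Paimon's actual API, and the candidate factories are passed in explicitly, whereas a plugin system would typically load them from the classpath, for example with `java.util.ServiceLoader`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal factory contract: each factory advertises one identifier.
interface NamedFactory {
    String identifier();
}

class FactoryDiscovery {

    // Returns the single candidate of the requested type whose identifier matches,
    // or fails with a message listing every identifier of that type that was found.
    static <T extends NamedFactory> T discover(
            Class<T> type, List<? extends NamedFactory> candidates, String identifier) {
        List<T> ofType =
                candidates.stream()
                        .filter(type::isInstance)
                        .map(type::cast)
                        .collect(Collectors.toList());

        List<T> matching =
                ofType.stream()
                        .filter(f -> f.identifier().equals(identifier))
                        .collect(Collectors.toList());

        if (matching.isEmpty()) {
            String available =
                    ofType.stream()
                            .map(NamedFactory::identifier)
                            .sorted()
                            .collect(Collectors.joining("\n"));
            throw new IllegalArgumentException(
                    String.format(
                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
                                    + "Available factory identifiers are:\n\n%s",
                            identifier, type.getName(), available));
        }
        if (matching.size() > 1) {
            // Two plugins claiming the same identifier would make the lookup ambiguous.
            throw new IllegalStateException(
                    "Multiple factories found for identifier '" + identifier + "'");
        }
        return matching.get(0);
    }
}
```

Filtering by interface type before matching the identifier means the error can list every identifier that would have been accepted, which is what makes the root-cause message in the trace actionable.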

+1

JingsongLi • Jul 07 '25