[Bug] aggregation options are not verified when executing DDL
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
master : [7f01330]
Compute Engine
Flink
Minimal reproduce step
Basically executing a Test case in PreAggregationITCase.java If you run a ddl like this, since aggregation-function avg is not in the classpath, nothing happens and it run successfully.
@Test
public void testWrongOption() {
// VALUES does not guarantee order, but order is important for list aggregations.
// So we need to sort the input data.
batchSql(
"CREATE TABLE IF NOT EXISTS T4 ("
+ "j INT, k INT, "
+ "a INT, "
+ "b INT,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='avg', "
+ "'fields.b.aggregate-function'='last_non_null_value'"
+ ");");
}
You won't get error until executing an insert like this.
@Test
public void testWrongOption() {
// VALUES does not guarantee order, but order is important for list aggregations.
// So we need to sort the input data.
batchSql(
"CREATE TABLE IF NOT EXISTS T4 ("
+ "j INT, k INT, "
+ "a INT, "
+ "b INT,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='avg', "
+ "'fields.b.aggregate-function'='last_non_null_value'"
+ ");");
batchSql("INSERT INTO T4 VALUES (1, 2, 1, 2)");
}
The error message is as below:
Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'avg' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.
Available factory identifiers are:
custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
at org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory.create(FieldAggregatorFactory.java:38)
at org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction$Factory.create(AggregateMergeFunction.java:145)
at org.apache.paimon.mergetree.compact.MergeFunctionFactory.create(MergeFunctionFactory.java:30)
at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:222)
at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:187)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:195)
at org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator.processElement(DynamicBucketRowWriteOperator.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
What doesn't meet your expectations?
I think it's suppose to validate the options and throw an exception when we executing a ddl with wrong options instead of throw an exception when we executing the insert.
Anything else?
No response
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
@qingfei1994 hi, I think that there is no need to validate the existence of aggregate-functions during the DDL stage because paimon already support user-defined aggregate-functions.
In our case, tables are managed centrally on Platform A (e.g., for creation and deletion) and only include the basic paimon dependencies. If we enforce the validation of user-defined aggregate-functions at the DDL stage, the platform would need to frequently update its paimon dependencies.
In fact, user-defined aggregate function dependencies only need to be present at runtime. For example, Flink and Spark both support dynamically adding user dependencies.
@qingfei1994 hi, I think that there is no need to validate the existence of
aggregate-functionsduring the DDL stage because paimon already supportuser-defined aggregate-functions.In our case, tables are managed centrally on Platform A (e.g., for creation and deletion) and only include the basic paimon dependencies. If we enforce the validation of
user-defined aggregate-functionsat the DDL stage, the platform would need to frequently update its paimon dependencies.In fact, user-defined aggregate function dependencies only need to be present at runtime. For example, Flink and Spark both support dynamically adding user dependencies.
Our case is that we mistakenly config an aggregation function that doesn't exist, and only get exception when we execute the insert. That's why I raised this. What you pointed out makes sense but I'm wondering if we can precheck more as when we use partially update there're validation like this.
Caused by: java.lang.IllegalArgumentException: Must use sequence group for aggregation functions.
I agree that user-defined aggregate-functions should be validated in advance, rather than during the actual write stage. Perhaps we could add this validation when creating the Source / Sink, so that it can be detected during the job submission phase.
I agree that
user-defined aggregate-functionsshould be validated in advance, rather than during the actual write stage. Perhaps we could add this validation when creating theSource / Sink, so that it can be detected during the job submission phase.
After investigation, I think the reason it's validated in the write stage is that the MergeFunctionFactory has been lazily created during the write stage.
KeyValueFileStoreWrite.java
protected MergeTreeWriter createWriter() {
.....
return new MergeTreeWriter(
bufferSpillable(),
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
ioManager,
compactManager,
restoredMaxSeqNumber,
keyComparator,
mfFactory.create(),
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
UserDefinedSeqComparator.create(valueType, options));
}
@liming30 Do you think it's appropriate to add a validate function in MergeFunctionFactory something like this, to validate the creation of aggregators in the constructors of KeyValueFileStoreWrite? please suggest.
public interface MergeFunctionFactory<T> extends Serializable {
....
default void validate() {
// TODO
}
@qingfei1994 Sorry for the delayed response. Not only the writer will use it, but the reader also needs to merge. I think we can perform a check on MergeFunctionFactory before submitting the job, and throw an exception in advance if there is an exception. How about adding a validateMergeFunctionFactory method in PrimaryKeyTableUtils? MergeFunctionFactory is created using PrimaryKeyTableUtils.