[Bug] aggregate function in partial update inconsistency
Search before asking
- [x] I searched in the issues and found nothing similar.
Paimon version
1.0.0
Compute Engine
Flink
Minimal reproduce step
CREATE TABLE t1 ( k INT, a INT, b INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', 'fields.a.aggregate-function' = 'sum', 'fields.b.aggregate-function' = 'last_non_null_value', 'fields.g_1.sequence-group' = 'a,b' );
INSERT INTO t1 VALUES (1, 1, 1, 1); INSERT INTO t1 VALUES (1, 2, 2, 2); INSERT INTO t1 VALUES (1, 3, 3, 1);
What doesn't meet your expectations?
when g_1 of third record is smaller, third record should be ignored。 I expect the result to be: 1,3,2,2
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
"SELECT * FROM t1" current output: 1,6,2,2 expect output: 1,3,2,2
It is sum... should be 6...
It is sum... should be 6...
sum and last_non_null_value are both aggregate functions. The result of sum is 6, why is the result of last_non_null_value not 3?
It is sum... should be 6...
sum and last_non_null_value are both aggregate functions. The result of sum is 6, why is the result of last_non_null_value not 3?
sum agg does not care about the order, but last_non_null_value needs.
My understanding of aggregation for partial update is as follows: aggregate the specified fields using the aggregate function (fields.<field-name>.aggregate-function) after reordering the rows based on the sequence-group field (fields.<field-name>.sequence-group). If the sequence-group field is NULL, discard the corresponding record. And, when the value of the sequence-group fields is equal to that of the preceding row, the corresponding field value to aggregate takes precedence over the previous one. A typical example of this behavior is the first_value function. Perhaps the documentation and test cases should be updated.
It is sum... should be 6...
sum and last_non_null_value are both aggregate functions. The result of sum is 6, why is the result of last_non_null_value not 3?
sumagg does not care about the order, butlast_non_null_valueneeds.
One of the aggregate functions (sum) does not care about order, but the other one (last_non_null_value) does.
There is no mention in the documentation of which aggregate functions do not care about order.
This inconsistency in the aggregate functions is confusing to the user.
The ideal approach is to make the behavior of aggregate functions consistent.
@fantasy2100 @LinMingQiang the case +I(1, 1, 1, 1); +I(1, 2, 2, 2); +I(1, 3, 3, 1); can be considered as aggregation in this order. +I(1, 3, 3, 1); +I(1, 1, 1, 1); +I(1, 2, 2, 2); Thus, the result is +I(1, 6, 2, 2)
I have tested the logic in org.apache.paimon.flink.PartialUpdateITCase, FYI:
@Test
public void testSequenceGroupAggregationFunction() {
for (int i = 1; i <= 2; i++) {
sql(
"CREATE TABLE t" + i +
"(k INT, vsum INT, vproduct INT," +
" vmax INT, vmin INT, vlast_value INT, vlast_non_null_value INT, vlistagg VARCHAR(10), " +
"vbool_and BOOLEAN, vbool_or BOOLEAN, " +
"vfirst_value INT, vfirst_non_null_value INT, " +
"vnested_update Array<Row<nested_id INT, nested_a INT>>, " +
"g_1 INT, " +
"PRIMARY KEY (k) NOT ENFORCED" +
") WITH (" +
"'merge-engine' = 'partial-update'," +
"'fields.vsum.aggregate-function' = 'sum'," +
"'fields.vproduct.aggregate-function' = 'product'," +
"'fields.vmax.aggregate-function' = 'max'," +
"'fields.vmin.aggregate-function' = 'min'," +
"'fields.vlast_value.aggregate-function' = 'last_value'," +
"'fields.vlast_non_null_value.aggregate-function' = 'last_non_null_value'," +
"'fields.vlistagg.aggregate-function' = 'listagg'," +
"'fields.vbool_and.aggregate-function' = 'bool_and'," +
"'fields.vbool_or.aggregate-function' = 'bool_or'," +
"'fields.vfirst_value.aggregate-function' = 'first_value'," +
"'fields.vfirst_non_null_value.aggregate-function' = 'first_non_null_value'," +
"'fields.vnested_update.aggregate-function' = 'nested_update'," +
"'fields.vnested_update.nested-key' = 'nested_id'," +
"'fields.g_1.sequence-group' = 'vsum,vproduct," +
"vmax,vmin,vlast_value,vlast_non_null_value,vlistagg,vbool_and,vbool_or," +
"vfirst_value,vfirst_non_null_value," +
"vnested_update');");
sql("INSERT INTO t" + i + " VALUES (1, 1, 1, " +
"1, 1, 1, 1, '1', " +
"true, false," +
"1, 1, " +
"Array[ROW(1, 1)]," +
"1);");
sql("INSERT INTO t" + i + " VALUES (1, 2, 2, " +
"2, 2, 2, 2, '2', " +
"true, false," +
"2, 2, " +
"Array[ROW(2, 2)]," +
"2);");
}
sql("INSERT INTO t1 VALUES (1, 4, 4, " +
"4, 0, 4, 4, '4', " +
"false, true," +
"3, 3, " +
"Array[ROW(1, 3)]," +
"1);");
sql("INSERT INTO t2 VALUES (1, 4, 4, " +
"4, 0, 4, 4, '4', " +
"false, true," +
"3, 3, " +
"Array[ROW(1, 3)]," +
"CAST(NULL AS INT));");
Row row = Row.of(2, 2);
assertThat(sql("SELECT vlast_value, vlast_non_null_value FROM t1")).containsExactly(row);
assertThat(sql("SELECT vlast_value, vlast_non_null_value FROM t2")).containsExactly(row);
List<Row> expected = new ArrayList<>();
expected.add(Row.of(1, 1));
expected.add(Row.of(2, 2));
assertThat(sql("SELECT nested_id, nested_a FROM t1, UNNEST(vnested_update)"))
.containsExactlyInAnyOrderElementsOf(expected);
assertThat(sql("SELECT nested_id, nested_a FROM t2, UNNEST(vnested_update)"))
.containsExactlyInAnyOrderElementsOf(expected);
Row t1Row = Row.of(7, 8, 4, 0, "4,1,2", false, true, 3, 3);
Row t2Row = Row.of(3, 2, 2, 1, "1,2", true, false, 1, 1);
assertThat(sql("SELECT vsum, vproduct, vmax, vmin, vlistagg, vbool_and, " +
"vbool_or, vfirst_value, vfirst_non_null_value FROM t1")).containsExactly(t1Row);
assertThat(sql("SELECT vsum, vproduct, vmax, vmin, vlistagg, vbool_and, " +
"vbool_or, vfirst_value, vfirst_non_null_value FROM t2")).containsExactly(t2Row);
}
The following logic is consistent across all aggregation functions(except count).
The records having same primary-key will be aggregated, and the logic is:
1. when all of the `sequence-group` fields are NULL, discard the record.
2. when some of the `sequence-group` fields are not NULL and those non-NULL fields valued smaller/greater than that of the preceding row, performs aggregation on the specified fields using the aggregate function (fields.<field-name>.aggregate-function) after reordering the rows based on the sequence-group field (fields.<field-name>.sequence-group).
3. when some of the `sequence-group` fields are not NULL and those non-NULL fields valued equal to that of the preceding row, the corresponding field value to aggregate takes precedence over the previous one. A typical example of this behavior is the first_value function.
@dark2momo Thank you very much for your reply, your explanation is very clear.
I have one more question about reordering the rows based on the sequence-group field.
Consider the following scenario:
CREATE TABLE t1 ( k INT, a INT, b INT, g_1 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', 'fields.a.aggregate-function' = 'sum', 'fields.b.aggregate-function' = 'first_value', 'fields.g_1.sequence-group' = 'a,b' );
INSERT INTO t1 VALUES (1, 1, 1, 1); INSERT INTO t1 VALUES (1, 2, 2, 2); INSERT INTO t1 VALUES (1, 3, 3, 1);
SELECT * FROM t1 output 1,6,3,2。
From the output it is inferred that the order of the records after reordering is +I(1, 3, 3, 1); +I(1, 1, 1, 1); +I(1, 2, 2, 2);
My question is, why is the order of records after reordering not +I(1, 1, 1, 1); +I(1, 3, 3, 1); +I(1, 2, 2, 2);
SELECT * FROM t1 will output 1,6,1,2 according to the second reordering method.
@fantasy2100 +1, i have the same question. It may be related to the implementation… I’m not sure if it’s by design and would like to understand how it was considered.
@fantasy2100 +1, i have the same question. It may be related to the implementation… I’m not sure if it’s by design and would like to understand how it was considered.
Me too.