[Bug] first_value semantic conflict
Search before asking
- [X] I searched in the issues and found nothing similar.
Paimon version
0.8
Compute Engine
Flink
Minimal reproduce step
I have read this issue: https://github.com/apache/paimon/issues/3020
I discovered this phenomenon:
CREATE TABLE T (
k INT,
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR,
PRIMARY KEY (k) NOT ENFORCED)
WITH ('merge-engine'='aggregation',
'fields.b.aggregate-function'='first_value',
'fields.c.aggregate-function'='first_non_null_value',
'fields.d.aggregate-function'='first_not_null_value',
'sequence.field' = 'a'
);
INSERT INTO T VALUES (1, 4, '4', '4', '4');
INSERT INTO T VALUES (1, 2, '2', '2', '2');
INSERT INTO T VALUES (1, 3, '3', '3', '3');
The result is [1, 4, 2, 2, 2], as expected.
If I manually trigger a compaction before the third insert, like:
INSERT INTO T VALUES (1, 4, '4', '4', '4');
INSERT INTO T VALUES (1, 2, '2', '2', '2');
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4');
INSERT INTO T VALUES (1, 3, '3', '3', '3');
The result is [1, 4, 3, 3, 3], which breaks the first_value semantics.
Local debugging shows that in this case only two rows are merged at query time: [1, 3, 3, 3, 3] and [1, 4, 2, 2, 2].
It seems that the earlier sequence information is lost after compaction.
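The effect described above can be reproduced with a toy model. This is only a sketch under assumptions, not Paimon's actual merge code: the class `FirstValueCompactionSketch`, the `Row` type, and the `merge` helper are hypothetical, and the model assumes that a merged row keeps only the largest sequence value (column a) alongside the first_value result (column b).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FirstValueCompactionSketch {
    // Hypothetical simplified row: sequence field `a` (seq) and one first_value field `b`.
    static final class Row {
        final int seq;
        final String b;

        Row(int seq, String b) {
            this.seq = seq;
            this.b = b;
        }
    }

    // Assumed merge behavior: order rows by sequence, keep the first b,
    // and keep the largest sequence as the merged row's sequence.
    static Row merge(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt((Row r) -> r.seq));
        Row first = sorted.get(0);
        Row last = sorted.get(sorted.size() - 1);
        return new Row(last.seq, first.b);
    }

    public static void main(String[] args) {
        Row r4 = new Row(4, "4");
        Row r2 = new Row(2, "2");
        Row r3 = new Row(3, "3");

        // All three rows merged together at query time.
        Row direct = merge(Arrays.asList(r4, r2, r3));
        System.out.println("without compaction: a=" + direct.seq + ", b=" + direct.b);
        // prints "without compaction: a=4, b=2"

        // Compaction first collapses r4 and r2 into one row carrying seq 4,
        // so the later row with seq 3 now sorts in front of it.
        Row compacted = merge(Arrays.asList(r4, r2));
        Row afterCompaction = merge(Arrays.asList(compacted, r3));
        System.out.println("with compaction: a=" + afterCompaction.seq + ", b=" + afterCompaction.b);
        // prints "with compaction: a=4, b=3"
    }
}
```

In this model the compacted row carries b = '2' but sequence 4, so the row with sequence 3 is treated as older and its value wins, which matches the [1, 4, 3, 3, 3] result observed above.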
What doesn't meet your expectations?
The documentation's description is vague:
I'm not sure whether I have misunderstood something. Should the oldest sequence be maintained? Otherwise, compaction will lead to inconsistent first_value semantics.
Anything else?
No response
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
Thanks @luowanghaoyun for reporting this. IMO, aggregate functions such as first_value, first_non_null_value, and last_non_null_value, whose values are not aligned with the latest sequence, may run into this problem, because the actual "version number" of the value is not kept.
I tried introducing an exclusive sequence for these three kinds of aggregation fields, which should solve this problem, as posted in https://github.com/apache/paimon/pull/3101. But I think it is a bit of a burden for end users, especially for aggregation tables, where the default aggregation function is last_non_null_value, because the exclusive sequence forces users to configure an individual sequence field for these three kinds of aggregation fields.
@Aitozi Why not keep the earliest sequence in the scenarios mentioned? I think sequence.field should also be constrained by the aggregation function.
@liyubin117 If we have several fields with aggregation functions, a single sequence.field is not enough: first_value needs the earliest sequence, but last_value needs the latest sequence.
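To illustrate why each such field would need its own sequence, here is a minimal sketch in which every aggregated value carries the sequence number of the record it came from. It is a toy model only, not the implementation in Paimon or in the linked PR; `PerFieldSequenceSketch`, `Versioned`, `firstValue`, and `lastValue` are hypothetical names.

```java
public class PerFieldSequenceSketch {
    // Hypothetical value tagged with the sequence number of its source record.
    static final class Versioned {
        final int seq;
        final String value;

        Versioned(int seq, String value) {
            this.seq = seq;
            this.value = value;
        }
    }

    // first_value keeps the value with the smallest sequence.
    static Versioned firstValue(Versioned x, Versioned y) {
        return y.seq < x.seq ? y : x;
    }

    // last_value keeps the value with the largest sequence.
    static Versioned lastValue(Versioned x, Versioned y) {
        return y.seq >= x.seq ? y : x;
    }

    public static void main(String[] args) {
        Versioned v4 = new Versioned(4, "4");
        Versioned v2 = new Versioned(2, "2");
        Versioned v3 = new Versioned(3, "3");

        // "Compaction" merges v4 and v2 first; v3 arrives afterwards.
        Versioned first = firstValue(firstValue(v4, v2), v3);
        Versioned last = lastValue(lastValue(v4, v2), v3);

        System.out.println("first_value=" + first.value + " (seq " + first.seq + ")");
        // prints "first_value=2 (seq 2)"
        System.out.println("last_value=" + last.value + " (seq " + last.seq + ")");
        // prints "last_value=4 (seq 4)"
    }
}
```

Because the first_value field remembers sequence 2 and the last_value field remembers sequence 4, the result in this model no longer depends on whether a compaction ran between the inserts. The cost is the extra per-field sequence that has to be stored and configured.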
@Aitozi Thanks for clarifying that! It is actually more complex than I thought.
@Aitozi Hi, when is this issue expected to be fixed?
@luowanghaoyun I tested this case in org.apache.paimon.flink.PartialUpdateITCase, and the result is [1, 4, 2, 2, 2], as expected. I wonder whether the status of this issue (and #3020) should be updated.
@Test
public void testFirstValueSemantic() {
    sql(
            "CREATE TABLE t1 ("
                    + " k INT,"
                    + " a INT,"
                    + " b VARCHAR,"
                    + " c VARCHAR,"
                    + " d VARCHAR,"
                    + "PRIMARY KEY (k) NOT ENFORCED)"
                    + " WITH ('merge-engine'='aggregation', "
                    + "'fields.b.aggregate-function'='first_value',"
                    + "'fields.c.aggregate-function'='first_non_null_value',"
                    + "'fields.d.aggregate-function'='first_not_null_value',"
                    + "'sequence.field' = 'a'"
                    + ");");
    sql("INSERT INTO t1 VALUES (1, 4, '4', '4', '4')");
    sql("INSERT INTO t1 VALUES (1, 2, '2', '2', '2')");
    sql("CALL sys.compact('default.t1', '', '', '', 'sink.parallelism=4')");
    sql("INSERT INTO t1 VALUES (1, 3, '3', '3', '3')");
    assertThat(sql("SELECT * FROM t1")).containsExactly(Row.of(1, 4, "2", "2", "2"));
}