[Bug] first_value semantic conflict

Open luowanghaoyun opened this issue 1 year ago • 6 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Paimon version

0.8

Compute Engine

Flink

Minimal reproduce step

I have read this issue: https://github.com/apache/paimon/issues/3020

I discovered this phenomenon:

CREATE TABLE T (
    k INT,
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.b.aggregate-function' = 'first_value',
    'fields.c.aggregate-function' = 'first_non_null_value',
    'fields.d.aggregate-function' = 'first_not_null_value',
    'sequence.field' = 'a'
);
INSERT INTO T VALUES (1, 4, '4', '4', '4');
INSERT INTO T VALUES (1, 2, '2', '2', '2');
INSERT INTO T VALUES (1, 3, '3', '3', '3');

The result is [1, 4, 2, 2, 2], as expected.

If I manually trigger a compaction before the third insert, like:

INSERT INTO T VALUES (1, 4, '4', '4', '4');
INSERT INTO T VALUES (1, 2, '2', '2', '2');
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4');
INSERT INTO T VALUES (1, 3, '3', '3', '3');

The result is [1, 4, 3, 3, 3], which breaks the first_value semantics. While debugging locally, I found that in this case only two rows are merged at query time: [1, 3, 3, 3, 3] and [1, 4, 2, 2, 2]. It seems that after compaction, the earlier sequence information is lost.
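
The behavior described above can be reproduced with a toy model. This is only a sketch under assumptions, not Paimon's actual merge code: it assumes rows are merged in ascending order of the sequence field, that first_value keeps the first value seen in that order, and that a compacted row keeps only the latest sequence. The class `FirstValueCompactionSketch`, the record `Row`, and the method `merge` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FirstValueCompactionSketch {
    // Toy row: `a` is the sequence field, `b` is a first_value field.
    record Row(int a, String b) {}

    // Assumed merge rule: sort by sequence, keep the first b and the last a.
    static Row merge(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt(Row::a));
        int latestSequence = sorted.get(sorted.size() - 1).a();
        String firstValue = sorted.get(0).b();
        return new Row(latestSequence, firstValue);
    }

    public static void main(String[] args) {
        Row r4 = new Row(4, "4");
        Row r2 = new Row(2, "2");
        Row r3 = new Row(3, "3");

        // No compaction: all three rows are merged together, b is "2".
        Row direct = merge(List.of(r4, r2, r3));

        // Compaction after two inserts: the result carries sequence 4 and b "2",
        // so the fact that "2" was written with sequence 2 is no longer known.
        Row compacted = merge(List.of(r4, r2));

        // The third insert (sequence 3) now sorts before the compacted row
        // (sequence 4), so b becomes "3".
        Row afterCompaction = merge(List.of(compacted, r3));

        System.out.println(direct);
        System.out.println(compacted);
        System.out.println(afterCompaction);
    }
}
```

Under these assumptions the model yields a = 4, b = "2" without compaction and a = 4, b = "3" with it, matching the two results reported above.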

What doesn't meet your expectations?

The documentation's description is vague:

(screenshot of the documentation)

I'm not sure whether I have misunderstood something. Should the oldest sequence be maintained? Otherwise, compaction will lead to inconsistent first_value semantics.

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

luowanghaoyun avatar May 24 '24 10:05 luowanghaoyun

Thanks @luowanghaoyun for reporting this. IMO, the aggregate functions first_value, first_non_null_value, and last_non_null_value, whose values are not aligned with the latest sequence, may encounter this problem, because the actual "version number" of the value is not kept.

I tried to introduce an exclusive sequence for these three kinds of aggregation fields, which should solve this problem, as posted in https://github.com/apache/paimon/pull/3101. But I think it is a bit of a burden for end users, especially for aggregation tables, where the default aggregation function is last_non_null_value: the exclusive sequence forces users to configure an individual sequence field for each of these three kinds of aggregation fields.

Aitozi avatar May 29 '24 01:05 Aitozi

@Aitozi Why not keep the earliest sequence in the scenarios mentioned? I think sequence.field should also be constrained by the aggregation function.

liyubin117 avatar May 29 '24 03:05 liyubin117

@liyubin117 If we have several fields with aggregation functions, a single sequence.field is not enough: first_value needs the earliest sequence, but last_value needs the latest sequence.

Aitozi avatar May 29 '24 03:05 Aitozi
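
The point about needing more than one sequence can be illustrated with a small sketch. It is not the implementation from the pull request linked earlier in the thread; it only assumes that each aggregated value carries the sequence it was written with, so that first_value can keep the smallest sequence while last_value keeps the largest. The class `PerFieldSequenceSketch`, the record `Versioned`, and the methods `first` and `last` are hypothetical names.

```java
public class PerFieldSequenceSketch {
    // A field value paired with the sequence number it was written with.
    record Versioned(int seq, String value) {}

    // first_value: keep the value with the smallest sequence.
    static Versioned first(Versioned x, Versioned y) {
        return y.seq() < x.seq() ? y : x;
    }

    // last_value: keep the value with the largest sequence (ties go to y).
    static Versioned last(Versioned x, Versioned y) {
        return y.seq() >= x.seq() ? y : x;
    }

    public static void main(String[] args) {
        Versioned v4 = new Versioned(4, "4");
        Versioned v2 = new Versioned(2, "2");
        Versioned v3 = new Versioned(3, "3");

        // "Compaction" of the first two writes keeps the per-field sequence,
        // so the later write with sequence 3 cannot displace the value "2".
        Versioned firstCompacted = first(v4, v2);
        Versioned firstFinal = first(firstCompacted, v3);

        // last_value needs the opposite end of the ordering for the same writes.
        Versioned lastFinal = last(last(v4, v2), v3);

        System.out.println(firstFinal);
        System.out.println(lastFinal);
    }
}
```

In this sketch the first_value field ends at sequence 2 and the last_value field at sequence 4 for the same three writes, which is why a single row-level sequence cannot serve both.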

@Aitozi Thanks for clarifying that! It is actually more complex than I thought.

liyubin117 avatar May 29 '24 04:05 liyubin117

@Aitozi Hi, when is this issue expected to be fixed?

luowanghaoyun avatar May 31 '24 03:05 luowanghaoyun

@luowanghaoyun I tested this case in org.apache.paimon.flink.PartialUpdateITCase, and the result is [1, 4, 2, 2, 2], as expected. I wonder whether the status of this issue (and #3020) should be updated.

    @Test
    public void testFirstValueSemantic() {
        sql(
                "CREATE TABLE t1 (" +
                        "    k INT," +
                        "    a INT," +
                        "    b VARCHAR," +
                        "    c VARCHAR," +
                        "    d VARCHAR," +
                        "PRIMARY KEY (k) NOT ENFORCED)" +
                        " WITH ('merge-engine'='aggregation', " +
                        "'fields.b.aggregate-function'='first_value'," +
                        "'fields.c.aggregate-function'='first_non_null_value'," +
                        "'fields.d.aggregate-function'='first_not_null_value'," +
                        "'sequence.field' = 'a'" +
                        ");");

        sql("INSERT INTO t1 VALUES (1, 4, '4', '4', '4')");

        sql("INSERT INTO t1 VALUES (1, 2, '2', '2', '2')");

        sql("CALL sys.compact('default.t1', '', '', '', 'sink.parallelism=4')");

        sql("INSERT INTO t1 VALUES (1, 3, '3', '3', '3')");

        assertThat(sql("SELECT * FROM t1")).containsExactly(Row.of(1, 4, "2", "2", "2"));
    }

dark2momo avatar Mar 06 '25 09:03 dark2momo