
Why does FlinkSink write position deletes in append mode if identifier fields are specified?

Open tibercus opened this issue 1 year ago • 2 comments

Query engine

Flink

Question

FlinkSink in append mode deduplicates records that arrive within one checkpoint by writing position deletes, if identifier fields are specified on a V2 table. E.g. a stream like

```
id | value
--- checkpoint ---
1  | value 1
--- checkpoint ---
1  | value 2
1  | value 3
--- checkpoint ---
```

written into a table with identifier fields = [id] will result in

```
id | value
1  | value 1
1  | value 3
```

I find this behaviour confusing. I'd expect all three records to be appended to the table in my example. Am I missing some Iceberg concept? Why does FlinkSink work this way?

The source code I'm asking about in particular: https://github.com/apache/iceberg/blob/0c8703078443a3c73a5aa5a6bd1cf904e0b5ce09/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L141-L144

tibercus avatar Feb 22 '24 09:02 tibercus

What do you mean by

FlinkSink in append-mode

Could you please share your sink definition?

The code snippet you highlighted is part of the BaseEqualityDeltaWriter, which means it will only write positional delete rows if it is also writing out equality delete rows.

Could you please confirm that you have all of the expected types of files in your table:

  • data files for new rows (value 1, value 2, value 3)
  • equality delete files for the deleted records (value 1, value 2)
  • positional delete files for the records inserted and deleted in the same checkpoint (value 2)

If you have these files, the issue is most probably in how you read the data. Could you please share that part of the code too?

Thanks, Peter

pvary avatar Feb 27 '24 06:02 pvary

@pvary Hi!

What do you mean by

FlinkSink in append-mode

I mean appending data with DataStream: https://iceberg.apache.org/docs/latest/flink-writes/#appending-data
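To make the question concrete, a minimal sketch of such an append-mode sink, in the style of the linked docs (not a complete program: `input` and the warehouse path are placeholders):

```java
// Append-mode sink following the Iceberg Flink docs; note there is no
// upsert() call, so upsert-enabled stays at its default of false.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> input = ...; // the stream of rows to append
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
```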

Could you please confirm that you have all of the expected types of files in your table:

  • data files for new rows (value 1, value 2, value 3)
  • equality delete files for the deleted records (value 1, value 2)
  • positional delete files for the records inserted and deleted in the same checkpoint (value 2)
  • Yes, there are data files for these three rows.
  • No, there are no equality delete files.
  • Yes, there is a position delete for value 2.

What you are asking about is upserting data (https://iceberg.apache.org/docs/latest/flink-writes/#upsert-data), but upsert-enabled is not set to true in either the FlinkSink configuration or the Iceberg table metadata.

I created a repo to demonstrate the case. Please take a look; you can find both the source code and the Iceberg files there. I made it a bit simpler (I only reproduce the checkpoint with two records being appended): https://github.com/tibercus/flink-iceberg-append-question

Please note the example with Flink SQL in the repo. INSERT INTO ... VALUES (1, 'value 1'), (1, 'value 2') shows the same strange behavior.

Thanks, Victor.

tibercus avatar Feb 28 '24 06:02 tibercus

Hi Victor,

In the example you have provided, you created a table with a primary key:

        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS catalog.db.flink_sink_append (id int primary key, some_value string) " +
                        "WITH ('format-version'='2')"
        );

BTW, I had to change the code to run with Flink 1.18:

        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS catalog.db.flink_sink_append (id int primary key NOT ENFORCED, some_value string) " +
                "WITH ('format-version'='2')"
        );

In this case the table has a primary key, and it is valid behaviour to prevent inserting two rows with the same key; that is why you do not see two rows in the result.

The meaning of upsert is: the sink will not receive -U (update-before) records, only +U (update-after) records, and it should handle that. See: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream

So if upsert is false (the default), then we expect the caller to send not only the +U (update-after) records but also the -U (update-before) records. In this case we write the equality deletes based on the -U records, and the inserts based on the +U records.

If upsert is true, then we expect only the +U records, so for every new record we also write an equality delete to remove the previous version, ensuring that the old record, if it exists, is removed.

Receiving two +U records without the corresponding -U record for the same primary key is valid in upsert mode, but invalid in non-upsert mode. In this case the stream is invalid (it does not match the configuration), so the behaviour is undefined.
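The decision above can be sketched as a toy model (all names are illustrative, not Iceberg's actual classes): the writer tracks the file positions of rows inserted in the current checkpoint; a duplicate key, or a delete for such a row, yields a position delete, while a delete for a key not written in this checkpoint yields an equality delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the equality-delta writer behaviour described above.
// Names are illustrative; this is not Iceberg's actual implementation.
class DeltaWriterSketch {
    // key -> position of the row written for that key in this checkpoint
    final Map<Integer, Integer> insertedPos = new HashMap<>();
    final List<String> dataRows = new ArrayList<>();         // the data file
    final List<Integer> positionDeletes = new ArrayList<>(); // positions dropped
    final List<Integer> equalityDeletes = new ArrayList<>(); // keys dropped

    // +I / +U: write the row; a duplicate key within the same checkpoint
    // position-deletes the earlier copy (the dedup the question observed)
    void insert(int key, String value) {
        Integer previous = insertedPos.put(key, dataRows.size());
        dataRows.add(value);
        if (previous != null) {
            positionDeletes.add(previous);
        }
    }

    // -U / -D: a row written in this checkpoint is dropped by position;
    // anything older can only be dropped by an equality delete on the key
    void delete(int key) {
        Integer pos = insertedPos.remove(key);
        if (pos != null) {
            positionDeletes.add(pos);
        } else {
            equalityDeletes.add(key);
        }
    }
}
```

Replaying the last checkpoint from the original example, insert(1, "value 2") followed by insert(1, "value 3") leaves a position delete pointing at value 2 and no equality deletes, which matches the files found in the table.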

I hope this makes sense.

Peter


pvary avatar Feb 28 '24 11:02 pvary

@pvary Hi Peter!

Thanks, now that I know about changelog stream in Flink, the behaviour makes perfect sense.

tibercus avatar Mar 05 '24 05:03 tibercus