Flink: read Iceberg upsert data in streaming mode
Currently, Flink streaming reads of Iceberg ignore 'overwrite' snapshots, so users can't read the deleted data in real time. This PR first emits the equality deletes as delete RowData (-U), and then emits the RowData (+I) produced by joining the added data with the position deletes via the applyPosdelete function. Flink can then maintain a primary-key table from a streaming read of an Iceberg primary-key table.
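For context, a rough sketch (not from this PR) of how a downstream job could consume such a streaming read through the DataStream API, assuming the `FlinkSource` builder in the iceberg-flink module; the table location and the mapped fields are placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class StreamingReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Placeholder location; use the TableLoader that matches your catalog in practice.
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/iceberg/db/upsert_table");

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .streaming(true)   // incremental read of newly committed snapshots
        .build();

    // With this change, rows carry a RowKind (delete rows first, then the inserts
    // left after applying position deletes), so downstream operators can rebuild
    // the primary-key table.
    stream
        .map(row -> row.getRowKind().shortString() + " id=" + row.getLong(0))
        .print();

    env.execute("iceberg-streaming-read-sketch");
  }
}
```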
@openinx @stevenzwu Could you help take a look?
Thanks to @hameizi for the contribution, I will take a look tomorrow.
@openinx Could you help take a look again?
Thanks for the work @hameizi, I think I will take a look at this tomorrow!
@hameizi I merged this PR and did some testing, and when I update a record, it actually inserts a new record:
```
Flink SQL> CREATE TABLE iceberg_table_upsert8 (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='iceberg',
  'format-version'='2',
  'catalog-name'='hive_prod',
  'catalog-type'='hive',
  'write.upsert.enable'='true',
  'uri'='thrift://localhost:9083',
  'warehouse'='file:///tmp/iceberg/'
);

INSERT INTO iceberg_table_upsert8 VALUES (1, 'qq');
INSERT INTO iceberg_table_upsert8 VALUES (1, 'aa');

Flink SQL> SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| -D |                    1 |                             qq |
| +I |                    1 |                             qq |
| -D |                    1 |                             aa |
| +I |                    1 |                             aa |
```
When I use a Flink batch query on the table, it has two records:

```
Flink SQL> SELECT * FROM iceberg_table_upsert8;
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    1 |                             qq |
| +I |                    1 |                             aa |
+----+----------------------+--------------------------------+
Received a total of 2 rows
```
@MOBIN-F It's true, this PR translates updates into -D and +I.

> @MOBIN-F It's true, this PR translates updates into -D and +I.

It will query the record (1,qq), but that record is from before the update. Is that right?

> It will query the record (1,qq), but that record is from before the update. Is that right?

If you don't query the op of the data, you will get only one result: (1,aa).
Hi @hameizi, thanks for the great work! I'm really interested in this patch to support overwrite snapshots. In my company, 90% of the writes to Iceberg are overwrites by Spark, which makes it really hard or impossible to do streaming reads from these tables in Flink. I wonder if this patch requires Flink 1.13, or whether it will also work for 1.12?
@openinx do you know if this PR can make it into the next release? If so, roughly when will the next release be, or does it depend on the progress of the priority-1 items in the roadmap?
> I wonder if this patch requires Flink 1.13, or whether it will also work for 1.12?

@xinbinhuang It works for both Flink 1.12 and 1.13.
> If you don't query the op of the data, you will get only one result: (1,aa).

Sorry, I don't understand how to get only one result (1,aa). Could you tell me how to do that?

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/

The client result mode described there can help you do that.
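As a concrete illustration of that suggestion, a minimal sketch assuming the Flink 1.13 SQL client (newer Flink versions require quoting the key and value in SET):

```sql
-- Materialize the changelog in the SQL client, so an upserted key shows only
-- its latest value instead of the intermediate -D/+I rows.
SET sql-client.execution.result-mode = table;

SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
```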
Thanks. After testing, the PR is problematic: there is duplicate data in batch reads, it is not suitable for an upsert stream, and join scenarios also have problems. I'm trying to implement this feature according to the design documentation.
This PR is not related to batch mode. And can you describe the details of the upsert and join problems?
@hameizi, would you mind resolving the conflicts? I think I can take a look for the first round later.
@openinx I will resolve the conflicts, but it may take some time because there are many changes in the flink module. When do you have time to review? Today?
I merged this PR and did some testing. I used Flink CDC to consume binlog and write to an Iceberg table, then ran some Flink SQL queries (streaming) and compared them with the original MySQL queries; the results did not match.
After some debugging I found 2 issues.
- Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect. The relevant code is as follows:
```java
// org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
switch (row.getRowKind()) {
  case INSERT:
  case UPDATE_AFTER:
    if (upsert) {
      writer.delete(row);
    }
    writer.write(row);
    break;
  case UPDATE_BEFORE:
    if (upsert) {
      break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent deleting one row twice
    }
    writer.delete(row);
    break;
  case DELETE:
    writer.delete(row);
    break;
  default:
    throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
}
```
- When there are only delete operations during the period of one snapshot, the Iceberg snapshot will contain only equality delete files and no data files (the data files are deleted). Flink will ignore the equality delete files and miss all the -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.

After fixing the above 2 issues, the test passed.
> Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect.

For this issue, the config write.upsert.enabled is not suitable for CDC. So if your scenario is CDC, you can set it to false, which results in upsert being false.
> When there are only delete operations during the period of one snapshot, the Iceberg snapshot will contain only equality delete files and no data files, so Flink will ignore the equality delete files and miss all the -D rows.

I will do more testing for this issue. Thanks for your feedback.
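To make the suggestion concrete, a minimal Flink SQL sketch of a CDC-to-Iceberg pipeline with upsert left disabled, so UPDATE_BEFORE/UPDATE_AFTER pass through as a retract stream; the mysql-cdc connection options and table names are placeholders:

```sql
-- Hypothetical CDC source (flink-cdc mysql-cdc connector); all options are placeholders.
CREATE TABLE mysql_src (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'user',
  'password' = 'pass',
  'database-name' = 'db',
  'table-name' = 'src'
);

-- Iceberg v2 sink with upsert left disabled, as suggested above for the CDC case.
CREATE TABLE iceberg_sink (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'format-version' = '2',
  'catalog-name' = 'hive_prod',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'file:///tmp/iceberg/',
  'write.upsert.enabled' = 'false'
);

INSERT INTO iceberg_sink SELECT * FROM mysql_src;
```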
> For this issue, the config write.upsert.enabled is not suitable for CDC. So if your scenario is CDC, you can set it to false, which results in upsert being false.

Thanks. I also merged change-delete-logic; I think it's a better way to write delete rows. I am confused about why a delete row that is written to the position delete file is also written to the equality delete file.
Regarding BaseDeltaTaskWriter, I think this code would be clearer if written like this:
```java
public void write(RowData row) throws IOException {
  RowDataDeltaWriter writer = route(row);
  switch (row.getRowKind()) {
    case INSERT:
      if (upsert) {
        writer.delete(row);
      }
      writer.write(row);
      break;
    case UPDATE_AFTER:
      writer.write(row);
      break;
    case UPDATE_BEFORE:
    case DELETE:
      writer.delete(row);
      break;
    default:
      throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
  }
}
```
@hameizi @openinx @rdblue Hi, we are currently using the Flink upsert function. In which version is this PR planned to be released?
Another issue: after running the rewrite data files action and the expire snapshots action (retaining only the last snapshot), I started a new Flink streaming job and it did not read the old data. I checked the metadata files; there was only one snapshot and all manifest files had a smaller sequenceNumber. What should I do if I want to read all the data?
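For reference, a rough sketch of the maintenance sequence described above, assuming the Flink Actions API and the core Table API; the table location is a placeholder:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

public class MaintenanceSketch {
  public static void main(String[] args) {
    // Placeholder location; load the table however your catalog requires.
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/iceberg/db/upsert_table");
    loader.open();
    Table table = loader.loadTable();

    // Compact small files; the rewrite commits a new replace snapshot.
    Actions.forTable(table).rewriteDataFiles().execute();

    // Expire everything but the last snapshot. After this there are no older
    // snapshots left for a new streaming job to replay incrementally.
    table.expireSnapshots()
        .expireOlderThan(System.currentTimeMillis())
        .retainLast(1)
        .commit();
  }
}
```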
@stevenzwu I will polish this PR after https://github.com/apache/iceberg/pull/4580 is finished, because there is some correlation between them.
@hameizi Hi, is there any plan for this PR? I saw that PR #4580 has already been merged into master.

I will work on this these days.

> I will work on this these days.

@hameizi how about this PR?
@hameizi @openinx Hello, what's the status of the issue now, please? Is anything blocking it? Thanks~