
Flink: read Iceberg upsert data in streaming mode

Open hameizi opened this issue 4 years ago • 25 comments

Currently, Flink streaming reads of Iceberg ignore 'overwrite' snapshots, so users can't read deleted data in real time. This PR first emits each equality delete as a delete RowData (-U), and then emits the RowData (+I) produced by joining the added data with the position deletes in the applyPosdelete function. Flink can then maintain a mirror of an Iceberg primary-key table by streaming-reading it.

hameizi avatar Sep 10 '21 02:09 hameizi

@openinx @stevenzwu Could you help take a look?

hameizi avatar Sep 10 '21 02:09 hameizi

Thanks to @hameizi for the contribution, I will take a look tomorrow.

openinx avatar Sep 13 '21 11:09 openinx

@openinx Could you help take a look again?

hameizi avatar Oct 18 '21 08:10 hameizi

Thanks for the work @hameizi, I think I will take a look at this tomorrow!

openinx avatar Oct 18 '21 11:10 openinx

@hameizi I merged this PR and did some testing, and when I update a record, it actually inserts a new record:

Flink SQL> CREATE TABLE iceberg_table_upsert8 (
  id   BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'format-version' = '2',
  'catalog-name' = 'hive_prod',
  'catalog-type' = 'hive',
  'write.upsert.enable' = 'true',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'file:///tmp/iceberg/'
);

INSERT INTO iceberg_table_upsert8 VALUES(1,'qq');

INSERT INTO iceberg_table_upsert8 VALUES(1,'aa');

Flink SQL> SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| -D |                    1 |                             qq |
| +I |                    1 |                             qq |
| -D |                    1 |                             aa |
| +I |                    1 |                             aa |

When I use a Flink batch query on the table, it has two records:

Flink SQL> SELECT * FROM iceberg_table_upsert8;
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    1 |                             qq |
| +I |                    1 |                             aa |
+----+----------------------+--------------------------------+
Received a total of 2 rows

MOBIN-F avatar Oct 25 '21 08:10 MOBIN-F

@MOBIN-F That's true, this PR translates an update into -D and +I.

hameizi avatar Oct 25 '21 08:10 hameizi

@MOBIN-F That's true, this PR translates an update into -D and +I.

It will query the record (1,qq), but this record is from before the update. Is that right?

MOBIN-F avatar Oct 25 '21 08:10 MOBIN-F

@MOBIN-F That's true, this PR translates an update into -D and +I.

It will query the record (1,qq), but this record is from before the update. Is that right?

If you don't query the op column of the data, you will get only one result, (1,aa).

hameizi avatar Oct 25 '21 08:10 hameizi

Hi @hameizi, thanks for the great work! I'm really interested in this patch to support overwrite snapshots. In my company, 90% of the writes to Iceberg are overwrites by Spark, which makes it really hard or impossible to streaming-read from these tables in Flink. I wonder if this patch requires Flink 1.13, or will it also work for 1.12?

@openinx do you know if this PR can make it into the next release? If so, roughly when will the next release be, or does it depend on the progress of the priority 1 items in the roadmap?

xinbinhuang avatar Nov 03 '21 19:11 xinbinhuang

I wonder if this patch requires Flink 1.13, or will it also work for 1.12?

@xinbinhuang It works for both Flink 1.12 and 1.13.

hameizi avatar Nov 04 '21 02:11 hameizi

@MOBIN-F That's true, this PR translates an update into -D and +I.

It will query the record (1,qq), but this record is from before the update. Is that right?

If you don't query the op column of the data, you will get only one result, (1,aa).

Sorry, I don't understand how to get only one result, (1,aa). Could you tell me how to do that?

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/

The SQL client's result mode can help you do that.
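
For example, a minimal sketch in the SQL client (the result-mode option name follows the Flink 1.13 docs linked above; the table reuses the earlier example):

SET 'sql-client.execution.result-mode' = 'table';

SELECT * FROM iceberg_table_upsert8 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
-- The 'table' result mode materializes the changelog, so the -D/+I pairs for
-- key 1 collapse and only the latest row (1, aa) stays visible.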

Initial-neko avatar Nov 10 '21 09:11 Initial-neko

@MOBIN-F That's true, this PR translates an update into -D and +I.

It will query the record (1,qq), but this record is from before the update. Is that right?

If you don't query the op column of the data, you will get only one result, (1,aa).

Sorry, I don't understand how to get only one result, (1,aa). Could you tell me how to do that?

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/

The SQL client's result mode can help you do that.

Thanks. After testing, the PR is problematic: there is duplicate data in batch reads, it is not suitable for retract streams, and join scenarios also have problems. I'm trying to implement this feature according to the design documentation.

MOBIN-F avatar Nov 12 '21 06:11 MOBIN-F

Thanks. After testing, the PR is problematic: there is duplicate data in batch reads, it is not suitable for upsert streams, and join scenarios also have problems. I'm trying to implement this feature according to the design documentation.

This PR is not related to batch mode. And can you describe the details of the upsert and join problems?

hameizi avatar Nov 12 '21 07:11 hameizi

@hameizi, would you mind resolving the conflicts? I think I can take a look for the first round later.

openinx avatar Nov 12 '21 09:11 openinx

@hameizi, would you mind resolving the conflicts? I think I can take a look for the first round later.

@openinx I will resolve the conflicts, but it may take some time because there are many changes in the flink module. When do you have time to review? Today?

hameizi avatar Nov 12 '21 09:11 hameizi

I merged this PR and did some testing. I used Flink CDC to consume binlog and write to an Iceberg table, then ran some Flink SQL queries (streaming) and compared them with the original MySQL queries; the results did not match.

After some debugging I found 2 issues.

1. Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect. The relevant code is as follows:

// org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
switch (row.getRowKind()) {
  case INSERT:
  case UPDATE_AFTER:
    if (upsert) {
      writer.delete(row);
    }
    writer.write(row);
    break;

  case UPDATE_BEFORE:
    if (upsert) {
      break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
    }
    writer.delete(row);
    break;
  case DELETE:
    writer.delete(row);
    break;

  default:
    throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
}

2. When there are only delete operations during the period of one snapshot, the Iceberg snapshot will contain only equality delete files and no data files (the data file is deleted), so Flink will ignore the equality delete files and miss all the -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.

After fixing the above 2 issues, the test passed.

xianyouQ avatar Jan 19 '22 07:01 xianyouQ

1. Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect.

For this issue, the config write.upsert.enabled is not suitable for CDC. So if your scenario is CDC, you can set it to false, which turns upsert off.
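
For illustration, a minimal DDL sketch with upsert turned off for a CDC pipeline (the table name is hypothetical and the catalog options just mirror the earlier example; the property spelling write.upsert.enabled follows this comment):

CREATE TABLE iceberg_cdc_sink (
  id   BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'format-version' = '2',
  'catalog-name' = 'hive_prod',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'warehouse' = 'file:///tmp/iceberg/',
  -- with upsert off, UPDATE_BEFORE rows from the CDC stream go through
  -- writer.delete(row) as in the switch statement quoted above
  'write.upsert.enabled' = 'false'
);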

2. When there are only delete operations during the period of one snapshot, the Iceberg snapshot will contain only equality delete files and no data files (the data file is deleted), so Flink will ignore the equality delete files and miss all the -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.

I will do more testing for this issue. Thanks for your feedback.

hameizi avatar Jan 20 '22 08:01 hameizi

For this issue, the config write.upsert.enabled is not suitable for CDC. So if your scenario is CDC, you can set it to false, which turns upsert off.

Thanks, I also merged change-delete-logic; I think it's a better way to write delete rows. I am confused about why a delete row written to the position delete file is also written to the equality delete file.

xianyouQ avatar Jan 24 '22 03:01 xianyouQ

I merged this PR and did some testing. I used Flink CDC to consume binlog and write to an Iceberg table, then ran some Flink SQL queries (streaming) and compared them with the original MySQL queries; the results did not match.

After some debugging I found 2 issues.

1. Flink writing CDC data to Iceberg will ignore UPDATE_BEFORE and treat UPDATE_AFTER as a retract message, which is completely incorrect. The relevant code is as follows:

// org.apache.iceberg.flink.sink.BaseDeltaTaskWriter
switch (row.getRowKind()) {
  case INSERT:
  case UPDATE_AFTER:
    if (upsert) {
      writer.delete(row);
    }
    writer.write(row);
    break;

  case UPDATE_BEFORE:
    if (upsert) {
      break;  // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice
    }
    writer.delete(row);
    break;
  case DELETE:
    writer.delete(row);
    break;

  default:
    throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
}

2. When there are only delete operations during the period of one snapshot, the Iceberg snapshot will contain only equality delete files and no data files (the data file is deleted), so Flink will ignore the equality delete files and miss all the -D rows. The function org.apache.iceberg.ManifestGroup.planStreamingFiles() creates a FileScanTask only when a data file exists, so Flink cannot process the equality delete files.

After fixing the above 2 issues, the test passed.

I think this code in BaseDeltaTaskWriter will be clearer if written like this:

  public void write(RowData row) throws IOException {
    RowDataDeltaWriter writer = route(row);

    switch (row.getRowKind()) {
      case INSERT:
        if (upsert) {
          writer.delete(row);
        }
        writer.write(row);
        break;
      case UPDATE_AFTER:
        writer.write(row);
        break;
      case UPDATE_BEFORE:
      case DELETE:
        writer.delete(row);
        break;

      default:
        throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
  }

kingeasternsun avatar Jan 27 '22 09:01 kingeasternsun

@hameizi @openinx @rdblue Hi, we are currently using the Flink upsert function. In which version is this PR planned to be released?

liubo1022126 avatar Feb 18 '22 03:02 liubo1022126

Another issue: after running the rewrite data files action and the expire snapshots action (retaining only the last snapshot), and then starting a new Flink streaming job, it did not read the old data. I checked the metadata files; there was only one snapshot, and all manifest files had a smaller sequenceNumber. What should I do if I want to read all the data?
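
For context, a hedged sketch of the streaming-read start options involved (the option names are taken from the Iceberg Flink read options; the snapshot id is a placeholder):

SELECT * FROM iceberg_table_upsert8
/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='4029599161335069714') */;
-- 'start-snapshot-id' starts the incremental read after the given snapshot;
-- a snapshot removed by expire-snapshots can no longer be used as a start point.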

xianyouQ avatar Mar 16 '22 07:03 xianyouQ

@stevenzwu I will perfect this PR after https://github.com/apache/iceberg/pull/4580 is finished, because there is some correlation between them.

hameizi avatar Apr 19 '22 05:04 hameizi

@hameizi Hi, is there any plan for this PR? I saw that PR #4580 has already been merged into master.

GavinH1984 avatar May 16 '22 08:05 GavinH1984

@hameizi Hi, is there any plan for this PR? I saw that PR #4580 has already been merged into master.

I will work on this these days.

hameizi avatar May 16 '22 09:05 hameizi

@hameizi Hi, is there any plan for this PR? I saw that PR #4580 has already been merged into master.

I will work on this these days.

@hameizi, how about this PR?

iflytek-hmwang5 avatar Aug 02 '22 03:08 iflytek-hmwang5

@hameizi @openinx Hello, what's the status of this issue now? Is anything blocking it? Thanks!

Xiangakun avatar Jun 09 '23 06:06 Xiangakun