
Flink 1.15.2 or 1.15.1 stream read is always pending

Open zixiao144 opened this issue 3 years ago • 9 comments

Apache Iceberg version

0.14.0 (latest release)

Query engine

Flink

Please describe the bug 🐞

SET execution.runtime-mode = streaming; select * from test /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */

No result; the query is always pending. Batch mode is normal.

zixiao144 avatar Sep 20 '22 09:09 zixiao144

(screenshot) It is always pending, no result.

zixiao144 avatar Sep 20 '22 09:09 zixiao144

2022-09-20 21:15:43,205 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.iceberg.flink.source.FlinkInputSplit does not contain a getter for field task
2022-09-20 21:15:43,206 INFO org.apache.flink.api.java.typeutil

zixiao144 avatar Sep 20 '22 13:09 zixiao144

@zixiao144 maybe a dumb question: are there any snapshots after the startSnapshotId?

Also, can you enable DEBUG level logging for the StreamingMonitorFunction class? Then we can see the split discovery logs, e.g.

      LOG.debug(
          "Start discovering splits from {} (exclusive) to {} (inclusive)",
          lastSnapshotId,
          snapshotId);
      long start = System.currentTimeMillis();
      FlinkInputSplit[] splits =
          FlinkSplitPlanner.planInputSplits(table, newScanContext, workerPool);
      LOG.debug(
          "Discovered {} splits, time elapsed {}ms",
          splits.length,
          System.currentTimeMillis() - start);

stevenzwu avatar Sep 20 '22 15:09 stevenzwu

BTW, the FLIP-27 Flink Iceberg source is also merged in the master branch. it is available in 0.14.0 (except for the SQL connector commit and a metrics fix commit). If you are interested in giving it a try, you can refer to the usage in the unit test code.

The benefit of FLIP-27 source for streaming is that it uses pull-based split assignment, which is good for dynamic split assignment and work sharing. Current FlinkSource would assign all discovered splits eagerly to all reader subtasks in a round-robin fashion.
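To illustrate the difference between the two assignment strategies, here is a minimal Python sketch (hypothetical code, not Iceberg's implementation; all names are illustrative) of eager round-robin assignment versus pull-based assignment:

```python
# Hypothetical sketch: eager round-robin vs. pull-based split assignment.

def assign_round_robin(splits, num_readers):
    """Eager assignment: all discovered splits are handed out up front;
    reader i receives splits i, i+num_readers, i+2*num_readers, ..."""
    assignments = {i: [] for i in range(num_readers)}
    for idx, split in enumerate(splits):
        assignments[idx % num_readers].append(split)
    return assignments

def assign_on_request(pending, reader_id, assignments):
    """Pull-based assignment: a reader asks for work only when idle,
    so faster readers naturally take more splits (work sharing)."""
    if pending:
        assignments.setdefault(reader_id, []).append(pending.pop(0))

# Eager: a slow reader is stuck with its pre-assigned share.
eager = assign_round_robin(["s0", "s1", "s2", "s3", "s4"], 2)

# Pull: whichever reader requests next gets the next pending split.
pending, pulled = ["s0", "s1", "s2"], {}
assign_on_request(pending, 0, pulled)
assign_on_request(pending, 0, pulled)  # reader 0 is faster, pulls twice
assign_on_request(pending, 1, pulled)
```

With eager round-robin, reader 0 keeps "s0", "s2", "s4" no matter how slow it is; with pull-based assignment, the faster reader 0 ends up with two splits and reader 1 with one.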

stevenzwu avatar Sep 20 '22 15:09 stevenzwu

@stevenzwu
I use Flink 1.15.2's sql-client.sh to run a simple test. SET execution.runtime-mode = streaming; SET table.dynamic-table-options.enabled = true; or (SET table.exec.iceberg.use-flip27-source = true;)

select * from test /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ ;

streaming result: (screenshot)

SET execution.runtime-mode = batch; batch result: (screenshot)

Batch is normal, but streaming is not.

zixiao144 avatar Sep 21 '22 00:09 zixiao144

Do I have to use SQL? I use PyFlink. How should I deal with this? Thanks.

zixiao144 avatar Sep 21 '22 01:09 zixiao144

@zixiao144 stream read will discover new snapshots with data files appended. If you launch a Flink SQL job with streaming read, you need to insert new rows into the table for the streaming SQL job to discover. By default, the streaming read starts from the latest snapshot; if no rows/snapshots are appended to the table after the job starts, there is no new data to read.
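The behavior described above can be sketched as follows (a hypothetical simplification in Python, not Iceberg's actual code; names are illustrative). The monitor records the table's latest snapshot at startup, and each poll only returns snapshots committed after the recorded one:

```python
# Hypothetical sketch of incremental snapshot discovery for a streaming read.
# A snapshot is modeled as a (snapshot_id, operation) tuple; real Iceberg
# snapshots carry much more state.

def discover_new_snapshots(history, last_snapshot_id):
    """Return snapshots committed after last_snapshot_id (exclusive)."""
    ids = [snapshot_id for snapshot_id, _ in history]
    start = ids.index(last_snapshot_id) + 1
    return history[start:]

history = [(1, "append"), (2, "append")]
latest = history[-1][0]            # streaming read starts from the latest: id 2

first_poll = discover_new_snapshots(history, latest)   # nothing new yet
history.append((3, "append"))      # a new row is inserted, creating snapshot 3
second_poll = discover_new_snapshots(history, latest)  # finds snapshot 3
```

The first poll finds nothing because no snapshot was committed after the job started; only after the new append does a poll return data.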

stevenzwu avatar Sep 21 '22 03:09 stevenzwu

Thanks @stevenzwu. I use a Hadoop catalog. Step 1: start the stream read. Step 2: append data. It doesn't work; no result.

zixiao144 avatar Sep 21 '22 04:09 zixiao144

Begin: (screenshots of the steps and results)

zixiao144 avatar Sep 21 '22 04:09 zixiao144

Can you provide the log file for Flink jobmanager with DEBUG enabled for org.apache.iceberg.flink.source.StreamingMonitorFunction? Then we can check the periodical split discovery for the streaming execution.

For the FLIP-27 source, SQL is not available in 0.14.1. Did you try it with a build from the master branch? If yes, we also need the jobmanager log file (at INFO level) to investigate.

stevenzwu avatar Sep 26 '22 05:09 stevenzwu

The jobmanager log doesn't contain any log lines from org.apache.iceberg. Is the logging level set properly?

stevenzwu avatar Sep 26 '22 17:09 stevenzwu

The flink-root-taskexecutor log contains log lines from org.apache.iceberg:

2022-09-27 00:19:28,670 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Legacy source Source: Iceberg table (iotCatalog.iotdw.test) monitor (1/1)#0 (d77ec0c3c3c1493cb4a6a506b8648be3) skip execution since the task is finished on restore
DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction [] - Discovered 0 splits, time elapsed 11883ms

zixiao144 avatar Sep 27 '22 00:09 zixiao144

(screenshot) I enabled DEBUG logging.

zixiao144 avatar Sep 27 '22 00:09 zixiao144

2022-09-27 08:19:21,864 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.iceberg.flink.source.FlinkInputSplit does not contain a getter for field task
2022-09-27 08:19:21,864 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.iceberg.flink.source.FlinkInputSplit does not contain a setter for field task
2022-09-27 08:19:21,864 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.iceberg.flink.source.FlinkInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

zixiao144 avatar Sep 27 '22 00:09 zixiao144

Is it working well in Flink 1.15.0 instead of 1.15.2? The default Flink version for Iceberg 0.14.0 is 1.15.0. I have another weird issue with Flink 1.14.3 on Iceberg 0.13.2; a patch-version bump might cause problems. Can a developer answer this question?

kingwind94 avatar Sep 27 '22 01:09 kingwind94

@zixiao144 yes, I saw the logs in the taskmanager log file. Sorry, I forgot that StreamingMonitorFunction runs on the taskmanagers. Flink streaming read only discovers append snapshots. How did you append data to the table? It seems that the Flink source sees the snapshots but didn't find any splits from snapshots with the append operation.

Can you run java API to check the snapshot history and the operation type of the Snapshot? Alternatively, you can run Spark SQL to query the metadata table: https://iceberg.apache.org/docs/latest/spark-queries/#snapshots.

2022-09-27 00:19:28,682 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from -1 (exclusive) to 6496244702617183271 (inclusive)
2022-09-27 00:19:40,565 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 11883ms
2022-09-27 00:19:40,566 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 1ms
2022-09-27 00:19:57,703 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 6496244702617183271 (exclusive) to 2877339576078753525 (inclusive)
2022-09-27 00:19:57,704 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 1ms
2022-09-27 00:19:57,705 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms
2022-09-27 00:20:02,759 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 2877339576078753525 (exclusive) to 9205452674806088860 (inclusive)
2022-09-27 00:20:02,759 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 0ms
2022-09-27 00:20:02,759 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms
2022-09-27 00:20:52,053 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 9205452674806088860 (exclusive) to 85589348609116719 (inclusive)
2022-09-27 00:20:52,070 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 17ms
2022-09-27 00:20:52,070 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms
2022-09-27 00:21:42,325 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 85589348609116719 (exclusive) to 519180963154444509 (inclusive)
2022-09-27 00:21:42,345 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 20ms
2022-09-27 00:21:42,345 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms
2022-09-27 00:22:32,575 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 519180963154444509 (exclusive) to 2413530983076443232 (inclusive)
2022-09-27 00:22:32,595 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 20ms
2022-09-27 00:22:32,595 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms
2022-09-27 00:23:21,804 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Start discovering splits from 2413530983076443232 (exclusive) to 5161337709432232128 (inclusive)
2022-09-27 00:23:21,819 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Discovered 0 splits, time elapsed 15ms
2022-09-27 00:23:21,819 DEBUG org.apache.iceberg.flink.source.StreamingMonitorFunction     [] - Forwarded 0 splits, time elapsed 0ms

stevenzwu avatar Sep 27 '22 05:09 stevenzwu

I use Flink CDC to write to Iceberg, and I tested it. Before the insert, the snapshot count is 19 (screenshot), and after I insert one row, it is still 19 (screenshot).

zixiao144 avatar Sep 27 '22 06:09 zixiao144

(screenshot) By the way, this is how I create the table.

zixiao144 avatar Sep 27 '22 06:09 zixiao144

Ah, you are doing a CDC write to Iceberg. For streaming read out of an Iceberg CDC table, you want to read rows out with the proper row kind (insert, delete, etc.). Then this is expected, as the current Flink streaming read doesn't support upserts/row-level deletes.
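This also explains the "Discovered 0 splits" log lines earlier in the thread: if only snapshots whose operation is "append" yield splits, then snapshots produced by CDC/upsert writes (e.g. "overwrite" or "delete") are skipped entirely. A hypothetical Python sketch of that filtering (illustrative names, not Iceberg's API):

```python
# Hypothetical sketch: only append snapshots contribute splits to a
# streaming read; CDC-produced snapshots are skipped.

def splits_from_snapshots(snapshots):
    """Keep only snapshots whose operation is 'append'; other operations
    (overwrite, delete, ...) yield no splits for the streaming read."""
    return [s for s in snapshots if s["operation"] == "append"]

cdc_snapshots = [
    {"id": 101, "operation": "overwrite"},  # upsert write from Flink CDC
    {"id": 102, "operation": "delete"},
]
plain_appends = [{"id": 201, "operation": "append"}]

# A CDC-written table discovers 0 splits; a plain append table discovers 1.
```

Under this assumption, a table fed only by CDC upserts keeps logging "Discovered 0 splits" even though new snapshots are being committed.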

Anton has been working on supporting CDC reads in Iceberg core and Spark. iceberg-flink can leverage the new APIs in iceberg-api/core; I don't think anyone has picked it up on the Flink side yet.

stevenzwu avatar Sep 27 '22 16:09 stevenzwu

Thanks. CDC doesn't support upsert. I disabled Iceberg's option, and it works.

zixiao144 avatar Sep 28 '22 00:09 zixiao144

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Mar 28 '23 00:03 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.

github-actions[bot] avatar Apr 12 '23 00:04 github-actions[bot]