
[SUPPORT] Flink streaming read of a Hudi table that is being clustered encounters "file does not exist"

Open weitianpei opened this issue 1 year ago • 19 comments

A Hudi table ods_itp_tss_hudi is written by a Flink SQL program A (Flink 1.16.0, Hudi 0.13). Program A's SQL is:

```sql
create table tss_odp_kafka_source (
  data string,
  ts timestamp(3) metadata from 'timestamp',
  partition int metadata from 'partition',
  offset bigint metadata from 'offset'
) with (
  'connector' = 'kafka',
  'topic' = 'ods_vgc_itp_tss',
  'value.format' = 'raw',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.bootstrap.servers' = '${kafka.servers}',
  'properties.group.id' = 'g-tss-ods-hudi'
);
```

```sql
create table ods_itp_tss_hudi (
  type string,
  country string,
  source string,
  brand string,
  version string,
  vin string,
  updated_date string,
  etl_date string,
  data string,
  recv_time timestamp(3),
  kafka_partition int,
  kafka_offset bigint,
  kafka_ts timestamp(3),
  sync_date string
) partitioned by (sync_date, type) with (
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE',
  'path' = '/hudi/ods_vgc/ods_vgc_itp_tss_rli',
  'write.tasks' = '1',
  'write.operation' = 'insert',
  'write.task.max.size' = '512',
  'clustering.schedule.enabled' = 'true',
  'clustering.async.enabled' = 'true',
  'clustering.delta_commits' = '6',
  'clean.async.enabled' = 'false',
  'clean.retain_commits' = '1'
);
```

```sql
insert into ods_itp_tss_hudi (
  type, country, source, brand, version, vin, updated_date, etl_date, data,
  recv_time, kafka_partition, kafka_offset, kafka_ts, sync_date
)
select
  tss.type, tss.country, tss.source, tss.brand, tss.version, tss.vin,
  tss.updated_date, tss.etl_date, tss.data,
  to_timestamp(convert_tz(date_format(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss'), 'GMT+08:00', 'UTC')) as recv_time,
  kafka_partition, kafka_offset, kafka_ts, sync_date
from (
  select
    decompress_tss(data) as tss,
    to_timestamp(convert_tz(date_format(ts, 'yyyy-MM-dd HH:mm:ss'), 'GMT+08:00', 'UTC')) as kafka_ts,
    partition as kafka_partition,
    offset as kafka_offset,
    date_format(ts, 'yyyyMMdd') as sync_date
  from tss_odp_kafka_source
);
```

Then I started a new Flink program B to streaming-read the table ods_itp_tss_hudi created by program A, but it failed with the following cause:

```
2024-04-19 03:45:27,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TSS[1] -> Calc[2] -> ConstraintEnforcer[3] -> TableToDataSteam -> Filter -> Map -> Filter -> Map -> Sink: StdStreamToClickHouse (2/10) (87754f6aa528b72165577ce077304447_cbc357ccb763df2852fee8c4fc7d55f2_1_1) switched from RUNNING to FAILED on container_e70_1702372395802_74102_01_000010 @ 10.91.144.87 (dataPort=37053).
java.io.FileNotFoundException: File does not exist: hdfs://HDFS8000151/hudi/ods_vgc/ods_vgc_itp_tss_rli/20240418/combi/7be53036-cdd9-4388-bd53-ce35edd303c3-0_0-1-0_20240419025522427.parquet
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587) ~[hadoop-hdfs-client-3.1.2.jar:?]
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580) ~[hadoop-hdfs-client-3.1.2.jar:?]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-3.1.2.jar:?]
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595) ~[hadoop-hdfs-client-3.1.2.jar:?]
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:469) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:130) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:148) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:56) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:132) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66) ~[hudi-flink1.16-bundle-0.13.1.jar:0.13.1]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.0.jar:1.16.0]
```

Program A has configured 'clustering.schedule.enabled' = 'true' to cluster the parquet files asynchronously, and small parquet files keep being merged until they reach about 600 MB. When my program B was reading a parquet file that was being clustered, I hit the file-not-exists exception. Can anyone help?
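For reference, a streaming reader in this setup would typically be declared along the lines of the sketch below. The skip options tell the reader to ignore file slices produced by clustering/compaction replace commits; the table name, column subset, and check interval here are illustrative assumptions, not taken from this issue.

```sql
-- Hypothetical streaming-read declaration for the table above (program B side).
create table ods_itp_tss_hudi_src (
  type string,
  vin string,
  data string,
  sync_date string
) partitioned by (sync_date, type) with (
  'connector' = 'hudi',
  'path' = '/hudi/ods_vgc/ods_vgc_itp_tss_rli',
  'table.type' = 'COPY_ON_WRITE',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '60',     -- poll interval in seconds (illustrative)
  'read.streaming.skip_clustering' = 'true',  -- do not read file slices written by clustering
  'read.streaming.skip_compaction' = 'true'   -- relevant for MOR tables; harmless on COW
);
```

Skipping replace commits only prevents the reader from re-consuming rewritten data; it does not help if the cleaner has already deleted the file version the reader is still holding, which is the interaction discussed below.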

weitianpei avatar Apr 25 '24 07:04 weitianpei

You may need to tweak the clean.retain_commits option.

danny0405 avatar Apr 25 '24 08:04 danny0405

That option has no effect. That day we saw the small files in partitions from several days ago being rolled over in place, with their file names changing continuously. Our partitions are keyed by the local system date (year/month/day), so no new data is written into partitions from days ago, yet the small files that had not been merged kept getting renamed. Once a file is renamed, I can no longer read it.


weitianpei avatar Apr 25 '24 14:04 weitianpei

Is it because they are being clustered continuously? And do you already skip reading the clustered files?

danny0405 avatar Apr 25 '24 23:04 danny0405

I have added the parameter to skip clustered files, but it did not work.


weitianpei avatar Apr 25 '24 23:04 weitianpei

There are logs that report the reader progress in the monitor operator; you can check them to see whether the reader lags too far behind the producer.

danny0405 avatar Apr 26 '24 03:04 danny0405

It has nothing to do with that option. Files keep being created in place continuously, and the old files are deleted.


weitianpei avatar May 07 '24 01:05 weitianpei

If the job is not executing rollbacks repetitively, these files should just be COW file replacements: for a COW table we create a new base file to replace the old one, and the old one is cleaned based on the cleaning configuration.

danny0405 avatar May 07 '24 01:05 danny0405

[image: 7057026C-8A58-428C-BFD3-E2F75085E25D] Please look at this picture: on April 17th we found that the small files in partition 20240411 were still being clustered, and the downstream Flink program reading those files hit the FileNotFoundException. The upstream program has asynchronous clustering enabled.

weitianpei avatar May 07 '24 02:05 weitianpei

@codope when will we solve this problem?

weitianpei avatar May 07 '24 02:05 weitianpei

still being clustered. And the downstream Flink program reading those files hit the FileNotFoundException.

Both clustering and compaction can be skipped in Flink streaming read.

danny0405 avatar May 07 '24 02:05 danny0405

I added the skip parameter in my downstream Flink program, but the same problem happened again.

weitianpei avatar May 07 '24 02:05 weitianpei

Would you mind adding a test for this problem?

weitianpei avatar May 07 '24 02:05 weitianpei

We already have tests in the repo for skipping clustering and compaction on read. Can you make sure the option takes effect, and increase the number of retained commits before cleaning with the option clean.retain_commits?
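Applied to the writer DDL from this issue, the suggestion amounts to something like the fragment below. The value 30 is an illustrative assumption (it matches the Flink writer's default for clean.retain_commits, if I recall correctly); the point is simply to retain enough old file versions that a lagging streaming reader can still open a file after clustering has replaced it.

```sql
-- Writer-side cleaning options, sketched per the suggestion above (values illustrative).
'clean.async.enabled' = 'true',   -- clean in the background instead of blocking commits
'clean.retain_commits' = '30'     -- keep 30 commits of old file versions so slow readers
                                  -- can still read files that clustering has rewritten
```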

danny0405 avatar May 07 '24 02:05 danny0405

I did not increase clean.retain_commits; it was set to 1.

By the way, how should clean.async.enabled (https://hudi.apache.org/docs/configurations#cleanasyncenabled) be configured, true or false?


weitianpei avatar May 07 '24 02:05 weitianpei

The small files keep being merged in the background until they reach 600 MB. Are you sure that doing this will not cause the downstream program to miss data or read it twice?


weitianpei avatar May 07 '24 02:05 weitianpei

Small files are continuously merged in the background until they reach 600 MB. Are you sure that, if we do this, the downstream program will not miss data or read data repeatedly?

For example, my program is reading a newly generated file of only 30 MB; after a while this file will be merged with other files into a 600 MB file. So when my program reads this big file, won't it see duplicate data?

weitianpei avatar May 07 '24 02:05 weitianpei

clean.retain_commits was 1

That means each time a new version of a file is generated, the old one is deleted. For a COW table, there is a very high probability you will encounter a file-missing exception, because files are deleted on every new commit. Can you just keep clean.retain_commits at its default to give the streaming reader some buffer time to read the new files?

By the way,how to configue the clean.async.enabled

It should be true if you do not want redundant legacy files left on the filesystem.

danny0405 avatar May 07 '24 03:05 danny0405

Then why are the small files in partitions from a few days ago constantly rewritten and deleted? No more data is written to those partitions. And I think increasing the retention/cleaning parameters will probably generate more files.

weitianpei avatar May 07 '24 03:05 weitianpei

Are you enabling the clustering then? The clustering would rewrite all the partitions.

I think increasing the parameters of retention cleanup will probably generate more files

The small files are not affected by the cleaning strategy.

danny0405 avatar May 07 '24 04:05 danny0405

We have increased the parameter, and the downstream Flink program no longer encounters the error.

But I found the file 20240509163048129.replacecommit under /hudi/ods_vgc/ods_vgc_itp_tss_rli/.hoodie; its content is below.

```json
{ "partitionToWriteStats" : {
    "20240207/clamp" : [ {
      "fileId" : "6849d48e-8141-4774-bc16-4020b039cc3d-0",
      "path" : "20240207/clamp/6849d48e-8141-4774-bc16-4020b039cc3d-0_0-1-0_20240509163048129.parquet",
      "cdcStats" : null,
      "prevCommit" : "null",
      "numWrites" : 97,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 97,
      "totalWriteBytes" : 452622,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "20240207/clamp",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 452622,
      "minEventTime" : null,
      "maxEventTime" : null,
      "runtimeStats" : {
        "totalScanTime" : 0,
        "totalUpsertTime" : 0,
        "totalCreateTime" : 61
      }
    } ],
    <>
```

Will my downstream application consume data repeatedly? I am afraid it will.


weitianpei avatar May 09 '24 09:05 weitianpei

This is a replace commit; you can choose to skip it by enabling the option read.streaming.skip_clustering or read.streaming.skip_insertoverwrite.
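Besides setting these options in the table DDL, in Flink SQL they can also be supplied per-query via dynamic table options. A sketch, assuming program B reads via SQL (note that older Flink versions require table.dynamic-table-options.enabled = true for hints to take effect):

```sql
-- Skip clustering/insert-overwrite replace commits for this streaming query only.
select * from ods_itp_tss_hudi
/*+ OPTIONS(
  'read.streaming.enabled' = 'true',
  'read.streaming.skip_clustering' = 'true',
  'read.streaming.skip_insertoverwrite' = 'true'
) */;
```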

danny0405 avatar May 10 '24 00:05 danny0405

My downstream Flink program did not encounter the file-not-exists exception after the upstream increased the clean.retain_commits parameter. Yesterday I checked whether my downstream program was re-reading old partition data and found no re-reading; at that time I had not configured it to skip clustered files.


weitianpei avatar May 10 '24 02:05 weitianpei

Thank you very much, brother Chen. That should have solved the problem.


weitianpei avatar May 10 '24 08:05 weitianpei

Thanks for the feedback, feel free to reopen it if you still think it is a problem.

danny0405 avatar May 10 '24 09:05 danny0405

Thanks, dude. Very nice of you.


weitianpei avatar May 10 '24 09:05 weitianpei

Is it mandatory to add this parameter, read.skip_clustering, when I read downstream?


weitianpei avatar May 16 '24 02:05 weitianpei

Is it mandatory to set read.streaming.skip_clustering = true when I read data whose upstream has asynchronous clustering enabled? That is the problem I ran into a few days ago. I found that without enabling it, the data does not seem to be read twice.

Closed #11090 as completed (by Danny Chan, May 10, 2024).

weitianpei avatar May 16 '24 02:05 weitianpei