
[SUPPORT] How to configure spark and flink to write mor tables using bucket indexes?

Open xiaofan2022 opened this issue 1 year ago • 11 comments

I want to use Flink and Spark to write to a MOR table, using the BUCKET index with the CONSISTENT_HASHING engine. Spark writes the full load very quickly, but Flink writes the incremental data very slowly (about 100 records/s). Spark SQL:

CREATE TABLE test.tableA ()
USING hudi
TBLPROPERTIES (
  'connector' = 'hudi',
  'index.type' = 'BUCKET',
  'hoodie.index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'hoodie.datasource.write.recordkey.field' = '',
  'path' = '',
  'preCombineField' = 'create_time',
  'precombine.field' = 'create_time',
  'primaryKey' = '',
  'table.type' = 'MERGE_ON_READ',
  'write.rate.limit' = '10000',  -- Flink config
  'write.tasks' = '2',           -- Flink config
  'write.utc-timezone' = 'false',
  'type' = 'mor');
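
For comparison, here is a minimal sketch of how the Flink side of the same table could be declared. The column names, the record key id, and the path are placeholders I am assuming, not values from this issue; the Spark-only properties are dropped and only the Flink/Hudi options are kept:

CREATE TABLE tableA_flink (
  id STRING,                       -- placeholder record key column
  create_time TIMESTAMP(3),        -- precombine field
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://nameservice1/apps/spark/warehouse/test.db/tableA',  -- placeholder path
  'table.type' = 'MERGE_ON_READ',
  'index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'hoodie.bucket.index.num.buckets' = '4',   -- initial bucket count per partition
  'precombine.field' = 'create_time',
  'write.tasks' = '2'
);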

(Screenshot attached: flink_slow.) How can this be optimized?

xiaofan2022 avatar Sep 14 '24 15:09 xiaofan2022

Did you have a chance to check the thread dump of the operators?

danny0405 avatar Sep 15 '24 01:09 danny0405

"consistent_bucket_write: test.fin_ipr_inmaininfo_test (1/2)#0" Id=89 TIMED_WAITING on java.util.LinkedList@37d9fd7 at java.lang.Object.wait(Native Method) - waiting on java.util.LinkedList@37d9fd7 at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:924) at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:692) at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:587) at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145) at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145) at org.apache.hudi.common.table.log.HoodieLogFormatWriter.flush(HoodieLogFormatWriter.java:261) at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:194) at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:479) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:450) at org.apache.hudi.table.action.commit.delta.BaseFlinkDeltaCommitActionExecutor.handleUpdate(BaseFlinkDeltaCommitActionExecutor.java:54) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:191) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:109) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:71) at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77) at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:51) at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:73) at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:148) at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:192) at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1275/530894042.apply(Unknown Source) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.lambda$writeBucket$0(ConsistentBucketStreamWriteFunction.java:80) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction$$Lambda$1599/372489614.apply(Unknown Source) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.Collections$2.tryAdvance(Collections.java:4717) at java.util.Collections$2.forEachRemaining(Collections.java:4725) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.writeBucket(ConsistentBucketStreamWriteFunction.java:81) at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:467) at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1593/379154034.accept(Unknown Source) at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:463) at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:137) at 
org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.snapshotState(ConsistentBucketStreamWriteFunction.java:69) at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ...

Could HDFS write performance be the problem? When I use the SIMPLE index for Spark, Flink's bucket-index write is quite fast, about 1k-2k records/s.

xiaofan2022 avatar Sep 15 '24 03:09 xiaofan2022

The performance should be very close for consistent hashing and simple bucket hashing, but from the stack trace it looks like the appends to the log files are what take the time.
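
Note that the flush in the stack trace is triggered from snapshotState, so every Flink checkpoint forces the buffered buckets to be appended and hsync'd to HDFS. As a rough, illustrative sketch (the values are placeholders, not tuning advice confirmed in this thread), the knobs that usually govern how often and how much is flushed are the checkpoint interval and the write buffers:

-- Flink SQL client; illustrative value only
SET 'execution.checkpointing.interval' = '60s';  -- fewer checkpoints => fewer forced flushes/hsyncs

and in the Hudi sink table's WITH clause:

'write.batch.size' = '256',      -- MB buffered per batch before flushing to the file system
'write.task.max.size' = '1024'   -- MB total write buffer per write task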

danny0405 avatar Sep 15 '24 23:09 danny0405

So how can this be optimized? This speed is too slow.

xiaofan2022 avatar Sep 18 '24 02:09 xiaofan2022

@beyond1920 can you help with the performance issue here?

danny0405 avatar Sep 19 '24 01:09 danny0405

@xiaofan2022 Did you already schedule the clustering that expands the consistent hashing ring? And did you check tablePath/.bucket_index/consistent_hashing_metadata for the number of consistent hashing nodes?
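
In Flink, scheduling that clustering from the streaming job is usually done with the clustering options on the sink table. A rough sketch of the relevant WITH-clause entries (these option names are my assumption about the usual knobs, and the values are illustrative only):

'clustering.schedule.enabled' = 'true',  -- schedule clustering plans from the streaming write job
'clustering.async.enabled' = 'true',     -- execute the plans asynchronously in the same pipeline
'clustering.delta_commits' = '4',        -- number of delta commits between clustering plans
'clustering.tasks' = '2'                 -- parallelism of the clustering operator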

danny0405 avatar Sep 19 '24 03:09 danny0405

hdfs dfs -cat hdfs://nameservice1/apps/spark/warehouse/test.db/file_test/.hoodie/.bucket_index/consistent_hashing_metadata/00000000000000.hashing_meta | grep "value" | wc -l

result => 256

xiaofan2022 avatar Sep 19 '24 07:09 xiaofan2022

So you have 256 initial buckets?

danny0405 avatar Sep 19 '24 23:09 danny0405

Yes, I set 'hoodie.bucket.index.max.num.buckets' = '32' and 'hoodie.bucket.index.min.num.buckets' = '4', but found that there are still 256 buckets.

xiaofan2022 avatar Sep 20 '24 03:09 xiaofan2022

Yeah, let's figure out the reason; too many buckets will not perform well for streaming writes.

danny0405 avatar Sep 20 '24 04:09 danny0405

@xiaofan2022 Any updates on this ticket? Were you able to find out the reason why we see 256 buckets?

ad1happy2go avatar Oct 17 '24 10:10 ad1happy2go

I first create the table through Spark and import the full data; then Flink updates the incremental data in real time. But the default bucket number in Spark is 4.
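
One possible explanation worth checking (an assumption on my part, not something confirmed in this thread): for the CONSISTENT_HASHING engine the initial ring size comes from 'hoodie.bucket.index.num.buckets' (which defaults to 256), while the min/max options only bound later resizing through clustering. Setting it explicitly at table creation would make the Spark bulk load and the Flink incremental write agree on the bucket count, e.g. in the Spark TBLPROPERTIES:

'hoodie.index.type' = 'BUCKET',
'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
'hoodie.bucket.index.num.buckets' = '4',       -- initial bucket count per partition
'hoodie.bucket.index.min.num.buckets' = '4',   -- lower bound for resizing
'hoodie.bucket.index.max.num.buckets' = '32'   -- upper bound for resizing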

xiaofan2022 avatar Nov 14 '24 07:11 xiaofan2022