[SUPPORT] How to configure spark and flink to write mor tables using bucket indexes?
I want to use flink and spark to write to the mor table, and use bucket CONSISTENT_HASHING for the index, but I find that spark is very fast to write the full amount and flink is very slow(flink write 100record/s) to write increments. spark sql:
CREATE TABLE test.tableA ()
USING hudi
TBLPROPERTIES (
'connector' = 'hudi',
'index.type'='BUCKET',
'hoodie.index.type'='BUCKET',
'hoodie.index.bucket.engine'='CONSISTENT_HASHING',
'hoodie.datasource.write.recordkey.field' = '',
'path' = '',
'preCombineField' = 'create_time',
'precombine.field' = 'create_time',
'primaryKey' = '',
'table.type' = 'MERGE_ON_READ',
'write.rate.limit'='10000',--flink配置
'write.tasks'='2',--flink配置
'write.utc-timezone'='false',
'type' = 'mor');
How to optimize?
Did you have chance to check the thread dump of the operators?
"consistent_bucket_write: test.fin_ipr_inmaininfo_test (1/2)#0" Id=89 TIMED_WAITING on java.util.LinkedList@37d9fd7 at java.lang.Object.wait(Native Method) - waiting on java.util.LinkedList@37d9fd7 at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:924) at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:692) at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:587) at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145) at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145) at org.apache.hudi.common.table.log.HoodieLogFormatWriter.flush(HoodieLogFormatWriter.java:261) at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:194) at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:479) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:450) at org.apache.hudi.table.action.commit.delta.BaseFlinkDeltaCommitActionExecutor.handleUpdate(BaseFlinkDeltaCommitActionExecutor.java:54) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:191) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:109) at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:71) at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77) at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:51) at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:73) at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:148) at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:192) at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1275/530894042.apply(Unknown Source) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.lambda$writeBucket$0(ConsistentBucketStreamWriteFunction.java:80) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction$$Lambda$1599/372489614.apply(Unknown Source) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) at java.util.Collections$2.tryAdvance(Collections.java:4717) at java.util.Collections$2.forEachRemaining(Collections.java:4725) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.writeBucket(ConsistentBucketStreamWriteFunction.java:81) at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:467) at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1593/379154034.accept(Unknown Source) at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:463) at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:137) at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.snapshotState(ConsistentBucketStreamWriteFunction.java:69) at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ...
Is hdfs write performance problematic? If I use simple index for spark, flink uses bucket index very quickly 1k-2krecord/s
The performance should be very near for consistent hashing and simple hashing, but from the stacktrace, it looks like the appending to files takes time.
So how can this be optimized? This speed is too slow
@beyond1920 can you help with the performance issue here?
@xiaofan2022 Did you schedule the clustering for expanding the consistent hashing ring already? Did you check the tablePath/.bucket_index/consistent_hashing_metadata for the number of consistent hashing nodes?
hdfs dfs -cat hdfs://nameservice1/apps/spark/warehouse/test.db/file_test/.hoodie/.bucket_index/consistent_hashing_metadata/00000000000000.hashing_meta | grep "value" | wc -l
result=>>>256
So you have 256 initial buckets?
Yes, I set up 'hoodie. Bucket. Index. Max. Num. Buckets' =' 32 ', 'the hoodie. Bucket. Index. Min. Num. Buckets' =' 4 ', but found that there are still 256 buckets
yeah, let's figure out the reason, too many buckets would not perform well for streaming write.
@xiaofan2022 Any updates on this ticket? Were you able to find out the reason why we see 256 buckets?
I first create tables through spark and import full data. Then flink updates incremental data in real time, but the default bucket in spark is 4