[FLINK-36112][Connector/Filesystem].Add Support for CreateFlag.NO_LOCAL_WRITE in FLINK on YARN's File Creation to Manage Disk Space and Network Load in Labeled YARN Nodes
Add Support for CreateFlag.NO_LOCAL_WRITE in FLINK on YARN's File Creation to Manage Disk Space and Network Load in Labeled YARN Nodes
Description
I am currently using Apache Flink on Yarn to write files into Hadoop. The Flink application runs on a labeled YARN queue.
During operation, it has been observed that the local disks on these labeled nodes get filled up quickly, and the network load is significantly high. This issue arises because Hadoop prioritizes writing files to the local node first, and the number of these labeled nodes is quite limited.
Problem
The current behavior leads to inefficient disk space utilization and high network traffic on these few labeled nodes, which could potentially affect the performance and reliability of the application. As shown in the picture, the host I circled have a average net_bytes_sent speed 1.2GB/s while the others are just 50MB/s, this imbalance in network and disk space nearly destroyed the whole cluster.
this patch is to solve FLINK-36112 Also this modification has already been discussed in Hadoop's pull request
What is the purpose of the change
This pull request makes FileSink that writes to Hadoop Filesystem able to choose whether write the first replica on the local machine or choose to write to other dataNodes of the cluster randomly. This way we avoid the host that have both taskManager process and DataNode process having an extremely high network load average. (especially in the case that we run Flink on a labeled yarn queue)
Brief change log
- add a new Interface in IFileSystem and FileSystem to support the config whether we want to write the first replica to other machines
- add a new Function of FileSink to be able to set the value that we want to write to other nodes for the first replica.
- modify HadoopRecoverableWriter and HadoopRecoverableFsDataOutputStream to implement the create interface that can set EnumSet of CreateFlag of DistributedHadoopFileSystem of Hadoop project
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
- modified unit tests for flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java to check if we can set the value of NoLocalWrite correctly.
- Tested on our local cluster and run for three days, the dataNodes now have a more average disk use and network load average.
- I did't add IT for HadoopRecoverableFsDataOutputStream because we start the mini cluster on our local machine, which means that nameNode isn't able to decide which dataNode is not running on the same machine.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): ( no)
- The public API, i.e., is any changed class annotated with
@Public(Evolving): (no) - The serializers: ( no)
- The runtime per-record code paths (performance sensitive): (don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: ( no)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- fa7409179ea4cd68116418fd30d31394983e9d59 UNKNOWN
- 224c99cc3b81e2ec6166e1e94f02906c801f0e28 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:@flinkbot run azurere-run the last Azure build
@xuyangzhong Hi, would you please help me check this issue? thanks
@xintongsong Hi, would you please help me check this issue? Thanks
Hi @liangyu-1,
Sorry for the late reply. I must have overlooked the email notification.
Thanks for working on this PR. I've left some comments. PTAL. @xintongsong Thanks for your reply, I have modified my code as you suggest.
But in this pr I have to use Mockito as I explained in the code reviews, I didn't find a way to make sure that miniCluster returns exactly one dataNode to be the LocalNode each time it attempt to write a new block.
@liangyu-1 Thanks for addressing my comments. I think the PR is very close to a mergable state. I left only one minor inline comment. In addition, the CI is failing due to the usage of Mockito. I personally would be fine with using Mockito in this case. In order to fix the CI, I think we need to append the file that imports Mockito to the suppress file list for the IllegalImport check in
tools/maven/suppressions.xml.
@xintongsong
Thanks for the suggestion, I am sorry that I didn't notice this before.
I have just changed file suppressions.xml and it should be ok this time.
@flinkbot run azure re-run the last Azure build