
[FLINK-37900][rocksdb] configurable jitter in rocksdb state uploader

HuangZhenQiu opened this pull request 5 months ago

What is the purpose of the change

Support configurable jitter in the RocksDB state uploader.

Brief change log

  • Add configurable jitter to the RocksDB state uploader

Verifying this change

  • Added testJitterApply in RocksDBStateUploaderTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

HuangZhenQiu avatar Jun 05 '25 14:06 HuangZhenQiu

CI report:

  • fe41135690ff54e57f6a983747dc7c86e4e258e7 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jun 05 '25 14:06 flinkbot

@mxm As you suggested, the Random is now initialized without a seed parameter, so the seed will be different for each task manager. The PR was not up to date.
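For context, a minimal sketch of the pattern being discussed (class and field names are illustrative, not the exact PR code): a seedless Random gives each TaskManager JVM its own sequence, and a uniform delay in [0, maxJitter) is slept before each upload starts.

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; not the actual RocksDBStateUploader code. */
class UploadJitterSketch {
    // No seed argument: each TaskManager JVM gets a different seed,
    // so upload start times across TMs are de-correlated.
    private final Random random = new Random();
    private final long maxJitterMillis;

    UploadJitterSketch(long maxJitterMillis) {
        this.maxJitterMillis = maxJitterMillis;
    }

    void sleepJitterBeforeUpload() throws InterruptedException {
        if (maxJitterMillis > 0) {
            // Uniform delay in [0, maxJitterMillis), spreading token
            // requests to the object store over the jitter window.
            long delay = (long) (random.nextDouble() * maxJitterMillis);
            TimeUnit.MILLISECONDS.sleep(delay);
        }
    }
}
```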

HuangZhenQiu avatar Jun 17 '25 13:06 HuangZhenQiu

Rebased master

HuangZhenQiu avatar Jun 18 '25 17:06 HuangZhenQiu

Rebased master again.

HuangZhenQiu avatar Jun 23 '25 01:06 HuangZhenQiu

@mxm @HuangZhenQiu my remaining concern is that if this parameter is not set correctly with respect to the checkpoint interval, there may be integrity issues relating to checkpoints being processed out of order. If the framework prevents this or we can add config validation to avoid this, then I can approve this. What do you both think about this risk?

Maybe we could add a validation check that the jitter must be at least 60 times smaller than the checkpoint interval, and document this. @HuangZhenQiu Do you think this would keep us in the risk-free zone?
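For illustration, a minimal sketch of what such a validation could look like (the helper and the 60x wiring are hypothetical, not actual Flink code; the 60x factor is the rule of thumb proposed above):

```java
/** Hypothetical config validation; not part of the PR or of Flink. */
final class JitterValidation {
    static void validateJitter(long jitterMillis, long checkpointIntervalMillis) {
        // Enforce jitter <= checkpointInterval / 60, per the suggested rule of thumb.
        if (jitterMillis * 60 > checkpointIntervalMillis) {
            throw new IllegalArgumentException(
                    "Upload jitter (" + jitterMillis + " ms) must be at least 60x"
                            + " smaller than the checkpoint interval ("
                            + checkpointIntervalMillis + " ms).");
        }
    }
}
```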

davidradl avatar Jun 23 '25 12:06 davidradl

@davidradl Could you explain how the checkpoints would be processed out of order? How would one checkpoint overtake the other? IMHO this would merely slow down or time out checkpointing, but it shouldn't lead to data integrity issues.

mxm avatar Jun 23 '25 18:06 mxm

Hi @mxm, I am not an expert in this area, so I was just asking the question. It sounds like the framework prevents any integrity issue. As I have no evidence that anything untoward will happen, and based on your reassurance, I will approve.

davidradl avatar Jun 25 '25 13:06 davidradl

@mxm @davidradl Thanks for the comments. Updated accordingly.

HuangZhenQiu avatar Jun 28 '25 05:06 HuangZhenQiu

@Zakelly

Thanks for sharing your thoughts on this PR. The primary goal here is to address a bottleneck related to the remote object store. Interestingly, the issue isn't upload speed, but rather the metadata database (MDDB) used to fetch tokens for each new file. In our largest job, we have around 1,000 TaskManagers (TMs), and each TM attempts to upload roughly 50 state files simultaneously. This creates a significant QPS load on the MDDB for a single bucket. As a result, not only does this lead to checkpoint failures in the affected job, but it also interferes with other jobs, effectively acting as a noisy neighbor. After applying jitter configured at 10 seconds, we found that the QPS to the MDDB dropped by an order of magnitude. Given this scenario, I believe introducing jitter is the more reasonable approach: it mitigates the MDDB pressure without sacrificing upload performance by reducing the number of upload threads.
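For rough intuition, a back-of-the-envelope using the numbers above (the uniform-spread assumption is editorial, not from the PR):

```java
public class JitterQpsEstimate {
    public static void main(String[] args) {
        // Numbers from the comment above; uniform spread is an assumption.
        long tms = 1_000;              // TaskManagers
        long filesPerTm = 50;          // simultaneous state-file uploads per TM
        long burst = tms * filesPerTm; // ~50,000 near-simultaneous token requests
        long jitterSeconds = 10;       // configured jitter window
        // Spreading starts uniformly over the window turns one big spike
        // into roughly burst / window requests per second.
        System.out.println("peak QPS ~ " + (burst / jitterSeconds)); // ~5,000
    }
}
```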

HuangZhenQiu avatar Jul 01 '25 13:07 HuangZhenQiu

@HuangZhenQiu

Thanks for your answer.

I see your point about the metadata controller (e.g. the NameNode) being the bottleneck. However, this still falls under the speed-control story, given that QPS/RPS and BPS are two aspects of the same limit. Essentially, the QPS with jitter is lower than without it, which mitigates the pressure on the NN but also slows the checkpoint. This is a kind of sacrifice of upload performance, and I believe the same effect can be achieved by shrinking the thread pool: cutting 4 threads to 2 halves the QPS, for example. Did you try this in your environment?

I'm not doubting the value of jitter; it is a nice mechanism for backing off when some limit is reached or while waiting to retry. It can mitigate the issue at hand, but it is not the most suitable mechanism here. So I'd prefer to hold this PR and consider jitter as part of a comprehensive speed-control design.

I will kick off a FLIP discussion about the design I proposed above, targeting the next release. We could cooperate on that if you are interested.

Zakelly avatar Jul 02 '25 06:07 Zakelly

Reducing the thread count from 4 to 2 would lengthen the end-to-end time for a single TM to finish the state upload, and in the worst case could cause a checkpoint timeout for the largest job. I guess you want to generalize the solution for ForStStateBackend. If so, I agree that a FLIP is the right place to discuss a more comprehensive solution. But honestly, the speed control in ForStStateBackend is different from the jitter for the RocksDB state backend, as they will use different Flink configurations. If there is no side effect, shall we merge this diff first?

HuangZhenQiu avatar Jul 02 '25 06:07 HuangZhenQiu

> Reducing the thread count from 4 to 2 would lengthen the end-to-end time for a single TM to finish the state upload, and in the worst case could cause a checkpoint timeout for the largest job. I guess you want to generalize the solution for ForStStateBackend. If so, I agree that a FLIP is the right place to discuss a more comprehensive solution. But honestly, the speed control in ForStStateBackend is different from the jitter for the RocksDB state backend, as they will use different Flink configurations. If there is no side effect, shall we merge this diff first?

Does a jitter slow down the end-to-end time of the checkpoint? In the current implementation, a task takes an executor thread but then sleeps for a while, so effectively not all the threads are working at the same time. What, then, is the essential difference between the current plan and shrinking the thread pool?

I'm not proposing to do anything with ForSt or any specific state backend. I'm suggesting controlling this in CheckpointStreamFactory and its CheckpointStateOutputStream, and letting the JM coordinate all the TMs to achieve an overall QPS or BPS target. That would apply to all state backends at once. It is a big problem that we should resolve, and I suggest not doing it in a fragmented way. It is not easy to use or maintain two mechanisms, or to retire one, so we'd better be cautious about introducing one.
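For illustration only (this is not code from the PR or an existing Flink API): one shape the per-stream side of such a mechanism could take is a throttling wrapper around the checkpoint output stream, sketched here with a plain java.io.OutputStream stand-in for CheckpointStateOutputStream and a Guava RateLimiter whose byte budget a JM-side coordinator would assign to each TM.

```java
import java.io.IOException;
import java.io.OutputStream;

import com.google.common.util.concurrent.RateLimiter;

/**
 * Sketch of a centrally coordinated BPS limit; the JM-side budget
 * distribution is omitted, only the per-stream throttling is shown.
 * Usage: new ThrottledCheckpointStream(raw, RateLimiter.create(64 * 1024 * 1024))
 * would cap this stream at roughly 64 MB/s.
 */
class ThrottledCheckpointStream extends OutputStream {
    private final OutputStream delegate;
    private final RateLimiter bytesPerSecond; // shared per TM, budget set by the JM

    ThrottledCheckpointStream(OutputStream delegate, RateLimiter bytesPerSecond) {
        this.delegate = delegate;
        this.bytesPerSecond = bytesPerSecond;
    }

    @Override
    public void write(int b) throws IOException {
        bytesPerSecond.acquire(1); // block until one byte of budget is available
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len > 0) {
            bytesPerSecond.acquire(len); // charge the whole chunk against the budget
        }
        delegate.write(b, off, len);
    }
}
```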

Zakelly avatar Jul 02 '25 07:07 Zakelly