
[FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

xinbinhuang opened this pull request 3 years ago • 12 comments

What is the purpose of the change

Support dynamic stop position in FileSource

Brief change log

  • Added a new DynamicHybridSourceReader interface to allow implementors to participate in dynamic initialization
    • An implementation is added for SourceReaderBase, so most readers can participate automatically
  • The high-level mechanism works as follows:
    • On each checkpoint, HybridSourceReader retrieves the finished splits (marked with HybridSourceSplit.isFinished = true) from the underlying reader and persists them along with the unfinished splits.
    • Upon source switch, HybridSourceReader sends all the finished splits to the enumerator in a SourceReaderFinishedEvent.
    • The enumerator passes those finished splits along to the next source in the SourceSwitchContext.
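The checkpoint step above can be sketched with simplified stand-in types (these are illustrative, not Flink's actual classes; names and fields are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's HybridSourceSplit; the isFinished flag marks splits
// the underlying reader has fully consumed.
class SketchSplit {
    final String splitId;
    final boolean isFinished;

    SketchSplit(String splitId, boolean isFinished) {
        this.splitId = splitId;
        this.isFinished = isFinished;
    }
}

class CheckpointSketch {
    // On each checkpoint, persist both the unfinished splits (normal reader
    // state) and the finished ones (needed later to derive the switch position).
    static List<SketchSplit> snapshotState(List<SketchSplit> unfinished,
                                           List<SketchSplit> finished) {
        List<SketchSplit> state = new ArrayList<>(unfinished);
        state.addAll(finished);
        return state;
    }
}
```

The point of the sketch is only that the finished splits survive checkpointing alongside the live ones, so they are still available at switch time.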

Verifying this change

  • Current test cases pass
  • Test split serialization backward compatibility
  • Adjust one existing test case to check SourceSwitchContext has the correct finished splits from the previous source
  • TODO: plan to add more tests

Does this pull request potentially affect one of the following parts:

  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: yes

Documentation

  • If yes, how is the feature documented?: documentation should be added after the change is signed off

xinbinhuang avatar Jul 17 '22 03:07 xinbinhuang

CI report:

  • 470583f9a46bf94ec00b4e5d77c79b942b2d377a Azure: SUCCESS
Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jul 17 '22 03:07 flinkbot

@flinkbot run azure

xinbinhuang avatar Jul 18 '22 01:07 xinbinhuang

cc: @tweise This is the first draft of the implementation. PTAL! (I'm planning to add more tests in the next few days.)

xinbinhuang avatar Jul 18 '22 01:07 xinbinhuang

@flinkbot run azure

xinbinhuang avatar Jul 19 '22 00:07 xinbinhuang

@flinkbot run azure

xinbinhuang avatar Jul 20 '22 19:07 xinbinhuang

@xinbinhuang looking at all the modifications to HybridSource itself I think it is necessary to take a step back here and discuss the design aspects first.

The underlying sources are a sequence of bounded sources, optionally followed by an unbounded source. Therefore, there should be no need to have a "dynamic reader" that does special things. The enumerator knows upfront which splits need to be processed and when it is finished.

The HybridSource already has support for transferring the end position to the next enumerator. That was part of the FLIP; the details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source and you can find an example in the tests: HybridSourceITCase.sourceWithDynamicSwitchPosition

tweise avatar Jul 27 '22 01:07 tweise

@tweise Thank you so much for reviewing the PR! I just realized that I might have misread the JIRA issue as HybridSource: Support dynamic stop position in HybridSource instead of HybridSource: Support dynamic stop position in FileSource. So this PR actually aimed to design a generic interface to allow any source to participate in a dynamic source switch. With that in mind, let me explain how I came up with the current design and implementation.

After reviewing the current logic of the hybrid source (it's amazing work 🎉 🎉 !!), I understand that the current implementation supports transferring the end position to the next enumerator. However, it lacks a mechanism to know where the end position is (e.g. the offset for a Kafka partition). These "end positions" are probably unknown beforehand; otherwise this would be the same as a fixed start position. Therefore, I think the key is to transfer the "end position" from the source reader to the enumerator during the source switch. There are a few points to consider:

  1. What would be the "end position" to transfer to the next source?

I believe this varies by use case. Some may find it enough to use split.path, while others may need to derive the timestamp or offset from the content of the file (e.g. a Kafka archive, where the implementation can vary by company). Since we can't anticipate all use cases, passing all finished splits seems a reasonable solution here, letting the developer decide how to derive the position from them.

  2. Where to make the changes?

I aimed to implement this such that most existing sources can benefit from it out of the box, with minimal changes and no breaking changes to them.

  3. How to store the "finished splits" before source switch?

Per FLIP-27, the enumerator only knows which splits to consume, not the actual progress; only the source reader knows that. So we need to store the finished splits and transfer them to the enumerator during the source switch. However, most existing sources implement SourceReaderBase, which purges splits from state once they reach the end position. One naive solution would be to adjust SourceReaderBase to also store finished splits in its state. However, this would affect all sources running in production and is probably a big backward-incompatible change. Therefore, I decided to store them only in the HybridSourceReader, and existing sources only need to implement one method (DynamicHybridSourceReader::getFinishedSplits) that allows the finished splits to be extracted during checkpoint and source switch. This process is transparent to all existing sources and only happens when used with the HybridSource.
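A minimal sketch of that idea, with hypothetical simplified signatures (the real PR operates on Flink split types, not strings; DynamicReaderSketch and TrackingReader are stand-ins):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the proposed interface: the wrapping
// HybridSourceReader keeps finished splits aside even though the underlying
// reader purges them from its own state at the end position.
interface DynamicReaderSketch {
    List<String> getFinishedSplits();
}

class TrackingReader implements DynamicReaderSketch {
    private final List<String> finished = new ArrayList<>();

    // Called when the underlying reader reports a split as done; the
    // underlying reader drops it, but we keep a record for the switch.
    void onSplitFinished(String splitId) {
        finished.add(splitId);
    }

    @Override
    public List<String> getFinishedSplits() {
        return List.copyOf(finished);
    }
}
```

Keeping the bookkeeping in the wrapper is what makes the change transparent to sources that run outside a HybridSource.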

With the above points, the current implementation works as follows:

  • On each checkpoint, HybridSourceReader retrieves the finished splits (marked with HybridSourceSplit.isFinished = true) from the underlying reader and persists them along with the unfinished splits.
  • Upon source switch, HybridSourceReader sends all the finished splits to the enumerator in a SourceReaderFinishedEvent.
  • The enumerator passes those finished splits along to the next source in the SourceSwitchContext, and the next source can use the splits to derive its start positions.

Changes required on existing non-hybrid sources:

  • Implement DynamicHybridSourceReader::getFinishedSplits on SourceReaderBase.

API changes on HybridSource:

  • Added SourceSwitchContext::getPreviousSplits which returns finished splits from the previous source.
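To illustrate how a next source might consume the previous splits, here is a hedged sketch; FinishedSplitSketch and its fields are hypothetical stand-ins, and the actual API shape in the PR may differ:

```java
import java.util.List;

// Hypothetical stand-in for a finished split carrying the position derived
// from its content (e.g. the last Kafka offset found in an archived file).
class FinishedSplitSketch {
    final String path;
    final long lastOffset;

    FinishedSplitSketch(String path, long lastOffset) {
        this.path = path;
        this.lastOffset = lastOffset;
    }
}

class SwitchPositionSketch {
    // The next (unbounded) source starts right after the highest offset seen
    // across all finished splits of the previous source.
    static long deriveStartOffset(List<FinishedSplitSketch> previousSplits) {
        return previousSplits.stream()
                .mapToLong(s -> s.lastOffset)
                .max()
                .orElse(-1L) + 1;
    }
}
```

How the position is derived is deliberately left to the developer, which is the flexibility the proposal argues for.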

That's a lot of words, so I really appreciate your patience in reading this. Let me know if anything is unclear; I'm happy to chat more about this! (Also, I'm happy to start a discussion thread on the dev mailing list if you think it's necessary.)

xinbinhuang avatar Jul 27 '22 05:07 xinbinhuang

@xinbinhuang thanks for describing the thought process.

As you already mentioned, the goal of the JIRA was to add the passing of end position to file source and when we implemented FLIP-150 we presumably already added everything that is required to achieve that goal to the HybridSource.

I think we need to zoom in on why the enumerator does or does not know the actual stop position without involvement of the reader.

It is correct that we do not know the stop position at graph construction time; otherwise we would not need any runtime behavior. However, the enumerator already knows which splits have been processed, because it passed those to the readers and the readers have finished the splits they were assigned. Remember that we are dealing with bounded sources, so there really should be no need to pass splits back to the enumerator. How the enumerator communicates the end position to the next enumerator will depend on the specific type of source; in the most typical cases it will simply come from the partition metadata (Iceberg, files).
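The alternative described here can be sketched with hypothetical types: for metadata-driven sources, the enumerator computes the switch position itself from the splits it assigned, without any reader involvement (SplitMetaSketch is an assumed stand-in for file/partition metadata):

```java
import java.util.List;

// Hypothetical stand-in for file/partition metadata the enumerator already
// holds for every split it assigned.
class SplitMetaSketch {
    final String path;
    final long modificationTime; // from file system metadata, not file content

    SplitMetaSketch(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

class EnumeratorEndPositionSketch {
    // The switch timestamp is simply the newest modification time among all
    // assigned splits; no data needs to flow back from the readers.
    static long endTimestamp(List<SplitMetaSketch> assignedSplits) {
        return assignedSplits.stream()
                .mapToLong(m -> m.modificationTime)
                .max()
                .orElseThrow();
    }
}
```

This is cheaper than the reader-driven approach, but it only works when the metadata actually encodes the end position.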

tweise avatar Jul 27 '22 15:07 tweise

@tweise I was distracted by other work. Let me get back to this.

I think we need to zoom in on why the enumerator does or does not know the actual stop position without involvement of the reader.

Our use case is to expose an end offset or timestamp based on the content of the file. We're archiving out-of-retention messages into S3 using a long-running job. Normally there are multiple messages inside each file, and the timestamp of the last message may not align with the file metadata, so we need to actually parse the file content to find the last timestamp or offset. That's why I think sending the split back would make sense, since it has already been processed there.
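A sketch of why file metadata is not enough in this use case, using a hypothetical ArchivedRecordSketch stand-in for a parsed archived message: the end position comes from the last record in the file, which only a full parse reveals:

```java
import java.util.List;

// Hypothetical parsed message from an archived Kafka file; many records
// live in one file, and the file's own metadata says nothing about them.
class ArchivedRecordSketch {
    final long offset;
    final long timestamp;

    ArchivedRecordSketch(long offset, long timestamp) {
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

class ArchiveScanSketch {
    // Finding the last record means parsing the whole file, which is exactly
    // the work the reader already did while consuming the split; hence the
    // proposal to send the finished split back rather than re-reading it.
    static long lastTimestamp(List<ArchivedRecordSketch> parsedFile) {
        return parsedFile.get(parsedFile.size() - 1).timestamp;
    }
}
```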

Do you have any recommendations around this? Or do you think this is too complex to implement?

xinbinhuang avatar Oct 07 '22 20:10 xinbinhuang

@xinbinhuang @tweise What's the status of this PR? Can we somehow bring this forward and resolve it for 1.17?

MartijnVisser avatar Dec 15 '22 15:12 MartijnVisser

@MartijnVisser I would love to move it forward 😄 @tweise would you like to take a look again?

xinbinhuang avatar Dec 16 '22 17:12 xinbinhuang

How is this going?

waywtdcc avatar Dec 26 '23 06:12 waywtdcc