flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

[postgres] Remove unnecessary schema fresh to improve performance.

Open lzshlzsh opened this issue 2 years ago • 13 comments

PR of #2570

Motivation

It's very time-consuming for postgres to refresh schema if there are many tables to read. According to our testing, refreshing 400 tables takes 15 minutes. Because it takes a long time to refresh the current table's schema when reading a chunk in the scan stage, the data rate shows the following sawtooth pattern. Therefore, we need to minimize unnecessary shema refreshes as much as possible。

image

Solution

Firstly, the origin schema of postgres cdc is the schema filed of PostgresSourceFetchTaskContext, which is created and refreshed when PostgresSourceFetchTaskContext#configure is called, and both the schema refresh of scan stage and stream stage are refreshing the schema of PostgresSourceFetchTaskContext. A new PostgresSourceFetchTaskContext is created in IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both SnapshotSplit and StreamSplit). For snapshot splits, even with the condition of whether the currentFetcher is equal to null, in many cases it still leads to the re creation of the PostgresSourceFetchTaskContext because the IncrementalSourceSplitReader is often discarded by the Flink kernal when a snapshot chunk is finished and become idle. See

  1. SourceReaderBase#pollNext -> SourceReaderBase#finishedOrAvailableLater -> SplitFetcherManager#maybeShutdownFinishedFetchers -> SplitFetcher#shutdown and SplitFetcher is removed.
  2. IncrementalSourceSplitReader(implements SplitReader) is SplitFetcher#splitReader and removed also.

Based on the analysis of the above, we get two optimizations.

  1. It's enough to refresh the schema when PostgresSourceFetchTaskContext#configure is called, and there is no need to refresh the schema afterwards.
  2. Reuse PostgresSourceFetchTaskContext between SnapshotSplits based on sourceConfig, as PostgresSourceFetchTaskContext is created for almost every SnapshotSplit.

lzshlzsh avatar Oct 21 '23 16:10 lzshlzsh

@loserwang1024 @ruanhang1993 @leonardBang @minchowang Would you help to have a review when you have time ?

lzshlzsh avatar Oct 21 '23 16:10 lzshlzsh

Add another schema optimization to optimize the time consumption of schema acquisition in multi table and whole database synchronization scenarios. When the getTableSchema(TableId tableId) function retrieves the schema of a table, the pg API actually returns all the matched table schemas. Currently, only the schema of the current table to be retrieved is cached. This will result in another call to the pg API when retrieving the schema of the next table, as it does not exist in the cache. n tables will be called n times. The call to the pg API is very time-consuming, resulting in the problem of obtaining all table schemas in the incremental phase taking longer. The more tables, the longer the time.

lzshlzsh avatar Oct 25 '23 13:10 lzshlzsh

@lzshlzsh, thank you for your PR. It helps a lot. I completely agree with your schema optimization proposal to reduce the time taken for schema acquisition in scenarios involving multi-table and whole database synchronization.

However, I have a different opinion regarding your schema cache design. From what you have explained, the main issue seems to be that another snapshot split is requested only after the completion of a previous split. During this period, the fetcher thread becomes idle and is recycled.

So, if we can prevent the fetcher thread from being recycled, the problem will be resolved. This also solves the problem of incomplete consideration in the design of IncrementalSourceSplitReader#checkSplitOrStartNext, which not considered fetcher recycle.

We can achieve this by overriding the SplitFetcherManager#maybeShutdownFinishedFetchers method. Since SplitFetcherManager is currently internal, I have opened an issue to make it public in the Flink community. You can find it here: FLINK-33465 Make SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving.. We can continue to push after this Jira.

This is just my personal opinion, and I would be happy to discuss it further with you. @leonardBang , @PatrickRen , @ruanhang1993 , CC, WDYT ?

loserwang1024 avatar Nov 07 '23 03:11 loserwang1024

Is there any update on this issue? Now each splitReader is relatively independent. If the fetcher is no longer recycled, which states will be reset and which states will be retained?

cobolbaby avatar Dec 21 '23 07:12 cobolbaby

@cobolbaby

Is there any update on this issue?

I am pushing forward FLIP-389 Annotate SingleThreadFetcherManager as PublicEvolving now.

which states will be reset and which states will be retained?

IncrementalSourceSplitReader#checkSplitOrStartNext already do it, not to recycle ScanFetchTask(but each generate new Snapshot Split).When it turns to stream split, recycle ScanFetchTask and renew StreamFetchTask

loserwang1024 avatar Dec 21 '23 08:12 loserwang1024

Hi @lzshlzsh, could you please rebase this PR with latest master branch before it could be merged?

Also cc @loserwang1024

yuxiqian avatar Apr 26 '24 05:04 yuxiqian

This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.

github-actions[bot] avatar Jul 17 '24 00:07 github-actions[bot]

@lzshlzsh FLINK-33465 Make SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving has already done in Flink. Maybe we can push forward this job again.

loserwang1024 avatar Aug 02 '24 01:08 loserwang1024