
[FLINK-34820][postgresql] Do not recycle but reuse the fetcher for all data to improve performance

Mrart opened this issue 8 months ago

PR from https://github.com/apache/flink-cdc/pull/2571

- Ignore unnecessary fetcher shutdowns.
- Hide the element queue from the connector developers and make SplitFetcherManager the only owner class of the queue.
- It's enough to refresh the schema when PostgresSourceFetchTaskContext#configure is called; there is no need to refresh the schema afterwards.
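
As an illustration of the last point, here is a minimal sketch of a configure-time schema refresh; the class and method names are hypothetical and plain String identifiers stand in for the connector's table and schema types, so this is not the actual flink-cdc code:

import java.util.HashMap;
import java.util.Map;

// Sketch only: load the table schemas once when the fetch task context is
// configured, then serve every later lookup from the cached map instead of
// querying PostgreSQL again.
class SchemaRefreshSketch {

    private Map<String, String> tableSchemas; // tableId -> schema description

    // corresponds to the idea behind PostgresSourceFetchTaskContext#configure
    void configure() {
        if (tableSchemas == null) {
            tableSchemas = discoverTableSchemas(); // the expensive metadata query
        }
    }

    String getTableSchema(String tableId) {
        return tableSchemas.get(tableId); // cache hit, no extra round trip
    }

    private Map<String, String> discoverTableSchemas() {
        return new HashMap<>(); // placeholder for JDBC-based schema discovery
    }
}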

Mrart Apr 10 '25 05:04

@Mrart @loserwang1024 I have a question. I think the performance issue is this: when a new table is added to the config, the stream split is switched back to snapshot splits, and after every snapshot split finishes a stream split is added back again; the stream split reader is set to null, so the stream split reader is recreated and the schema is reloaded for the configured table filter. How does the changed code improve the performance of this logic? In IncrementalSourceSplitReader.java, the method pollSplitRecords has the following code:

private ChangeEventRecords pollSplitRecords() throws InterruptedException {
        Iterator<SourceRecords> dataIt = null;
        if (currentFetcher == null) {
            // (1) Reads stream split firstly and then read snapshot split
            if (streamSplits.size() > 0) {
                // the stream split may come from:
                // (a) the initial stream split
                // (b) added back stream-split in newly added table process
                StreamSplit nextSplit = streamSplits.poll();
                submitStreamSplit(nextSplit);
            } else if (snapshotSplits.size() > 0) {
                submitSnapshotSplit(snapshotSplits.poll());
            } else {
                LOG.info("No available split to read.");
            }

            if (currentFetcher != null) {
                dataIt = currentFetcher.pollSplitRecords();
            } else {
                currentSplitId = null;
            }
            return dataIt == null ? finishedSplit() : forRecords(dataIt);
        } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
            // (2) try to switch to stream split reading until current snapshot split finished
            dataIt = currentFetcher.pollSplitRecords();
            if (dataIt != null) {
                // first fetch data of snapshot split, return and emit the records of snapshot split
                ChangeEventRecords records;
                if (context.isHasAssignedStreamSplit()) {
                    records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
                    closeScanFetcher();
                    closeStreamFetcher();
                } else {
                    records = forRecords(dataIt);
                    SnapshotSplit nextSplit = snapshotSplits.poll();
                    if (nextSplit != null) {
                        checkState(reusedScanFetcher != null);
                        submitSnapshotSplit(nextSplit);
                    } else {
                        closeScanFetcher();
                    }
                }
                return records;
            } else {
                return finishedSplit();
            }
        } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
            // (3) switch to snapshot split reading if there are newly added snapshot splits
            dataIt = currentFetcher.pollSplitRecords();
            if (dataIt != null) {
                // try to switch to read snapshot split if there are new added snapshot
                SnapshotSplit nextSplit = snapshotSplits.poll();
                if (nextSplit != null) {
                    closeStreamFetcher();
                    LOG.info("It's turn to switch next fetch reader to snapshot split reader");
                    submitSnapshotSplit(nextSplit);
                }
                return ChangeEventRecords.forRecords(STREAM_SPLIT_ID, dataIt);
            } else {
                // null will be returned after receiving suspend stream event
                // finish current stream split reading
                closeStreamFetcher();
                return finishedSplit();
            }
        } else {
            throw new IllegalStateException("Unsupported reader type.");
        }
    }

    private void closeStreamFetcher() {
        if (reusedStreamFetcher != null) {
            LOG.debug("Close stream reader {}", reusedStreamFetcher.getClass().getCanonicalName());
            reusedStreamFetcher.close();
            if (currentFetcher == reusedStreamFetcher) {
                currentFetcher = null;
            }
            reusedStreamFetcher = null;
        }
    }

hql0312 Apr 10 '25 06:04

This PR prevents the fetcher from being recycled while it is idle.

Mrart Apr 10 '25 11:04

How does the changed code improve the performance of this logic?

Hi @Mrart, @hql0312. This PR mainly targets the common case. As I discussed in https://github.com/apache/flink-cdc/pull/2571, in the snapshot split phase the fetcher is closed after each snapshot split finishes, so each newly assigned snapshot split opens a new fetcher and has to look up the schema again. (screenshot: Snipaste_2025-04-10_19-56-51)

What you describe is a special case that only appears when the newly-added-table feature is enabled. To be honest, it is not a good idea for a reader to frequently switch between the stream split and snapshot splits (unless the parallelism is just 1); the switch costs a lot. I have two ideas:

  1. Do not assign newly added snapshot splits to the reader that is handling the binlog. I will soon push forward a FLIP to let the enumerator know the current split distribution.
  2. Let the split reader read a snapshot split and the stream split into the queue at the same time, instead of only one at a time. This would require changing the current threading model.

But this PR still improves things a lot, so we can merge it first.
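
To make the comparison concrete, here is a minimal sketch of the reuse-instead-of-recycle behaviour this PR aims at; the class, interface, and method names are hypothetical rather than the real flink-cdc types:

import java.util.Queue;

// Sketch only: when a snapshot split finishes, hand the next split to the
// still-open fetcher instead of closing it and re-discovering the schema.
class SnapshotFetcherReuseSketch {

    interface Fetcher {
        void submitTask(String split);
        void close();
    }

    private final Fetcher reusedScanFetcher; // kept open across snapshot splits

    SnapshotFetcherReuseSketch(Fetcher fetcher) {
        this.reusedScanFetcher = fetcher;
    }

    void onSnapshotSplitFinished(Queue<String> snapshotSplits) {
        String next = snapshotSplits.poll();
        if (next != null) {
            // before: close + recreate a fetcher here, reloading the schema
            // after:  reuse the open fetcher, keeping its connection and schema
            reusedScanFetcher.submitTask(next);
        } else {
            reusedScanFetcher.close(); // only close when nothing is left to read
        }
    }
}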

loserwang1024 Apr 10 '25 11:04

If we can hold on to the stream split context when a snapshot split comes in, instead of recycling it, the performance can improve.

In the stream phase, the Postgres connector spends too much time loading the schema. If we can hold the schema in a cache, that cost will be reduced.

Is this logic right? @loserwang1024
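
One possible shape of the "hold the stream split context" idea, sketched with hypothetical names and a hypothetical suspend/resume API; it is only an illustration, not the actual connector code:

// Sketch only: suspend the stream fetcher while a newly added table's snapshot
// split is read, then resume it afterwards, so its connection and loaded schema
// survive the switch instead of being rebuilt.
class StreamFetcherHoldSketch {

    interface StreamFetcher {
        void suspend();
        void resume();
    }

    private final StreamFetcher streamFetcher;

    StreamFetcherHoldSketch(StreamFetcher streamFetcher) {
        this.streamFetcher = streamFetcher;
    }

    void beforeSnapshotSplit() {
        streamFetcher.suspend(); // instead of close() followed by a full recreate
    }

    void afterSnapshotPhase() {
        streamFetcher.resume(); // no schema re-discovery needed on the way back
    }
}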

hql0312 Apr 11 '25 00:04

Looking forward to your FLIP; we can discuss it on the dev mailing list. @hql0312

Mrart Apr 14 '25 08:04

@Mrart OK. In my branch I added a static object to hold the schema, which avoids loading it many times.

hql0312 Apr 16 '25 07:04

I added a static object to hold the schema

@hql0312 To be honest, I don't recommend using a static object in the CDC connector. It can interfere between tasks (when running in session mode) and also affect multiple slots in the same JVM. @leonardBang, CC, WDYT?
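
A small sketch of that scoping concern, with hypothetical names and plain String keys rather than the connector's schema types: a static cache is shared by every task and every job running in the same JVM (for example in a session cluster), while an instance-scoped cache lives and dies with its reader.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: contrast of the two cache scopes discussed above.
class SchemaCacheScopeSketch {

    // shared across all tasks, slots, and jobs in the JVM; entries from one job
    // can be seen by (or collide with) another, and are never released
    static final Map<String, String> STATIC_SCHEMA_CACHE = new ConcurrentHashMap<>();

    // owned by a single reader instance; released when the reader is closed
    private final Map<String, String> instanceSchemaCache = new ConcurrentHashMap<>();
}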

loserwang1024 Apr 16 '25 08:04

@loserwang1024 Because in the stream phase only one source operator task holds the stream split, and the static object will be created dynamically even if the source operator is assigned to another task.

And this logic only exists in my branch, but it runs successfully.

hql0312 Apr 17 '25 01:04

This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.

github-actions[bot] Aug 21 '25 00:08

This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested is encouraged to continue working on this pull request.

github-actions[bot] Oct 20 '25 00:10

@Mrart @loserwang1024 Do we still need this PR?

leonardBang Nov 03 '25 06:11