[FLINK-34820][postgres] Do not recycle but reuse the fetcher for all data to improve performance
PR from https://github.com/apache/flink-cdc/pull/2571
- Ignore unnecessary fetcher shutdowns.
- Hide the element queue from connector developers and make SplitFetcherManager the sole owner of the queue.
- It is enough to refresh the schema when PostgresSourceFetchTaskContext#configure is called; there is no need to refresh the schema afterwards.
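A minimal sketch of that last point, assuming a simplified context class (the names below are illustrative, not the real PostgresSourceFetchTaskContext API): load the schema once when the context is configured and reuse it for every subsequent split.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (names are assumptions, not the real flink-cdc API):
// refresh the table schemas once, when the fetch task context is configured,
// and reuse them afterwards instead of re-reading per split.
public class CachedSchemaContext {

    // table name -> schema description (simplified to String here)
    private Map<String, String> tableSchemas;

    public void configure() {
        if (tableSchemas == null) {
            // One catalog round trip at configure time; never repeated per split.
            tableSchemas = discoverTableSchemas();
        }
    }

    private Map<String, String> discoverTableSchemas() {
        // Placeholder for the catalog query against PostgreSQL.
        return new HashMap<>();
    }
}
```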
@Mrart @loserwang1024 I have a question. I think the performance issue is this: when a new table is added to the config, the stream split is switched to snapshot splits, and every time a snapshot split finishes, the stream split is added back once. The stream split reader is set to null at that point, so it gets recreated, and the schema is reloaded for the configured table filter. How does the changed code improve this logic? In IncrementalSourceSplitReader.java, the method pollSplitRecords contains the following code:
```java
private ChangeEventRecords pollSplitRecords() throws InterruptedException {
    Iterator<SourceRecords> dataIt = null;
    if (currentFetcher == null) {
        // (1) Reads stream split firstly and then read snapshot split
        if (streamSplits.size() > 0) {
            // the stream split may come from:
            // (a) the initial stream split
            // (b) added back stream-split in newly added table process
            StreamSplit nextSplit = streamSplits.poll();
            submitStreamSplit(nextSplit);
        } else if (snapshotSplits.size() > 0) {
            submitSnapshotSplit(snapshotSplits.poll());
        } else {
            LOG.info("No available split to read.");
        }
        if (currentFetcher != null) {
            dataIt = currentFetcher.pollSplitRecords();
        } else {
            currentSplitId = null;
        }
        return dataIt == null ? finishedSplit() : forRecords(dataIt);
    } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
        // (2) try to switch to stream split reading until current snapshot split finished
        dataIt = currentFetcher.pollSplitRecords();
        if (dataIt != null) {
            // first fetch data of snapshot split, return and emit the records of snapshot split
            ChangeEventRecords records;
            if (context.isHasAssignedStreamSplit()) {
                records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
                closeScanFetcher();
                closeStreamFetcher();
            } else {
                records = forRecords(dataIt);
                SnapshotSplit nextSplit = snapshotSplits.poll();
                if (nextSplit != null) {
                    checkState(reusedScanFetcher != null);
                    submitSnapshotSplit(nextSplit);
                } else {
                    closeScanFetcher();
                }
            }
            return records;
        } else {
            return finishedSplit();
        }
    } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
        // (3) switch to snapshot split reading if there are newly added snapshot splits
        dataIt = currentFetcher.pollSplitRecords();
        if (dataIt != null) {
            // try to switch to read snapshot split if there are new added snapshot
            SnapshotSplit nextSplit = snapshotSplits.poll();
            if (nextSplit != null) {
                closeStreamFetcher();
                LOG.info("It's turn to switch next fetch reader to snapshot split reader");
                submitSnapshotSplit(nextSplit);
            }
            return ChangeEventRecords.forRecords(STREAM_SPLIT_ID, dataIt);
        } else {
            // null will be returned after receiving suspend stream event
            // finish current stream split reading
            closeStreamFetcher();
            return finishedSplit();
        }
    } else {
        throw new IllegalStateException("Unsupported reader type.");
    }
}

private void closeStreamFetcher() {
    if (reusedStreamFetcher != null) {
        LOG.debug("Close stream reader {}", reusedStreamFetcher.getClass().getCanonicalName());
        reusedStreamFetcher.close();
        if (currentFetcher == reusedStreamFetcher) {
            currentFetcher = null;
        }
        reusedStreamFetcher = null;
    }
}
```
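For completeness, the scan-side counterpart referenced above (closeScanFetcher) is not shown in the snippet. A hypothetical mirror of closeStreamFetcher(), which may differ from the real method in IncrementalSourceSplitReader:

```java
// Hypothetical mirror of closeStreamFetcher() above, for the scan side;
// the real closeScanFetcher() may differ in detail.
private void closeScanFetcher() {
    if (reusedScanFetcher != null) {
        LOG.debug("Close scan reader {}", reusedScanFetcher.getClass().getCanonicalName());
        reusedScanFetcher.close();
        if (currentFetcher == reusedScanFetcher) {
            currentFetcher = null;
        }
        reusedScanFetcher = null;
    }
}
```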
This PR prevents the fetcher from being recycled while it is idle.
> How does the changed code improve this logic?
Hi, @Mrart, @hql0312. This PR mainly targets the common case. As I discussed in https://github.com/apache/flink-cdc/pull/2571, in the snapshot phase the fetcher is closed after each snapshot split finishes, so every newly assigned snapshot split opens a new fetcher and has to look up the schema again.
What you describe is a special case that only happens when newly added tables are enabled. To be honest, it is not a good idea for a reader to frequently switch between stream splits and snapshot splits (unless the parallelism is just 1); the switch costs a lot. I have two ideas:
- First idea: do not assign newly added snapshot splits to the reader that is handling the binlog. I will soon push forward a FLIP to let the enumerator know the current split distribution.
- Second idea: let the split reader read snapshot splits and the stream split into one queue at the same time, instead of reading only one at a time (see the sketch after this list). This requires changing the current thread model.
But this PR can still improve things a lot, so we can merge it first.
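A rough, illustrative-only sketch of the second idea, assuming two fetcher threads feeding one bounded queue (all class and method names here are hypothetical, not the current flink-cdc thread model):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch: instead of running one fetcher at a time, a snapshot
// fetcher and a stream fetcher both push into the same bounded queue, and the
// split reader drains that single queue.
public class DualFetcherSketch {

    private final BlockingQueue<String> elementQueue = new ArrayBlockingQueue<>(1024);

    public void start() {
        // One thread per split type; both feed the shared queue.
        new Thread(() -> produce("snapshot-record"), "snapshot-fetcher").start();
        new Thread(() -> produce("stream-record"), "stream-fetcher").start();
    }

    private void produce(String record) {
        try {
            elementQueue.put(record); // blocks when the queue is full (backpressure)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public String poll() throws InterruptedException {
        // The reader does not care which side produced the record.
        return elementQueue.take();
    }
}
```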
If we could hold on to the stream split context when a snapshot split arrives, instead of recycling it, performance would improve.
In the stream phase the Postgres connector spends too much time loading the schema; if we held the schema in a cache, that cost would shrink.
Is this logic right? @loserwang1024
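A minimal sketch of such a schema cache, with simplified types (String stands in for Debezium's TableId and TableChange; all names here are assumptions for illustration). The point is only that the expensive catalog lookup runs at most once per table, not once per split:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the schema cache discussed above.
public class TableSchemaCache {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    public String getSchema(String tableId) {
        // computeIfAbsent runs the expensive lookup at most once per table.
        return cache.computeIfAbsent(tableId, this::readTableSchemaFromDatabase);
    }

    private String readTableSchemaFromDatabase(String tableId) {
        // Placeholder for the catalog round trip that dominates stream-phase cost.
        return "schema-of-" + tableId;
    }
}
```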
Looking forward to your FLIP; we can discuss it on the dev mailing list. @hql0312
@Mrart OK. In my branch I added a static object to hold the schema, which avoids loading it many times.
> I added a static object to hold the schema
@hql0312 To be honest, I don't recommend using a static object in a CDC connector. It can interfere between tasks (when running in session mode) and also affects multiple slots in the same JVM. @leonardBang, CC, WDYT?
@loserwang1024 Because of how the source operator works, in the stream phase only one task holds the stream split, and the static object is created dynamically even if the source operator is reassigned to another task.
The logic only exists in my branch, but it runs successfully.
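For reference, a sketch of the static holder described above (hypothetical, reconstructing the idea from the commenter's private branch, not merged code). The comments note the caveat raised above: a static field is shared by every task and slot in the same JVM, so in session mode one job's cached schema can leak into another's.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical static schema holder, as described in the comment above.
// Caveat: JVM-wide statics are shared across tasks and slots in session mode.
public final class StaticSchemaHolder {

    private static final Map<String, String> SCHEMAS = new ConcurrentHashMap<>();

    private StaticSchemaHolder() {}

    public static String getOrLoad(String tableId) {
        // Loads each table's schema at most once per JVM, regardless of task.
        return SCHEMAS.computeIfAbsent(tableId, StaticSchemaHolder::loadSchema);
    }

    private static String loadSchema(String tableId) {
        // Placeholder for the actual schema lookup.
        return "schema-of-" + tableId;
    }
}
```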
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.
This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested is encouraged to continue working on this pull request.
@Mrart @loserwang1024 Do we still need this PR?
