Ensure checkpointing is enabled for Postgres connector

Open gunnarmorling opened this issue 2 years ago • 3 comments

Is your feature request related to a problem? Please describe.

Yes, it is: when checkpointing is not enabled with the Debezium-based Postgres connector, the replication slot in the database is never advanced, causing WAL to be retained indefinitely. The slot is advanced only when Debezium commits offsets, which in Flink CDC is triggered by checkpointing.

Describe the solution you'd like

Raise a warning or even an exception when using this connector without checkpointing.

Describe alternatives you've considered

n/a

gunnarmorling, Jan 10 '23 12:01

Hey @leonardBang, happy to send a PR for this if you agree this makes sense. After discussing with @rmetzger, we think it should be doable rather easily by checking StreamingRuntimeContext::isCheckpointingEnabled() in DebeziumSourceFunction::open(). WDYT?
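A minimal sketch of the check proposed above, not the eventual PR. `RuntimeContextView` is a hypothetical stand-in for the single `StreamingRuntimeContext::isCheckpointingEnabled()` call, so that the example compiles without Flink on the classpath. `CheckpointGuard`, `ensureCheckpointingEnabled`, and the `failHard` switch are likewise made up for illustration.

```java
// Sketch only: models the guard that DebeziumSourceFunction::open() could run.
final class CheckpointGuard {

    /** Hypothetical stand-in for StreamingRuntimeContext; only the one method used here. */
    public interface RuntimeContextView {
        boolean isCheckpointingEnabled();
    }

    static final String MESSAGE =
            "Checkpointing is disabled: Debezium offsets are never committed, so the "
                    + "Postgres replication slot will not advance and WAL will be retained.";

    /**
     * Returns true if checkpointing is enabled. Otherwise throws when failHard is set,
     * or prints a warning and returns false.
     */
    public static boolean ensureCheckpointingEnabled(RuntimeContextView ctx, boolean failHard) {
        if (ctx.isCheckpointingEnabled()) {
            return true;
        }
        if (failHard) {
            throw new IllegalStateException(MESSAGE);
        }
        System.err.println("WARN: " + MESSAGE);
        return false;
    }

    private CheckpointGuard() {}
}
```

In the real source function the argument would presumably come from `getRuntimeContext()`. Whether to warn or to fail is the open design choice from the issue description.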

gunnarmorling, Jan 10 '23 12:01

Thanks @gunnarmorling for reporting this, it makes sense to me. This could improve all Debezium-based connectors. Maybe we can turn this issue into an umbrella issue; the sub-tasks would be:

  • Ensure checkpointing is enabled for Debezium-based (DebeziumSourceFunction) connectors
  • Ensure checkpointing is enabled for MySQL/JdbcIncrementalSource connectors
  • Ensure checkpointing is enabled for OceanBase/TiDB (xxSourceFunction) connectors

leonardBang, Jan 11 '23 02:01

Hi @gunnarmorling, in the JdbcIncrementalSource connector, the enumerator assigns the stream split only when checkpointing is enabled:

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // we have waited for at-least one complete checkpoint after all snapshot-splits are
        // finished, then we can mark snapshot assigner as finished.
        if (checkpointIdToFinish != null && !assignerFinished && allSplitsFinished()) {
            assignerFinished = checkpointId >= checkpointIdToFinish;
            LOG.info("Snapshot split assigner is turn into finished status.");
        }
    }

The non-incremental version of the PG CDC connector is no longer recommended; therefore, I believe this issue has been resolved.

loserwang1024, Feb 05 '24 11:02

Closing this issue as it has been migrated to Apache Jira.

PatrickRen, Apr 09 '24 06:04