
[oracle]Use Incremental Snapshot Framework for Oracle CDC Connector

Open molsionmo opened this issue 2 years ago • 7 comments

[oracle]Use Incremental Snapshot Framework for Oracle CDC Connector #1079 @leonardBang

  1. Incremental snapshot implementation for flink-connector-oracle-cdc, based on flink-cdc-base.
  2. Tests are still missing; the corresponding test classes will be added next, and any problems they uncover will be fixed.
  3. Below are the file directory layout and a description of each implementation file:
  • source

    • assigner
      • splitter
        • OracleChunkSplitter.java(used to split a table into a set of chunks for the Oracle data source)
    • config
      • OracleSourceConfig.java
      • OracleSourceConfigFactory.java
      • OracleSourceOptions.java
    • meta
      • offset
        • RedoLogOffset.java(A structure that describes the offset of a redo log event in Oracle)
        • RedoLogOffsetFactory.java
        • RedoLogOffsetSerializer.java
      • split
        • FinishedSnapshotSplitInfo.java(The information used to describe a finished snapshot split)
        • OracleRedoLogSplit.java(The split to describe the redo log of Oracle table(s))
        • OracleSnapshotSplit.java(The split to describe a split of an Oracle table snapshot)
        • OracleSplit.java(A split of a table, produced by splitting the table by primary key)
    • reader
      • fetch
        • OracleScanFetchTask.java(The task that fetches data for an Oracle table snapshot split)
        • OracleSourceFetchTaskContext.java
        • OracleStreamFetchTask.java(The task that fetches data for an Oracle table stream split)
    • utils
      • OracleConnectionUtils.java
      • OracleSchema.java
      • OracleTypeUtils.java
      • OracleUtils.java
    • EmbeddedFlinkDatabaseHistory.java(A {@link DatabaseHistory} implementation which stores the latest table schema in Flink state.)
    • OracleDialect.java(The {@link JdbcDataSourceDialect} implementation for Oracle datasource)
    • OraclePooledDataSourceFactory
    • OracleSourceBuilder
  • debezium
    • logminer
      • LogMinerStreamingChangeEventSource(copied from io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource, with an added afterHandleScn method to handle SCN events, similar to io.debezium.connector.mysql.MySqlStreamingChangeEventSource#handleEvent)
      • TransactionalBuffer
      • OracleErrorHandler

molsionmo, Apr 17 '22
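OracleChunkSplitter splits a table into a set of chunks so that each chunk can be read as its own snapshot split. A minimal sketch of the idea, assuming a single numeric primary key whose MIN and MAX are already known and a fixed chunk size. The class and method names are hypothetical; the real splitter works against the database and must handle other key types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: evenly split a numeric primary-key range into chunks. */
public class ChunkSplitSketch {

    /** A chunk covering keys in [start, end); a null bound means "unbounded". */
    public record Chunk(Long start, Long end) {}

    /**
     * Splits [minKey, maxKey] into chunks of at most chunkSize keys.
     * The first chunk has no lower bound and the last has no upper bound,
     * so rows outside the observed range are still covered by some chunk.
     */
    public static List<Chunk> split(long minKey, long maxKey, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Chunk> chunks = new ArrayList<>();
        Long start = null;
        long next = minKey + chunkSize;
        while (next <= maxKey) {
            chunks.add(new Chunk(start, next));
            start = next;
            next += chunkSize;
        }
        chunks.add(new Chunk(start, null));
        return chunks;
    }

    public static void main(String[] args) {
        // Keys 1..10 with chunk size 4 -> [null,5), [5,9), [9,null)
        System.out.println(split(1, 10, 4));
    }
}
```

Leaving the outer bounds open is a common design choice for this kind of splitter: keys inserted below MIN or above MAX after the range was sampled still fall into the first or last chunk.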

Here is a question: in OracleStreamFetchTask.java, LogMinerStreamingChangeEventSource does not have a handle(event) method, so we cannot stop fetching the binlog for a snapshot split. I don't know how to resolve this.

One possible solution is to reimplement a StreamingChangeEventSource copied from LogMinerStreamingChangeEventSource and add an event-handling method, but I don't know whether it would work. @leonardBang, can you give me some advice? Thanks.

molsionmo, Apr 18 '22

I ran OracleE2eITCase on my computer successfully; I guess the failure is a timeout after 150 seconds:

```java
proxy.checkResultWithTimeout(
        expectResult,
        "products_sink",
        new String[] {"id", "name", "description", "weight"},
        150000L);
```

molsionmo, Apr 18 '22
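The 150000L argument is a timeout in milliseconds (150 seconds). Result checks of this kind typically poll the sink until the expected rows appear or the deadline passes. The sketch below shows that poll-until-deadline pattern with a hypothetical waitUntil helper; it is not the implementation of checkResultWithTimeout, only an illustration of why a slow environment can hit the timeout even when the test logic is correct.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical poll-until-deadline helper, illustrating a result check with a timeout. */
public class PollWithTimeout {

    /**
     * Repeatedly evaluates {@code condition} until it returns true or
     * {@code timeoutMs} milliseconds have elapsed.
     *
     * @return true if the condition was met before the deadline
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long intervalMs)
            throws InterruptedException {
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(intervalMs);
        }
        // Final check so a condition that became true during the last sleep is not missed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        // The condition becomes true after roughly 200 ms, well within a 150000L ms timeout.
        boolean ok = waitUntil(
                () -> System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(200),
                150_000L,
                50L);
        System.out.println(ok);
    }
}
```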

This is a question: in OracleStreamFetchTask.java, LogMinerStreamingChangeEventSource has no handle(event) method, so we cannot stop fetching the binlog for the snapshot split. I don't know how to solve this problem.

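I have a solution: we could reimplement a StreamingChangeEventSource copied from LogMinerStreamingChangeEventSource and add an event-handling method, but I don't know whether it works. @leonardBang, can you give me some advice? Thanks.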

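Hi @molsionmo, when I was researching this earlier I did a simple validation; you can use it as a reference:

1. Override the LogMinerStreamingChangeEventSource#execute method so that the TransactionalBuffer, which was originally created inside execute, can instead be supplied from outside via setTransactionalBuffer().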

2. Extend the TransactionalBuffer class and implement the commit() method; in this method, obtain the consumed position.

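That was my earlier validation. It can obtain the consumed position and stop the task based on that position, but the implementation is not very elegant.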

fuyun2024, Apr 19 '22
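The two-step approach described above (supply the TransactionalBuffer from outside, then observe commit() to track the consumed position) can be sketched with simplified stand-ins. Everything here is hypothetical: SimpleTransactionalBuffer, StopAwareBuffer and StreamingSourceSketch are not Debezium classes, and the real commit() signature is not reproduced.

```java
import java.util.List;

/**
 * Hypothetical, heavily simplified stand-ins for Debezium's LogMiner classes,
 * sketching the "inject a TransactionalBuffer and observe commit()" idea.
 */
public class InjectedBufferSketch {

    /** Stand-in for TransactionalBuffer: receives one call per committed transaction. */
    public static class SimpleTransactionalBuffer {
        public void commit(String txId, long commitScn) {
            // A real buffer would emit the transaction's buffered events here.
        }
    }

    /** Subclass that records the consumed position and decides when to stop. */
    public static class StopAwareBuffer extends SimpleTransactionalBuffer {
        private final long endScn;
        private long lastCommittedScn = -1;

        public StopAwareBuffer(long endScn) {
            this.endScn = endScn;
        }

        @Override
        public void commit(String txId, long commitScn) {
            super.commit(txId, commitScn);
            lastCommittedScn = commitScn; // the consumed position
        }

        public boolean shouldStop() {
            return lastCommittedScn >= endScn;
        }

        public long lastCommittedScn() {
            return lastCommittedScn;
        }
    }

    /** Stand-in for the streaming source, with the buffer supplied from outside. */
    public static class StreamingSourceSketch {
        private StopAwareBuffer buffer;

        public void setTransactionalBuffer(StopAwareBuffer buffer) {
            this.buffer = buffer;
        }

        /** Replays commit SCNs in order, stopping as soon as the buffer says so. */
        public void execute(List<Long> commitScns) {
            for (long scn : commitScns) {
                buffer.commit("tx-" + scn, scn);
                if (buffer.shouldStop()) {
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        StopAwareBuffer buffer = new StopAwareBuffer(105);
        StreamingSourceSketch source = new StreamingSourceSketch();
        source.setTransactionalBuffer(buffer);
        source.execute(List.of(100L, 103L, 106L, 110L));
        System.out.println(buffer.lastCommittedScn()); // 106: first commit at or past endScn
    }
}
```

Because the check runs on every commit, the stop position is at most one transaction past the end SCN; the trade-off is having to copy and modify the source's execute method to make the buffer injectable.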

Hi @fuyun2024, thanks for your comment; it inspired me. I did it in a simpler way.

OracleRedoLogSplitReadTask implements the afterHandleScn method and checks whether we need to stop fetching the redo log for the snapshot split. OracleLogMinerStreamingChangeEventSource is copied from LogMinerStreamingChangeEventSource and adds an afterHandleScn() hook that is called after each SCN has been handled.

molsionmo, Apr 19 '22
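The afterHandleScn approach can be sketched as a streaming loop that calls a hook once each mining window has been handled. This assumes a simplified model in which a window is just a list of commit SCNs and the hook returns true once the split's end SCN has been reached; ScnHook and run() are hypothetical names, not the API of the copied Debezium class.

```java
import java.util.List;

/**
 * Hypothetical sketch of a copied streaming loop that calls an
 * afterHandleScn-style hook once per processed mining window.
 */
public class AfterHandleScnSketch {

    /** Hook invoked after a mining window up to {@code scn} has been handled. */
    public interface ScnHook {
        /** @return true if streaming should stop (e.g. the split's end SCN was reached) */
        boolean afterHandleScn(long scn);
    }

    /**
     * Processes mining windows in order; each window is the list of commit SCNs it contains.
     * Returns the SCN at which streaming stopped, or -1 if it never stopped.
     */
    public static long run(List<List<Long>> windows, ScnHook hook) {
        for (List<Long> window : windows) {
            long handledScn = -1;
            for (long commitScn : window) {
                handledScn = commitScn; // stand-in for dispatching the committed events
            }
            // The hook only sees the position once the whole window has been handled.
            if (hook.afterHandleScn(handledScn)) {
                return handledScn;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long endScn = 105;
        List<List<Long>> windows = List.of(List.of(100L, 103L), List.of(106L, 110L, 120L));
        // Stops after the second window: reports 120 even though 106 already reached endScn.
        System.out.println(run(windows, scn -> scn >= endScn));
    }
}
```

Because the hook fires only once per window, the SCN it observes can already be several commits past the split's end (120 rather than 106 in the example), which is the granularity concern fuyun2024 raises about calling afterHandleScn() after processResult().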

Hi @molsionmo, I'm glad you found a simpler way, but I think it still needs some optimization. The afterHandleScn() method is called after processResult(), so by that point the SCN in offsetContext may already have moved past many commits.

fuyun2024, Apr 20 '22

I opened PR #1100 to avoid duplicate records during restoration; maybe my fix can inspire you a little @molsionmo @fuyun2024

leonardBang, Apr 21 '22

@leonardBang @GOODBOY008 @fuyun2024 I'm excited to tell you that the tests now pass. I'll merge the code, including the test code, on Sunday, and then add end-to-end tests later.

molsionmo, Apr 22 '22

@molsionmo Thanks for the great work. I just merged several PRs that improve cdc-base; could you rebase this PR? I'll start reviewing it now.

leonardBang, Oct 19 '22

add afterHandleScn() that can handle it after handle scn.

@molsionmo, it seems this causes a problem (see https://github.com/ververica/flink-cdc-connectors/issues/2909).

LogMinerStreamingChangeEventSource does not have handle(event) method so that we can not stop for fetch binlog for snapshot split. I can not kown how to resolve this question.

Almost all subclasses of AbstractLogMinerEventProcessor provide a processRow method. I tried this in https://github.com/ververica/flink-cdc-connectors/pull/2911, and it works.

loserwang1024, Dec 22 '23
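The per-row alternative can be sketched by overriding a processRow-style method so the stop condition is checked for every row rather than once per mining window. The classes below are hypothetical stand-ins; Debezium's AbstractLogMinerEventProcessor and its actual processRow signature are not reproduced here.

```java
import java.util.List;

/**
 * Hypothetical sketch of a per-row stop check, modelled loosely on the idea of
 * overriding a processRow method in an event processor subclass.
 */
public class ProcessRowHookSketch {

    /** Minimal stand-in for a LogMiner result row. */
    public record Row(long scn, String operation) {}

    /** Base processor: iterates rows and delegates each one to processRow. */
    public abstract static class EventProcessor {
        private boolean stopped;

        /** Processes rows until the end or until a subclass requests a stop. */
        public final void process(List<Row> rows) {
            for (Row row : rows) {
                if (stopped) {
                    return;
                }
                processRow(row);
            }
        }

        protected void stop() {
            stopped = true;
        }

        protected abstract void processRow(Row row);
    }

    /** Stops as soon as a row at or beyond the split's end SCN has been handled. */
    public static class BoundedProcessor extends EventProcessor {
        private final long endScn;
        private long lastScn = -1;

        public BoundedProcessor(long endScn) {
            this.endScn = endScn;
        }

        @Override
        protected void processRow(Row row) {
            lastScn = row.scn(); // stand-in for dispatching the row's change event
            if (row.scn() >= endScn) {
                stop();
            }
        }

        public long lastScn() {
            return lastScn;
        }
    }

    public static void main(String[] args) {
        BoundedProcessor p = new BoundedProcessor(105);
        p.process(List.of(new Row(100, "INSERT"), new Row(103, "COMMIT"),
                new Row(106, "UPDATE"), new Row(110, "COMMIT")));
        System.out.println(p.lastScn()); // 106
    }
}
```

Checking per row keeps the stop position as close to the end SCN as the row stream allows, without copying the whole streaming source just to add a hook after each mining window.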