flink-cdc
[oracle]Use Incremental Snapshot Framework for Oracle CDC Connector #1079 @leonardBang
- flink-connector-oracle-cdc incremental snapshot implementation based on flink-cdc-base
- It still lacks tests; I will add the corresponding test classes and fix the problems the tests uncover
- Below are the file directory and a description of each implementation file
source
- assigner
  - splitter
    - OracleChunkSplitter.java (splits a table into a set of chunks for the Oracle data source)
- config
  - OracleSourceConfig.java
  - OracleSourceConfigFactory.java
  - OracleSourceOptions.java
- meta
  - offset
    - RedoLogOffset.java (a structure that describes the offset of a redo log event in Oracle)
    - RedoLogOffsetFactory.java
    - RedoLogOffsetSerializer.java
  - split
    - FinishedSnapshotSplitInfo.java (the information that describes a finished snapshot split)
    - OracleRedoLogSplit.java (the split that describes the redo log of Oracle table(s))
    - OracleSnapshotSplit.java (the split that describes one split of an Oracle table snapshot)
    - OracleSplit.java (a split of a table, produced by splitting the table by primary key)
- reader
  - fetch
    - OracleScanFetchTask.java (the task that fetches data for an Oracle table snapshot split)
    - OracleSourceFetchTaskContext.java
    - OracleStreamFetchTask.java (the task that fetches data for an Oracle table stream split)
- utils
  - OracleConnectionUtils.java
  - OracleSchema.java
  - OracleTypeUtils.java
  - OracleUtils.java
- EmbeddedFlinkDatabaseHistory.java (a DatabaseHistory implementation that stores the latest table schema in Flink state)
- OracleDialect.java (the JdbcDataSourceDialect implementation for the Oracle data source)
- OraclePooledDataSourceFactory.java
- OracleSourceBuilder.java
debezium
- logminer
  - LogMinerStreamingChangeEventSource (copied from io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource, with an added afterHandleScn method to handle SCN events, similar to io.debezium.connector.mysql.MySqlStreamingChangeEventSource#handleEvent)
  - TransactionalBuffer
  - OracleErrorHandler
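To make the role of RedoLogOffset more concrete, here is a minimal sketch of an SCN-based offset. It is an illustration under stated assumptions, not the PR's RedoLogOffset: the class name ScnOffset and the map key "scn" are hypothetical, and it keeps only the SCN (Oracle's system change number). Because SCNs increase monotonically, offsets can be ordered by comparing SCNs. Serializing to a string map is also an assumption, chosen as a simple form to keep in checkpointed state.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified SCN-based offset (not the PR's RedoLogOffset).
 * Oracle SCNs increase monotonically, so offsets are ordered by SCN.
 */
final class ScnOffset implements Comparable<ScnOffset> {
    // Assumed key name for the serialized form.
    static final String SCN_KEY = "scn";

    private final long scn;

    ScnOffset(long scn) {
        this.scn = scn;
    }

    long getScn() {
        return scn;
    }

    /** Serializes the offset to a string map (an assumed storage format). */
    Map<String, String> toMap() {
        Map<String, String> map = new HashMap<>();
        map.put(SCN_KEY, Long.toString(scn));
        return map;
    }

    /** Restores an offset from the map produced by toMap(). */
    static ScnOffset fromMap(Map<String, String> map) {
        return new ScnOffset(Long.parseLong(map.get(SCN_KEY)));
    }

    @Override
    public int compareTo(ScnOffset other) {
        return Long.compare(scn, other.scn);
    }

    /** True if this offset is at or after the other one. */
    boolean isAtOrAfter(ScnOffset other) {
        return compareTo(other) >= 0;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ScnOffset && ((ScnOffset) o).scn == scn;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(scn);
    }

    @Override
    public String toString() {
        return "ScnOffset{scn=" + scn + "}";
    }
}
```

Comparing offsets this way is what lets a reader decide whether it has reached a split's ending position.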
There is a problem: in OracleStreamFetchTask.java, LogMinerStreamingChangeEventSource does not have a handle(event) method, so we cannot stop fetching the binlog for a snapshot split. I don't know how to resolve this.
I have one idea: reimplement a StreamingChangeEventSource copied from LogMinerStreamingChangeEventSource and add an event-handling method, but I don't know whether it would work. @leonardBang, can you give me some advice? Thanks
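For context, the log read for a snapshot split is bounded: after the chunk is scanned, the log between the split's low and high watermarks is replayed, and reading has to stop at the high watermark. That is why a per-event hook such as handle(event) matters. The sketch below is hypothetical (LogEvent and BoundedBackfillReader are illustrative names, and it assumes events with low < SCN <= high belong to the backfill); it only shows where the stop check sits when every event passes through a hook.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical log event: only an SCN and a payload. */
final class LogEvent {
    final long scn;
    final String payload;

    LogEvent(long scn, String payload) {
        this.scn = scn;
        this.payload = payload;
    }
}

/**
 * Sketch of a bounded backfill read for one snapshot split. Assumes events with
 * lowWatermark < scn <= highWatermark belong to the backfill.
 */
final class BoundedBackfillReader {
    private final long lowWatermark;
    private final long highWatermark;

    BoundedBackfillReader(long lowWatermark, long highWatermark) {
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }

    /** Per-event hook: collects the event if in range; returns false to stop reading. */
    boolean handle(LogEvent event, List<LogEvent> out) {
        if (event.scn > highWatermark) {
            return false; // past the high watermark: this split's backfill is done
        }
        if (event.scn > lowWatermark) {
            out.add(event);
        }
        return true;
    }

    /** Drives the hook over the log until it asks to stop or the log ends. */
    List<LogEvent> read(Iterator<LogEvent> log) {
        List<LogEvent> out = new ArrayList<>();
        while (log.hasNext()) {
            if (!handle(log.next(), out)) {
                break;
            }
        }
        return out;
    }
}
```

Without such a hook, the streaming source keeps running and the task has no point at which to compare the current position with the high watermark.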
OracleE2eITCase runs successfully on my computer; I guess it times out after 150 seconds:
proxy.checkResultWithTimeout(expectResult, "products_sink", new String[] {"id", "name", "description", "weight"}, 150000L);
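Hi @molsionmo, when I investigated this earlier I did a simple verification that you can refer to:
1. Override the LogMinerStreamingChangeEventSource#execute method so that the TransactionalBuffer, which execute originally creates itself, is instead supplied from outside via setTransactionalBuffer().
2. Extend the TransactionalBuffer class and implement the commit() method, obtaining the consumed position in that method.
This is how I verified it before. It can obtain the consumed position and stop the task based on it, but the implementation is not very elegant.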
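The two steps above can be sketched with simplified stand-ins. TxBuffer, PositionTrackingBuffer and SketchStreamingSource are hypothetical classes; they do not reproduce Debezium's real TransactionalBuffer or LogMinerStreamingChangeEventSource signatures. The sketch only shows the shape: the buffer is injected through setTransactionalBuffer() instead of being created inside execute(), and the subclass records the committed SCN in commit() so the loop can stop once a target position is reached.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transactional buffer; commit() releases a transaction's events. */
class TxBuffer {
    final List<String> emitted = new ArrayList<>();

    void commit(long commitScn, List<String> txEvents) {
        emitted.addAll(txEvents);
    }
}

/** Step 2: a subclass that records the consumed position in commit(). */
class PositionTrackingBuffer extends TxBuffer {
    private final long stopScn;
    long lastCommittedScn = -1L;
    boolean stopRequested = false;

    PositionTrackingBuffer(long stopScn) {
        this.stopScn = stopScn;
    }

    @Override
    void commit(long commitScn, List<String> txEvents) {
        super.commit(commitScn, txEvents);
        lastCommittedScn = commitScn; // the consumed position
        if (commitScn >= stopScn) {
            stopRequested = true;
        }
    }
}

/** Step 1: the buffer is injected from outside instead of being created in execute(). */
class SketchStreamingSource {
    private TxBuffer buffer = new TxBuffer();

    void setTransactionalBuffer(TxBuffer buffer) {
        this.buffer = buffer;
    }

    /** Replays committed transactions in order and stops when the tracking buffer asks to. */
    void execute(List<Long> commitScns, List<List<String>> transactions) {
        for (int i = 0; i < commitScns.size(); i++) {
            buffer.commit(commitScns.get(i), transactions.get(i));
            if (buffer instanceof PositionTrackingBuffer
                    && ((PositionTrackingBuffer) buffer).stopRequested) {
                return;
            }
        }
    }
}
```

The instanceof check in execute() is part of what makes this approach feel inelegant: the source has to know about the special buffer type to honor the stop request.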
Hi @fuyun2024, thanks for your comment, you inspired me. I did it in a simpler way.
OracleRedoLogSplitReadTask implements the afterHandleScn method and checks whether it needs to stop fetching the redo log for a snapshot split. OracleLogMinerStreamingChangeEventSource is copied from LogMinerStreamingChangeEventSource and adds afterHandleScn(), which is called after an SCN has been handled.
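A minimal sketch of the afterHandleScn() approach, assuming a simple callback interface: the copied streaming source calls the hook after each mined batch has been processed, and the read task returns true when the bounded read should stop. ScnHandledListener, MiningLoop and BoundedReadTask are hypothetical names, not the PR's classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback modeled on the afterHandleScn() idea. */
interface ScnHandledListener {
    /** Called after a batch up to handledScn has been processed; return true to stop. */
    boolean afterHandleScn(long handledScn);
}

/** Stand-in for the copied streaming source's mining loop. */
final class MiningLoop {
    private final ScnHandledListener listener;

    MiningLoop(ScnHandledListener listener) {
        this.listener = listener;
    }

    /** Processes each batch, then invokes the hook with the batch's end SCN. */
    List<Long> run(long[] batchEndScns) {
        List<Long> processed = new ArrayList<>();
        for (long endScn : batchEndScns) {
            processed.add(endScn); // stand-in for processing the mined batch
            if (listener.afterHandleScn(endScn)) {
                break;
            }
        }
        return processed;
    }
}

/** Stand-in for the read task: a bounded read stops once the split's end SCN is reached. */
final class BoundedReadTask implements ScnHandledListener {
    private final long endScn;

    BoundedReadTask(long endScn) {
        this.endScn = endScn;
    }

    @Override
    public boolean afterHandleScn(long handledScn) {
        return handledScn >= endScn;
    }
}
```

Compared with injecting a custom buffer, the source only needs to expose one hook, and the stop decision stays in the read task.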
Hi @molsionmo, I'm glad you found a simpler way, but I think it could still be optimized. The afterHandleScn() method is called after processResult(), so the SCN in offsetContext may span many commits.
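The concern can be illustrated with a small, self-contained simulation; it is not connector code, and StopGranularity is a hypothetical name. Commits arrive in mining batches. If the stop condition is only checked after a whole batch, commits beyond the stop SCN are emitted too, whereas a per-commit check stops right at it.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: compares stopping after a whole batch with stopping after each commit. */
final class StopGranularity {
    private StopGranularity() {
    }

    /** Emits commit SCNs, checking the stop condition only after each batch. */
    static List<Long> checkPerBatch(long[][] batches, long stopScn) {
        List<Long> out = new ArrayList<>();
        for (long[] batch : batches) {
            for (long scn : batch) {
                out.add(scn);
            }
            // Only the batch's last SCN is visible to the stop check.
            if (batch.length > 0 && batch[batch.length - 1] >= stopScn) {
                break;
            }
        }
        return out;
    }

    /** Emits commit SCNs, checking the stop condition after every commit. */
    static List<Long> checkPerCommit(long[][] batches, long stopScn) {
        List<Long> out = new ArrayList<>();
        for (long[] batch : batches) {
            for (long scn : batch) {
                out.add(scn);
                if (scn >= stopScn) {
                    return out;
                }
            }
        }
        return out;
    }
}
```

For batches {1, 2} and {3, 4, 5, 6} with a stop SCN of 3, checkPerBatch returns all six SCNs, while checkPerCommit returns 1, 2, 3.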
I opened a PR to avoid duplicate records on restoration, #1100; maybe my fix can give you some ideas. @molsionmo @fuyun2024
@leonardBang @GOODBOY008 @fuyun2024 I'm excited to tell you that the tests pass. I'll merge the code, including the test code, on Sunday, and add end-to-end tests later.
@molsionmo Thanks for the great work. I just merged several PRs that improve cdc-base; could you rebase this PR? I'll start reviewing it now.
(Quoting @molsionmo: "add afterHandleScn() that can handle it after handle scn.")
@molsionmo, it seems this can cause a problem (see https://github.com/ververica/flink-cdc-connectors/issues/2909).
(Quoting @molsionmo: "LogMinerStreamingChangeEventSource does not have handle(event) method so that we can not stop for fetch binlog for snapshot split. I can not know how to resolve this question.")
Almost all subclasses of AbstractLogMinerEventProcessor provide a processRow method. I tried this approach in https://github.com/ververica/flink-cdc-connectors/pull/2911, and it works.
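A sketch of why a per-row hook is enough, using hypothetical stand-ins (MinedRow, RowProcessorBase, BoundedRowProcessor). The real processRow in Debezium's AbstractLogMinerEventProcessor takes Debezium-specific types and is not reproduced here. Because the subclass runs its stop check after every row, the loop can stop at the row that reaches the end SCN instead of after a whole batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mined row: an SCN plus an operation name. */
final class MinedRow {
    final long scn;
    final String operation;

    MinedRow(long scn, String operation) {
        this.scn = scn;
        this.operation = operation;
    }
}

/** Stand-in for an event processor base class with a per-row template method. */
abstract class RowProcessorBase {
    final List<MinedRow> dispatched = new ArrayList<>();

    /** Per-row hook; subclasses may override it and call super. */
    protected void processRow(MinedRow row) {
        dispatched.add(row);
    }

    /** Processes rows one by one, checking isStopped() before each row. */
    final void processResult(List<MinedRow> rows) {
        for (MinedRow row : rows) {
            if (isStopped()) {
                return;
            }
            processRow(row);
        }
    }

    protected abstract boolean isStopped();
}

/** Subclass that checks the bounded-read end SCN after every row. */
final class BoundedRowProcessor extends RowProcessorBase {
    private final long endScn;
    private boolean reachedEnd = false;

    BoundedRowProcessor(long endScn) {
        this.endScn = endScn;
    }

    @Override
    protected void processRow(MinedRow row) {
        super.processRow(row);
        if (row.scn >= endScn) {
            reachedEnd = true; // stop before the next row is processed
        }
    }

    @Override
    protected boolean isStopped() {
        return reachedEnd;
    }
}
```

With rows at SCNs 1 through 5 and an end SCN of 3, only the first three rows are dispatched.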