flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

以后CDC里CREATE、DROP、ALTER TABLE这种想要如何实现?

Open a49a opened this issue 4 years ago • 20 comments
trafficstars

是要扩展RowKind吗,现在RowKind只有4中类型,还不包括对元数据表结构的修改。

a49a avatar Apr 28 '21 08:04 a49a

你希望接收的数据流里面也包含 DDL 变更事件么?

具体想拿 DDL 变更事件来干吗呢?

wuchong avatar Apr 29 '21 07:04 wuchong

解析DDL,做DDL同步

kanata163 avatar Apr 30 '21 03:04 kanata163

@deadwind4 @kanata163 是想同步更新目标表的 schema 么? 你们的目标表是什么呢?

wuchong avatar Apr 30 '21 04:04 wuchong

是的,目前先考虑常用的RDB数据库,如MySQL、Oracle、SqlServer、PG等

kanata163 avatar Apr 30 '21 05:04 kanata163

@wuchong DDL同步这种功能太复杂了,需要解析完DDL之后还要下游支持DDL同步,这需要各个数据源都支持DDL同步,这不是Flink CDC Connector应该干的活儿。 其实更新目标表的Schema可以通过人工手动维护,最简单的方式就是采集到DDL可以有一个Listener的接口,可以做一下预警功能 譬如将DDL通过钉钉告警的方式打印出来,方便人工维护

Tan-JiaLiang avatar May 13 '21 01:05 Tan-JiaLiang

@Tan-JiaLiang , 人工维护也会有问题,容易导致数据不一致。比如加列之后,马上来了一堆新数据,那么这些新数据到达目标表的时候就丢失了新列的数据。即使后续人工加上这个列,也已经丢失了那部分的数据。

wuchong avatar May 13 '21 02:05 wuchong

@wuchong ,是的,我说的那种告警方案确实会出现这样的问题,只能看用户是否能够容忍出现这样的错误,也可以当出现DDL这样的语句让用户的任务配置决定是否要将任务停下来,然后只发告警,直到人工处理。但是这种方案就很依赖运维了,如果处理不及时还容易导致数据库的CDC日志过期

主要Flink CDC需要支持DDL同步我想到的会有以下几个难点

  1. Flink CDC Connector是一个独立于Apache Flink项目的模块,Flink可能不会为了这个CDC项目把大部分Connector提供DDL的同步接口
  2. 如果使用Flink SQL的CDC做同步,因为我SQL已经写好Schema了,DDL同步就没发做了,因为底层也不能修改用户的写好的Schema啊
  3. 上游Source的DDL类型,要与下游Sink的类型做匹配,这个点也是最复杂的,因为不同Connector的类型可能都不一样,相同的Connector不同版本维护起来也非常复杂

对于这几个点,大佬有什么好的建议嘛

Tan-JiaLiang avatar May 14 '21 02:05 Tan-JiaLiang

@Tan-JiaLiang ,是的,这几点都是做 DDL 同步的难点。我们目前也正在研究方案。

wuchong avatar May 14 '21 02:05 wuchong

我提这个issue的目的是想了解PMC Jark Wu的想法和方案。因为我们内部也想做,就怕闭门造车后,与Flink 社区偏离太远,以后代码不好合并。

a49a avatar May 26 '21 03:05 a49a

@deadwind4 如果要输出 DDL 的话,不会放在 RowData 里面,会用新的数据结构。

wuchong avatar May 26 '21 04:05 wuchong

Flink CDC Connector确实不应该把这事全都干了,但是把DDL变更以某种形式暴露出来确实是很有意义的。

另外,其他一些信息也可以考虑进去,比如Heartbeat:现在如果把CDC的维表用作Versioned Table直接用来join,会因为维表数据变更慢,导致维表这边的Watermark涨不上去,其实用Heartbeat消息来推动Watermark上涨,才是最合理的办法。

可以用统一的数据结构来输出这些信息。

dongdongking008 avatar Jun 08 '21 03:06 dongdongking008

另外,其他一些信息也可以考虑进去,比如Heartbeat:现在如果把CDC的维表用作Versioned Table直接用来join,会因为维表数据变更慢,导致维表这边的Watermark涨不上去,其实用Heartbeat消息来推动Watermark上涨,才是最合理的办法。

想法不错,Heartbeat消息确实是个很有效的输入

leonardBang avatar Jun 08 '21 03:06 leonardBang

watermark 可以 pushdown 到 cdc source 里面,这样 heartbeat 数据不用让 flink 框架感知。

wuchong avatar Jun 08 '21 03:06 wuchong

watermark 可以 pushdown 到 cdc source 里面,这样 heartbeat 数据不用让 flink 框架感知。

其实在我看来,changelog中有必要包含heartbeat信息,因为这样才能知道真正的watermark位置,下游系统其实需要这个信息,比如把changelog输出到Kafka、Iceberg之后,他们可以去记录真实的watermark

dongdongking008 avatar Jun 10 '21 03:06 dongdongking008

watermark 可以 pushdown 到 cdc source 里面,这样 heartbeat 数据不用让 flink 框架感知。

其实在我看来,changelog中有必要包含heartbeat信息,因为这样才能知道真正的watermark位置,下游系统其实需要这个信息,比如把changelog输出到Kafka、Iceberg之后,他们可以去记录真实的watermark

+1 changelog中 目前只有数据信息,但我理解 @wuchong 的意思是 heartbeat 信息可以通过pushdown到cdc source 里作为watermark,这样heartbeat信息就可以走watermark流,一路下传给kafka,iceberg,hudi,确实这些下游系统时需要这部分信息的。

leonardBang avatar Jun 10 '21 11:06 leonardBang

+1 请问这个有后续吗?

dik111 avatar Dec 01 '21 06:12 dik111

+1 目前有后续解决方案了么

icchux avatar Apr 12 '22 08:04 icchux

我记得目前的CDC Java API已经能识别 DDL变更了,SQL层面Jark Wu老师在Flink forward ASIA 有演讲《Flink CDC 如何简化实时数据入湖入仓》未来会开源。

a49a avatar Apr 13 '22 07:04 a49a

@wuchong 请问一下目前oracle能采集ddl的数据么,我看mysql是支持的。

guandatawangjialin avatar Jun 20 '22 08:06 guandatawangjialin

请问计划何时可以支持?或者需要自己开发吗?

jkl0898 avatar Oct 27 '23 09:10 jkl0898

@jkl0898 社区近期会开启 Flink CDC 3.0 的开发,包含了这部分能力的设计,可以关注一下。

wuchong avatar Oct 27 '23 09:10 wuchong

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen avatar Feb 28 '24 15:02 PatrickRen