chunjun
chunjun copied to clipboard
[Feature][ClickHouse] Clickhouse support update sql
Search before asking
- [X] I had searched in the issues and found no similar feature requirement.
Description
当尝试在mysql数据表中更新修改某条或者删除某条数据记录的时候, 出现上述错误。
跟踪源码发现, 当实现增量实时数据同步作业时候,根据rowKind类型的值执行相关操作, 而clickhouse的 DELETE 与UPDATE_BEFORE 具体实现代码为空。 具体如下:



在此基础上,针对delete操作,添加了相关具体实现,但是对Flink相关语义以及数据原子性先关方面理解缺乏,还需要好好补充,希望大佬解惑, 指个方向, 具体如何更新操作。
Use case
No response
Related issues
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@FlechazoW chunjun基于flink来执行作业, 比如mysql binlog source ==>> sink clickhouse中。 flink这边是通过什么方式来获取binlog数据的? 我尝试根据flink的rowkind类型扩展了基础的delete的实现,但是不清楚方向是否是正确的。我尝试扩展下update与delete操作的数据同步,接下来该如何操作?
@FlechazoW chunjun基于flink来执行作业, 比如mysql binlog source ==>> sink clickhouse中。 flink这边是通过什么方式来获取binlog数据的? 我尝试根据flink的rowkind类型扩展了基础的delete的实现,但是不清楚方向是否是正确的。我尝试扩展下update与delete操作的数据同步,接下来该如何操作?
纯钧binlog插件是通过canal采集cdc数据的,但是对于下游来说,不需要关心上游获取数据的实现。例如,clickhouse 要实现delete / update 语法,只需要实现对应的方言ClickhouseDialect 即可。
@FlechazoW 纯钧binlog插件是通过canal采集cdc数据的,但是对于下游来说,不需要关心上游获取数据的实现。例如,clickhouse 要实现delete / update 语法,只需要实现对应的方言ClickhouseDialect 即可。
我大致通读了一遍基于canal获取binlog数据sink到clickhouse的具体实现。还是很多不太理解的地方。 然后我抛弃了这些想法。我直接考虑了实现对应的方言ClickhouseDialect。
考虑到Flink中Rowkind 语义, 如下所示:

然后在此基础上考虑了两类实现方案, 第一种方案是,针对不同的语义完成具体的语义实现,但是涉及到数据更新update操作时候,在语义上,存在更新前UPDATE_BEFORE与更新后UPDATE_AFTER的操作,个人实现上, 需要确保该更新表中存在唯一的主键标识,基于该标识确保数据更新变化成功。但是很遗憾,目前我没有办法确定fieldNames列表中哪个字段是唯一的主键,来确保幂等性的数据更新。 当然在非幂等性的情况下可以使用。
第二种方案, 较为简单粗暴。已实现对应的方言ClickhouseDialect,测试可以完成源数据Update/Delete更新操作后,目标端同步更新。 具体过程为在更新数据前先删除delete现有的数据记录(update_before),再添加insert更新后(update_after)的数据。 重点是update操作,因为已经记录了数据更新前后的数据变化值。 每次更新update操作后,直接删除delete目标端数据,然后在添加insert一条更新后update_after的数据记录。即delete与update_before绑定执行, insert与update_after绑定执行。
我不清楚第二种方案是否可取? 或者还有什么更好的方案实现? 求指导指导~~
目前在其他数据源的实现是方案二,你可以看下PreparedStmtProxy这个类,通过split参数,将update拆分成update_before 和update_after,对于update_before则采用delete,update_after则采用insert,与你的第二种方案类似。
@FlechazoW 好的,目前该问题没疑问了。