flink-cdc
flink-cdc copied to clipboard
Add a table of cdc json type
Add a table of CDC JSON type. It reads MySQL/Oracle/Postgres/SQL Server CDC data and outputs it as a JSON string. The effect is the same as that of the DataStream API; here it is implemented in the form of a table.
mysql cdc: create table test_json ( cdc_json string )WITH ( 'hostname' = 'xxx', 'connector' = 'cdc-json', 'port'='53309', 'username'='root', 'password'='xxx', 'database-list'='test', 'table-list'='test.cc_test1', 'source-type'='mysql', 'extra-prop.scan.startup.mode' = 'latest-offset' ) ;
select cdc_json from test_json;
The result:
3> +I[{"before":null,"after":{"id":12,"name":"c","test_decimal":"11"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"cc_test1","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680763113699,"transaction":null}] 3> +I[{"before":null,"after":{"id":1666,"name":"c","test_decimal":"11"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"cc_test1","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680763113699,"transaction":null}] 3> +I[{"before":null,"after":{"id":1,"name":"c","test_decimal":"11"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"cc_test1","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680763113699,"transaction":null}]
Thanks for your contribution! The Flink CDC 3.0 release brought a new values connector, which allows displaying raw event records in the console. I'm going to close this PR, but feel free to reopen it if you have any other concerns.