mysql binlog数据同步,数据变更类型字段提取
当使用binlog方式同步数据时,数据库变更记录类型“rowKind”如何作为普通字段提取出来?
下面是我使用 JSON 配置文件方式提交作业的示例:
{
"job" : {
"content" : [ {
"reader" : {
"parameter" : {
"username" : "root",
"password" : "123456",
"cat" : "insert,delete,update",
"jdbcUrl" : "jdbc:mysql://localhost:3306/test_cdc?useSSL=false",
"host" : "localhost",
"port" : 3306,
"start" : {
},
"table" : [ "test_cdc.cdc_flink" ],
"splitUpdate" : true,
"pavingData" : true,
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "VARCHAR"
},
{
"name": "sex",
"type": "VARCHAR"
}
]
},
"table": {
"tableName": "cdc_flink"
},
"name" : "binlogreader"
},
"writer" : {
"parameter" : {
"print" : true,
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "VARCHAR"
},
{
"name": "sex",
"type": "VARCHAR"
}
]
},
"table": {
"tableName": "sinkTable"
},
"name" : "streamwriter"
},
"transformer": {
"transformSql": "select rowKind as updateType,id,name,sex from cdc_flink"
}
} ],
"setting" : {
"speed" : {
"bytes" : 0,
"channel" : 1
}
}
}
}
提交flink作业:
sh bin/start-chunjun.sh -mode standalone \
-jobType sync \
-job /opt/apps/chunjun/chunjun-examples/json/binlog/binlog_mysql.json \
-chunjunDistDir chunjun-dist \
-flinkConfDir /opt/apps/flink-1.12.7/conf
报错信息如下: 2022-06-06 10:16:49.708 [main] WARN com.dtstack.chunjun.options.Options - Option 'flinkxDistDir' is deprecated, please replace with 'chunjunDistDir'. 2022-06-06 10:16:49.708 [main] WARN com.dtstack.chunjun.options.Options - Option 'flinkxDistDir' is deprecated, please replace with 'chunjunDistDir'. 2022-06-06 10:16:49.710 [main] WARN com.dtstack.chunjun.options.Options - Option 'flinkxDistDir' is deprecated, please replace with 'chunjunDistDir'. 2022-06-06 10:16:49.728 [main] INFO com.dtstack.chunjun.util.PluginUtil - Flinkx executionMode: standalone 2022-06-06 10:16:49.729 [main] INFO com.dtstack.chunjun.util.PluginUtil - Flinkx reset pipeline.jars: [/opt/apps/chunjun/chunjun-dist/chunjun-core-master.jar, /opt/apps/chunjun/chunjun-dist/dirty-data-collector/log/chunjun-dirty-log-master.jar, /opt/apps/chunjun/chunjun-dist/connector/binlog/chunjun-connector-binlog-master.jar, /opt/apps/chunjun/chunjun-dist/connector/stream/chunjun-connector-stream-master.jar, /opt/apps/chunjun/chunjun-dist/metrics/prometheus/chunjun-metrics-prometheus-master.jar] 2022-06-06 10:16:49.732 [main] INFO com.dtstack.chunjun.classloader.ClassLoaderManager - jarUrl:file:/opt/apps/chunjun/chunjun-dist/chunjun-core-master.jar_file:/opt/apps/chunjun/chunjun-dist/connector/binlog/chunjun-connector-binlog-master.jar_file:/opt/apps/chunjun/chunjun-dist/connector/stream/chunjun-connector-stream-master.jar_file:/opt/apps/chunjun/chunjun-dist/dirty-data-collector/log/chunjun-dirty-log-master.jar_file:/opt/apps/chunjun/chunjun-dist/metrics/prometheus/chunjun-metrics-prometheus-master.jar create ClassLoad successful... Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. 
From line 1, column 8 to line 1, column 16: Column 'rowKind' not found in any table at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117) at com.dtstack.chunjun.client.util.JobGraphUtil.buildJobGraph(JobGraphUtil.java:62) at com.dtstack.chunjun.client.standalone.StandaloneClusterClientHelper.submit(StandaloneClusterClientHelper.java:61) at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119) Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 16: Column 'rowKind' not found in any table at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:640) at com.dtstack.chunjun.Main.syncStreamToTable(Main.java:256) at com.dtstack.chunjun.Main.exeSyncJob(Main.java:212) at com.dtstack.chunjun.Main.main(Main.java:122) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349) ... 7 more Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 16: Column 'rowKind' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6015) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6178) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6164) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134) at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101) at org.apache.calcite.sql.SqlAsOperator.acceptCall(SqlAsOperator.java:121) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:6033) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50) at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33) at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:411) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147) ... 19 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'rowKind' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560) ... 46 more
另外,通过 SQL 的方式,如何提取变更类型("rowKind")字段?