chunjun
chunjun copied to clipboard
[Bug][chunjun-connection-oraclelogminer] oraclelogminer to mysql have error java.lang.Long cannot be cast to java.lang.Integer
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
oracle to mysql real-time sync error: java.lang.Long cannot be cast to java.lang.Integer
What you expected to happen
sync success
How to reproduce
oracle ddl :
CREATE TABLE "TEST"."SOURCE1" (
"ID" NUMBER NOT NULL,
"NAME" VARCHAR2(30 BYTE),
"AGG" NUMBER
)
TABLESPACE "USERS"
LOGGING
NOCOMPRESS
PCTFREE 10
INITRANS 1
STORAGE (
INITIAL 65536
NEXT 1048576
MINEXTENTS 1
MAXEXTENTS 2147483645
BUFFER_POOL DEFAULT
)
PARALLEL 1
NOCACHE
DISABLE ROW MOVEMENT
;
mysql ddl:
CREATE TABLE taier_update_incre2_copy (
id int(11) NOT NULL,
name varchar(255) DEFAULT NULL,
age int(11) DEFAULT NULL,
record_time datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
job json:
{
"job":{
"content":[
{
"reader":{
"parameter":{
"password":"System_$1234#_Cmict",
"splitUpdate":true,
"cat":"update,insert,delete",
"jdbcUrl":"jdbc:oracle:thin:@//10.136.106.163:1521/helowin",
"column":[
{
"name":"NAME",
"type":"VARCHAR2"
},
{
"name":"ID",
"type":"NUMBER"
},
{
"name":"AGG",
"type":"NUMBER"
}
],
"readPosition":"time",
"startTime": 1682038755563,
"pavingData": false,
"table":[
"TEST.SOURCE1"
],
"username":"test"
},
"name":"oraclelogminerreader",
"table":{
"tableName":"sourceTable"
}
},
"transformer":{
"transformSql":"SELECT NAME,ID,AGG FROM sourceTable"
},
"writer":{
"parameter":{
"password":"bigdata123",
"column":[
{
"index":0,
"name":"name",
"type":"VARCHAR"
},
{
"index":1,
"name":"id",
"type":"INT"
},
{
"index":2,
"name":"age",
"type":"INT"
}
],
"connection":[
{
"schema":"test",
"jdbcUrl":"jdbc:mysql://10.136.106.197:3306/test",
"table":[
"taier_update_incre2_copy"
]
}
],
"writeMode":"insert",
"username":"root"
},
"name":"mysqlwriter",
"flushIntervalMills":1,
"batchSize":1,
"table":{
"tableName":"sinkTable"
}
}
}
],
"setting":{
"restore":{
"isStream":true,
"isRestore":true
},
"log":{
"path":"",
"level":"debug",
"pattern":"",
"isLogger":false
},
"errorLimit":{
},
"speed":{
"readerChannel":1,
"writerChannel":1,
"bytes":0
}
}
} }
Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
logminer将number类型的数据当作java.lang.Long去处理。 此处使用了transformSQL,使用的是genericRowData传输数据(如果断开算子链,在序列化后应该会变为binaryRowData),这两种RowData均不能实现Long->Int的转换。 解决方案: 1.此处如非必要可以去掉transformSQL,这样会使用Chunjun的ColumnRowData去兼容Long->Int的转换。 2.使用SQL脚本的方式去跑作业,flinkSQL会隐式处理Long->Int的cast
我用之前的一个版本,当实例化binaryRowData时是可以的,新的版本示例化为了genericRowData对象,请问这里是怎么设置的,我想设置成转化为binaryRowData试一试
可以尝试关闭算子链 env.disableOperatorChaining() 或者 设置环境变量 pipeline.operator-chaining=false
关闭算子链后，测试mysql binlog到oracle，发生类型转换精度丢失的问题。 mysql ddl： CREATE TABLE taier_update_incre2_copy ( id int(11) NOT NULL, name varchar(255) DEFAULT NULL, age int(11) DEFAULT NULL, record_time datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
oracle ddl : CREATE TABLE "TEST"."SOURCE1" ( "ID" NUMBER NOT NULL, "NAME" VARCHAR2(30 BYTE), "AGG" NUMBER ) TABLESPACE "USERS" LOGGING NOCOMPRESS PCTFREE 10 INITRANS 1 STORAGE ( INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645 BUFFER_POOL DEFAULT ) PARALLEL 1 NOCACHE DISABLE ROW MOVEMENT ;
job json:
{
"job": {
"content": [
{
"transformer": {
"transformSql": "SELECT name,id,age FROM sourceTable"
},
"reader": {
"parameter": {
"password": "Pwd@1234_Cmict",
"port": 33062,
"cat": "insert",
"column": [
{
"name": "name",
"type": "VARCHAR",
"key": "name"
},
{
"name": "id",
"type": "INT",
"key": "id"
},
{
"name": "age",
"type": "INT",
"key": "age"
}
],
"host": "10.136.106.163",
"jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test",
"start": {
"timestamp": 1682321090992
},
"table": [
"source"
],
"username": "root"
},
"name": "binlogreader",
"table": {
"tableName": "sourceTable"
}
},
"writer": {
"parameter": {
"postSql": [],
"password": "System_$1234#_Cmict",
"column": [
{
"name": "NAME",
"isPart": false,
"type": "VARCHAR2",
"key": "NAME"
},
{
"name": "ID",
"isPart": false,
"type": "NUMBER",
"key": "ID"
},
{
"name": "AGG",
"isPart": false,
"type": "NUMBER",
"key": "AGG"
}
],
"connection": [
{
"schema": "TEST",
"jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin",
"table": [
"SOURCE1"
]
}
],
"writeMode": "insert",
"flushIntervalMills": 1000,
"batchSize": 1,
"sourceIds": [
57
],
"preSql": [],
"username": "test"
},
"name": "oraclewriter",
"table": {
"tableName": "sinkTable"
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"readerChannel": 1,
"writerChannel": 1,
"bytes": -1048576
},
"restore": {
"isRestore": false,
"isStream": false
}
}
}
}
对于oracle number类型的问题，存在数据同步不正确的问题，同步source表与sink表结果如下：
source:
sink:

该问题是OracleRawTypeConverter对于NUMBER类型的值默认转换为DataTypes.DECIMAL(38, 18)所致，如果默认为DataTypes.BIGINT()则没有问题。
用transformersql的情况下,数据类型不对应需要手动cast 另外,在上述场景下transformSQL好像没有存在的必要
不使用transform的条件下,columnRowData序列化出错,报错com.dtstack.chunjun.throwable.CastException: String[source] can not cast to BigDecimal.
用transformersql的情况下,数据类型不对应需要手动cast 另外,在上述场景下transformSQL好像没有存在的必要
"transformer": {
"transformSql": "SELECT name,CAST(id AS BIGINT) as id,CAST(age AS BIGINT) as age FROM sourceTable"
}
transformSql是这样写吗?
不使用transform的条件下,columnRowData序列化出错,报错com.dtstack.chunjun.throwable.CastException: String[source] can not cast to BigDecimal.
可以附上一下你的json脚本吗
用transformersql的情况下,数据类型不对应需要手动cast 另外,在上述场景下transformSQL好像没有存在的必要
"transformer": { "transformSql": "SELECT name,CAST(id AS BIGINT) as id,CAST(age AS BIGINT) as age FROM sourceTable" }transformSql是这样写吗?
这样应该是ok的
不使用transform的条件下,columnRowData序列化出错,报错com.dtstack.chunjun.throwable.CastException: String[source] can not cast to BigDecimal.
可以附上一下你的json脚本吗
{ "job": { "content": [ { "reader": { "parameter": { "password": "Pwd@1234_Cmict", "port": 33062, "cat": "insert", "column": [ { "name": "id", "type": "INT", "key": "id" }, { "name": "name", "type": "VARCHAR", "key": "name" }, { "name": "age", "type": "INT", "key": "age" } ], "host": "10.136.106.163", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test", "start": { "timestamp": 1682330965654 }, "table": [ "source" ], "username": "root" }, "name": "binlogreader", "table": { "tableName": "sourceTable" } }, "writer": { "parameter": { "postSql": [], "password": "System_$1234#_Cmict", "column": [ { "name": "ID", "isPart": false, "type": "NUMBER", "key": "ID" }, { "name": "NAME", "isPart": false, "type": "VARCHAR2", "key": "NAME" }, { "name": "AGG", "isPart": false, "type": "NUMBER", "key": "AGG" } ], "connection": [ { "schema": "TEST", "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "SOURCE1" ] } ], "writeMode": "insert", "flushIntervalMills": 1000, "batchSize": 1, "sourceIds": [ 57 ], "preSql": [], "username": "test" }, "name": "oraclewriter", "table": { "tableName": "sinkTable" } } } ], "setting": { "speed": { "channel": 1, "readerChannel": 1, "writerChannel": 1, "bytes": -1048576 }, "restore": { "isRestore": false, "isStream": false } } } }
用transformersql的情况下,数据类型不对应需要手动cast 另外,在上述场景下transformSQL好像没有存在的必要
"transformer": { "transformSql": "SELECT name,CAST(id AS BIGINT) as id,CAST(age AS BIGINT) as age FROM sourceTable" }transformSql是这样写吗?
这样应该是ok的
这样写依然没有解决同步不正确的问题
transformSQL的字段顺序需要保证一下。
另外
1.截图和你的错误信息对不上
2.binlog和logminer一样需要加这两个配置

transformSQL的字段顺序需要保证一下。 另外 1.截图和你的错误信息对不上 2.binlog和logminer一样需要加这两个配置
截图的错误是没有设置transform下报的
transformSQL的字段顺序需要保证一下。 另外 1.截图和你的错误信息对不上 2.binlog和logminer一样需要加这两个配置
加了配置没有生效,您可以给一个成功的示例吗?
报歉,这里确实是只能加sql才能保证准确性,但是sql需要手动cast确保数据准确 JSON作业对应了chunjun数据还原的场景,稍后给出示例
{ "job": { "content": [ { "nameMapping":{ "identifierMappings":{ "liuliu_test.cdc_source_test":"LIULIU.CDC_SINK_TEST" }, "casing":"UPPER" }, "reader": { "parameter": { "schema": "liuliu_test", "username": "root", "password": "root", "cat": "insert,delete,update", "jdbcUrl": "jdbc:mysql://localhost:3306/liuliu_test?useSSL=false", "host": "localhost", "port": 3306, "table": [ "cdc_source_test" ], "split": true, "pavingData": false }, "name": "binlogreader" }, "writer": { "parameter": { "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:orcl", "table": [ "*" ] } ], "writeMode": "insert", "username": "oracle", "password": "oracle" }, "name": "oraclewriter" } } ], "setting": { "speed": { "bytes": 0, "channel": 1 } } } }

还是有报错啊,这里获取sink端字段的索引异常了
这是我的json { "job": { "content": [ { "nameMapping":{ "identifierMappings":{ "test.source":"TEST.SOURCE1" }, "columnTypeMappings": { "id": "ID", "name": "NAME", "age": "AGG" }, "casing":"UPPER" }, "reader": { "parameter": { "password": "Pwd@1234_Cmict", "port": 33062, "cat": "insert", "splitUpdate": true, "pavingData": true, "host": "10.136.106.163", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test", "start": { "timestamp": 1682330965654 }, "table": [ "source" ], "username": "root" }, "name": "binlogreader", "table": { "tableName": "sourceTable" } }, "writer": { "parameter": { "postSql": [], "password": "System_$1234#_Cmict", "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "*" ] } ], "writeMode": "insert", "flushIntervalMills": 1000, "batchSize": 1, "sourceIds": [ 57 ], "preSql": [], "username": "test" }, "name": "oraclewriter", "table": { "tableName": "sinkTable" } } } ], "setting": { "speed": { "channel": 1, "readerChannel": 1, "writerChannel": 1, "bytes": -1048576 }, "restore": { "isRestore": false, "isStream": false } } } }
paving=false。 在我的基础上修改一下,我用的最新的1.12-release

我拉了最新的1.12-releasese试了一下,还是报错啊,序列化的时候报错了 我在你的基础上改了一下json: { "job": { "content": [ { "nameMapping":{ "identifierMappings":{ "test.source":"TEST.SOURCE1" }, "casing":"UPPER" }, "reader": { "parameter": { "schema": "test", "username": "root", "password": "Pwd@1234_Cmict", "cat": "insert,delete,update", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test?useSSL=false", "host": "10.136.106.163", "port": 33062, "table": [ "source" ], "split": true, "pavingData": false }, "name": "binlogreader" }, "writer": { "parameter": { "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "*" ] } ], "writeMode": "insert", "username": "test", "password": "System_$1234#_Cmict", "batchSize": 1, "flushIntervalMills": 1000 }, "name": "oraclewriter" } } ], "setting": { "speed": { "bytes": 0, "channel": 1 } } } }
可以看看你的代码有没有走到这行哈,
我看你这个SOURCE1是表名,如果是按上述脚本配置的话,在最新release上,库名表名等额外信息会在这里去掉的
![]()
可以看看你的代码有没有走到这行哈, 我看你这个SOURCE1是表名,如果是按上述脚本配置的话,在最新release上,库名表名等额外信息会在这里去掉的
感谢您,终于可以了。原来是我设置了batchSize=1,writeSingleRecordInternal在这里缺少了你截图的这段代码。

但是有新的问题,source表和sink表好像必须字段名一样才行,不一样nameList在这里获取columnList的索引因为获取不到指定字段名,返回结果为-1,会报数组越界异常。我在source表中的字段age对应sink的字段agg,代码中好像没有字段名columnName映射相关规则的配置。 "job": { "content": [ { "nameMapping":{ "identifierMappings":{ "test.source":"TEST.SOURCE2" }, "casing":"UPPER" }, "reader": { "parameter": { "schema": "test", "username": "root", "password": "Pwd@1234_Cmict", "cat": "insert,delete,update", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test?useSSL=false", "host": "10.136.106.163", "port": 33062, "table": [ "source" ], "split": true, "pavingData": false }, "name": "binlogreader" }, "writer": { "parameter": { "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "*" ] } ], "writeMode": "insert", "username": "test", "password": "System_$1234#_Cmict", "batchSize": 1, "flushIntervalMills": 1000 }, "name": "oraclewriter" } } ], "setting": { "speed": { "bytes": 0, "channel": 1 } } } }
MappingConf只有identifierMappings及columnTypeMappings
我的问题,字段映射目前还没有。 这个功能的定位是用于数据还原的,要求上下游表的结构相同。 不过实现一个mapping应该是比较方便的,可以自己拓展一下,或者我们后期把这个功能完善
明白了,数据还原的功能是不是还没有实现。我看代码这里的逻辑没有写

此外,使用transfrom时,在sql中强制cast似乎没有作用,mysql int到oracle number存在数据一致性问题。
{ "job": { "content": [ { "transformer": { "transformSql": "SELECT cast(name as varchar),cast(id as int),cast(age as int) FROM sourceTable" }, "reader": { "parameter": { "password": "Pwd@1234_Cmict", "port": 33062, "cat": "insert", "split": true, "pavingData": false, "column": [ { "name": "name", "type": "VARCHAR", "key": "name" }, { "name": "id", "type": "INT", "key": "id" }, { "name": "age", "type": "INT", "key": "age" } ], "host": "10.136.106.163", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test", "start": { "timestamp": 1682330965654 }, "table": [ "source" ], "username": "root" }, "name": "binlogreader", "table": { "tableName": "sourceTable" } }, "writer": { "parameter": { "postSql": [], "password": "System_$1234#_Cmict", "column": [ { "name": "NAME", "isPart": false, "type": "VARCHAR2", "key": "NAME" }, { "name": "ID", "isPart": false, "type": "NUMBER", "key": "ID" }, { "name": "AGE", "isPart": false, "type": "NUMBER", "key": "AGE" } ], "connection": [ { "schema": "TEST", "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "SOURCE1" ] } ], "writeMode": "insert", "flushIntervalMills": 1000, "batchSize": 1, "sourceIds": [ 57 ], "preSql": [], "username": "test" }, "name": "oraclewriter", "table": { "tableName": "sinkTable" } } } ], "setting": { "speed": { "channel": 1, "readerChannel": 1, "writerChannel": 1, "bytes": -1048576 }, "restore": { "isRestore": false, "isStream": false } } } }
明白了,数据还原的功能是不是还没有实现。我看代码这里的逻辑没有写
这里有子类实现的吧,你说的是哪里没实现呢
此外,使用transfrom时,在sql中强制cast似乎没有作用,mysql int到oracle number存在数据一致性问题。
{ "job": { "content": [ { "transformer": { "transformSql": "SELECT cast(name as varchar),cast(id as int),cast(age as int) FROM sourceTable" }, "reader": { "parameter": { "password": "Pwd@1234_Cmict", "port": 33062, "cat": "insert", "split": true, "pavingData": false, "column": [ { "name": "name", "type": "VARCHAR", "key": "name" }, { "name": "id", "type": "INT", "key": "id" }, { "name": "age", "type": "INT", "key": "age" } ], "host": "10.136.106.163", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test", "start": { "timestamp": 1682330965654 }, "table": [ "source" ], "username": "root" }, "name": "binlogreader", "table": { "tableName": "sourceTable" } }, "writer": { "parameter": { "postSql": [], "password": "System_$1234#_Cmict", "column": [ { "name": "NAME", "isPart": false, "type": "VARCHAR2", "key": "NAME" }, { "name": "ID", "isPart": false, "type": "NUMBER", "key": "ID" }, { "name": "AGE", "isPart": false, "type": "NUMBER", "key": "AGE" } ], "connection": [ { "schema": "TEST", "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "SOURCE1" ] } ], "writeMode": "insert", "flushIntervalMills": 1000, "batchSize": 1, "sourceIds": [ 57 ], "preSql": [], "username": "test" }, "name": "oraclewriter", "table": { "tableName": "sinkTable" } } } ], "setting": { "speed": { "channel": 1, "readerChannel": 1, "writerChannel": 1, "bytes": -1048576 }, "restore": { "isRestore": false, "isStream": false } } } }
这里不是cast为int,要看number在oracle插件中的处理方式,我记得是BIGINT还是DECIMAL,可以看下OracleSqlConvert里面的具体实现方式
此外,使用transfrom时,在sql中强制cast似乎没有作用,mysql int到oracle number存在数据一致性问题。 { "job": { "content": [ { "transformer": { "transformSql": "SELECT cast(name as varchar),cast(id as int),cast(age as int) FROM sourceTable" }, "reader": { "parameter": { "password": "Pwd@1234_Cmict", "port": 33062, "cat": "insert", "split": true, "pavingData": false, "column": [ { "name": "name", "type": "VARCHAR", "key": "name" }, { "name": "id", "type": "INT", "key": "id" }, { "name": "age", "type": "INT", "key": "age" } ], "host": "10.136.106.163", "jdbcUrl": "jdbc:mysql://10.136.106.163:33062/test", "start": { "timestamp": 1682330965654 }, "table": [ "source" ], "username": "root" }, "name": "binlogreader", "table": { "tableName": "sourceTable" } }, "writer": { "parameter": { "postSql": [], "password": "System_$1234#_Cmict", "column": [ { "name": "NAME", "isPart": false, "type": "VARCHAR2", "key": "NAME" }, { "name": "ID", "isPart": false, "type": "NUMBER", "key": "ID" }, { "name": "AGE", "isPart": false, "type": "NUMBER", "key": "AGE" } ], "connection": [ { "schema": "TEST", "jdbcUrl": "jdbc:oracle:thin:@10.136.106.163:1521/helowin", "table": [ "SOURCE1" ] } ], "writeMode": "insert", "flushIntervalMills": 1000, "batchSize": 1, "sourceIds": [ 57 ], "preSql": [], "username": "test" }, "name": "oraclewriter", "table": { "tableName": "sinkTable" } } } ], "setting": { "speed": { "channel": 1, "readerChannel": 1, "writerChannel": 1, "bytes": -1048576 }, "restore": { "isRestore": false, "isStream": false } } } }
这里不是cast为int,要看number在oracle插件中的处理方式,我记得是BIGINT还是DECIMAL,可以看下OracleSqlConvert里面的具体实现方式

在oracle中是dicimal(38,8),cast为dicmal依然有问题,这是同步后的显示结果

明白了,数据还原的功能是不是还没有实现。我看代码这里的逻辑没有写
这里有子类实现的吧,你说的是哪里没实现呢
我又看了一下代码,数据还原需要有restoration的配置,有示例的json吗