
When the DDL statement differs from the actual schema in the database, an ArrayIndexOutOfBoundsException is reported

pyscala opened this issue on Nov 23, 2021

DDL

CREATE TABLE if not exists table_a (
       `user_id` BIGINT NULL COMMENT '',
       `id` BIGINT NULL COMMENT '',
       `position_id` BIGINT NULL COMMENT '',
       `status` STRING NULL COMMENT '',
       `transaction_id` BIGINT NULL COMMENT '',
    PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
    ) WITH (
          'connector'='kafka',
          'topic'='xxxx',
          'properties.bootstrap.servers'='xxx',
          'properties.group.id'='xxx',
          'properties.auto.offset.reset'='earliest',
          'scan.startup.mode'='earliest-offset',
          'format'='debezium-avro-confluent',
          'debezium-avro-confluent.schema-registry.url'='xxxx'
          );
CREATE TABLE if not exists table_b (
     `user_id` BIGINT NULL COMMENT '',
     `id` BIGINT NULL COMMENT '',
     `position_id` BIGINT NULL COMMENT '',
     `status` STRING NULL COMMENT '',
     `transaction_id` BIGINT NULL COMMENT ''
    ) WITH (
          'connector' = 'tidb',
          'tidb.database.url' = 'jdbc:mysql://xxxx',
          'tidb.username' = 'xxxx',
          'tidb.password' = 'xxxxx',
          'tidb.database.name' = 'xxxxx',
          'tidb.maximum.pool.size' = '1',
          'tidb.minimum.idle.size' = '1',
          'tidb.table.name' = 'withdraws',
          'tidb.write_mode' = 'upsert',
          'sink.buffer-flush.max-rows' = '0'
          );
INSERT INTO table_b SELECT * FROM table_a;

The actual table in TiDB has one more auto-increment column than table_b declares, and the error below is reported when the job starts. In upsert mode the sink resolves each key column by name against the declared field list; the key column that exists only in TiDB is not found, the lookup returns -1, and using -1 as an array index raises the exception.
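
For illustration only, the TiDB-side table might look like the following; the auto-increment primary-key column (given the hypothetical name `seq_id` here, since the issue does not name it) exists in TiDB but is absent from the Flink DDL of table_b:

CREATE TABLE withdraws (
     `seq_id` BIGINT NOT NULL AUTO_INCREMENT,  -- extra column, not declared in table_b
     `user_id` BIGINT NULL,
     `id` BIGINT NULL,
     `position_id` BIGINT NULL,
     `status` VARCHAR(32) NULL,
     `transaction_id` BIGINT NULL,
     PRIMARY KEY (`seq_id`)
    );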

java.lang.ArrayIndexOutOfBoundsException: -1
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_291]
	at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_291]
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_291]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_291]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_291]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49) ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-tidb-connector-1.13-0.0.4.jar:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
2021-11-19 07:55:36,996 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 25e6800fc67392651c32db54b2fcc483 
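
A minimal sketch of the failing lookup, based on the frames above: it assumes the key indexes are resolved via a List.indexOf name lookup, as in Flink 1.13's JdbcDynamicOutputFormatBuilder, and reuses the hypothetical `seq_id` column name from the schema sketch.

import java.util.Arrays;
import java.util.List;

public class KeyIndexLookup {
    public static void main(String[] args) {
        // Columns declared in the Flink DDL of table_b.
        String[] fieldNames = {"user_id", "id", "position_id", "status", "transaction_id"};
        // Key column resolved from the actual TiDB table; "seq_id" is the
        // hypothetical auto-increment primary-key column missing from table_b.
        String[] keyNames = {"seq_id"};

        List<String> declared = Arrays.asList(fieldNames);
        // indexOf returns -1 because "seq_id" is not among the declared fields.
        int[] keyIndexes = Arrays.stream(keyNames).mapToInt(declared::indexOf).toArray();
        // Using -1 as an array index reproduces
        // java.lang.ArrayIndexOutOfBoundsException: -1
        System.out.println(fieldNames[keyIndexes[0]]);
    }
}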
