[Bug][Connector-V2] fix NPE when decimal type precision is incompatible for Paimon

Open hawk9821 opened this issue 6 months ago • 0 comments

Purpose of this pull request

Source: MySQL table and sample data

CREATE TABLE `product` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` decimal(4,1) DEFAULT NULL COMMENT '单价',
  PRIMARY KEY (`id`)
)  COMMENT='商品表' ;

insert into product values (1,'product1', 101.1);
insert into product values (2,'product2', 999.9);

Sink: Paimon table, created through Flink SQL

CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'warehouse' = '/tmp/paimon'
);

use catalog `paimon`;

create database if not exists `test`;

CREATE TABLE  `test`.`product` (
  `id` int NOT NULL COMMENT '主键',
  `name` varchar(100)  COMMENT '商品名称',
  `price` decimal(4,2)  COMMENT '单价',
  CONSTRAINT `PK_id` PRIMARY KEY (`id`) NOT ENFORCED
);
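
The two tables differ only in the scale of `price`: `decimal(4,1)` at the source versus `decimal(4,2)` at the sink. A minimal sketch of why the sample rows cannot be stored in the sink column, using only `java.math.BigDecimal`. The class and method names are hypothetical and are not part of SeaTunnel or Paimon. It is assumed here, not shown by this PR, that Paimon's decimal conversion yields null when a value overflows the target precision, which would explain the later `NullPointerException` in `writeDecimal`.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalFitCheck {

    /**
     * Returns true if the value, once rescaled to the target scale,
     * needs no more digits than the target precision allows.
     */
    public static boolean fits(BigDecimal value, int precision, int scale) {
        BigDecimal rescaled = value.setScale(scale, RoundingMode.HALF_UP);
        return rescaled.precision() <= precision;
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("101.1");
        // Source column type: 101.1 uses 4 digits.
        System.out.println("fits DECIMAL(4,1): " + fits(price, 4, 1));
        // Sink column type: 101.10 would need 5 digits.
        System.out.println("fits DECIMAL(4,2): " + fits(price, 4, 2));
    }
}
```

With scale 2, `101.1` becomes `101.10`, which needs five digits, one more than the declared precision of 4. The same holds for `999.9`.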

SeaTunnel job config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/source?connectTimeout=5000"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "*****"
    password = "******"
    table_path = "source.product"
    split.size = 8096
    plugin_output = "table_info"
  }
}

sink {
  Paimon {
    plugin_input = "table_info"
    catalog_name = "seatunnel_test"
    warehouse = "/tmp/paimon"
    database = "test"
    table = "product"
    generate_sink_sql = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

Exception before the fix: the root cause is a bare NullPointerException that says nothing about the field or the type mismatch, which makes the problem hard to locate.

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: java.lang.NullPointerException
	at org.apache.paimon.data.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:128)
	at org.apache.paimon.data.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:25)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:412)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more

Exception after the fix: the root cause now names the field, the offending value, and the two mismatched DECIMAL types.

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
	... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
	... 18 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-11], ErrorDescription:[deciaml type precision is incompatible. ] - `price` field value is: 101.1, except filed schema of sink is `price` DECIMAL(4, 1), but the filed in sink table which actual schema is `price` DECIMAL(4, 2) '单价'.Please check schema of sink table.
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.checkCanWriteWithSchema(RowConverter.java:542)
	at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:392)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
	at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
	... 6 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
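
The kind of check that produces a message like the one above can be sketched as follows. This is an illustration only, not the code of `RowConverter.checkCanWriteWithSchema`: `DecimalSchemaGuard`, `DecimalColumn`, and `checkCompatible` are hypothetical names, and a plain `IllegalArgumentException` stands in for `PaimonConnectorException`. The sketch assumes the check compares the decimal type derived from the source with the type of the existing sink column before the value is written.

```java
import java.math.BigDecimal;

public class DecimalSchemaGuard {

    /** Hypothetical stand-in for a decimal column: name, precision, scale. */
    public static final class DecimalColumn {
        final String name;
        final int precision;
        final int scale;

        public DecimalColumn(String name, int precision, int scale) {
            this.name = name;
            this.precision = precision;
            this.scale = scale;
        }

        String describe() {
            return "`" + name + "` DECIMAL(" + precision + ", " + scale + ")";
        }
    }

    /** Fails fast with a descriptive message instead of letting a later write hit an NPE. */
    public static void checkCompatible(DecimalColumn source, DecimalColumn sink, BigDecimal value) {
        if (source.precision != sink.precision || source.scale != sink.scale) {
            throw new IllegalArgumentException(
                    "decimal type precision is incompatible: field value is " + value
                            + ", expected sink schema " + source.describe()
                            + ", actual sink schema " + sink.describe()
                            + ". Please check the schema of the sink table.");
        }
    }

    public static void main(String[] args) {
        DecimalColumn source = new DecimalColumn("price", 4, 1);
        DecimalColumn sink = new DecimalColumn("price", 4, 2);
        try {
            checkCompatible(source, sink, new BigDecimal("101.1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing before the write keeps the field name and both types in the error, so the reader can fix the sink DDL directly.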

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

  • [ ] If your PR adds any new Jar binary package, please add a License Notice according to the New License Guide
  • [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
  • [ ] If you are contributing the connector code, please check that the following files are updated:
    1. Update plugin-mapping.properties and add new connector information in it
    2. Update the pom file of seatunnel-dist
    3. Add a CI label in label-scope-conf
    4. Add an e2e test case in seatunnel-e2e
    5. Update connector plugin_config

hawk9821 · Jun 18 '25 01:06