seatunnel
[Bug][Connector-V2] fix NPE when decimal type precision is incompatible for Paimon
Purpose of this pull request
Source: MySQL
CREATE TABLE `product` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` decimal(4,1) DEFAULT NULL COMMENT '单价',
PRIMARY KEY (`id`)
) COMMENT='商品表' ;
insert into product values (1,'product1', 101.1);
insert into product values (2,'product2', 999.9);
Sink: Paimon (table created with Flink SQL)
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = '/tmp/paimon'
);
use catalog `paimon`;
create database if not exists `test`;
CREATE TABLE `test`.`product` (
`id` int NOT NULL COMMENT '主键',
`name` varchar(100) COMMENT '商品名称',
`price` decimal(4,2) COMMENT '单价',
CONSTRAINT `PK_id` PRIMARY KEY (`id`) NOT ENFORCED
);
SeaTunnel job config
env {
parallelism = 1
job.mode = "BATCH"
}
source {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/source?connectTimeout=5000"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "*****"
password = "******"
table_path = "source.product"
split.size = 8096
plugin_output = "table_info"
}
}
sink {
Paimon {
plugin_input = "table_info"
catalog_name = "seatunnel_test"
warehouse = "/tmp/paimon"
database = "test"
table = "product"
generate_sink_sql = true
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}
Exception before the fix: the root cause is a bare NullPointerException with no field or schema information, which makes the problem hard to locate
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
... 18 more
Caused by: java.lang.NullPointerException
at org.apache.paimon.data.AbstractBinaryWriter.writeDecimal(AbstractBinaryWriter.java:128)
at org.apache.paimon.data.BinaryRowWriter.writeDecimal(BinaryRowWriter.java:25)
at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:412)
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
... 6 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
... 2 more
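Exception after the fix: the root cause is a PaimonConnectorException that names the field, its value, and both the expected and the actual sink schema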
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:309)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:203)
... 17 more
Caused by: java.lang.RuntimeException: table source.product sink throw error
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:140)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java:358)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.abortPrepare(MultiTableSinkWriter.java:289)
... 18 more
Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-11], ErrorDescription:[deciaml type precision is incompatible. ] - `price` field value is: 101.1, except filed schema of sink is `price` DECIMAL(4, 1), but the filed in sink table which actual schema is `price` DECIMAL(4, 2) '单价'.Please check schema of sink table.
at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.checkCanWriteWithSchema(RowConverter.java:542)
at org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter.reconvert(RowConverter.java:392)
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:191)
at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.write(PaimonSinkWriter.java:71)
at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:67)
... 6 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
... 2 more
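The mismatch behind both traces is plain arithmetic: 101.1 fits DECIMAL(4, 1), but rescaled to two fractional digits it becomes 101.10, which needs five digits and overflows DECIMAL(4, 2). The NPE in writeDecimal before the fix is presumably the result of that overflow reaching the writer as a null decimal; this is an inference from the trace, not something stated above. The following is a minimal standalone sketch of such a pre-write check using only java.math.BigDecimal. The class and method names (DecimalPrecisionCheck, fits, checkCanWrite) and the IllegalArgumentException are hypothetical stand-ins for illustration; this is not the connector's RowConverter.checkCanWriteWithSchema code.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical illustration only; not the actual SeaTunnel Paimon connector code.
class DecimalPrecisionCheck {

    // True if the value, rescaled to the sink column's scale,
    // still fits within the sink column's total number of digits.
    static boolean fits(BigDecimal value, int sinkPrecision, int sinkScale) {
        BigDecimal rescaled = value.setScale(sinkScale, RoundingMode.HALF_UP);
        return rescaled.precision() <= sinkPrecision;
    }

    // Fails early with a descriptive message instead of letting a
    // downstream writer hit an unexplained error.
    static void checkCanWrite(String field, BigDecimal value,
                              int sinkPrecision, int sinkScale) {
        if (value != null && !fits(value, sinkPrecision, sinkScale)) {
            throw new IllegalArgumentException(
                    "`" + field + "` value " + value
                            + " does not fit sink type DECIMAL("
                            + sinkPrecision + ", " + sinkScale + ")");
        }
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("101.1");
        System.out.println(fits(price, 4, 1)); // source column type DECIMAL(4, 1)
        System.out.println(fits(price, 4, 2)); // sink column type DECIMAL(4, 2)
        try {
            checkCanWrite("price", price, 4, 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check per decimal field before the row is handed to the writer keeps the failure close to its cause, so the message can carry the field name and both schemas.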
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
- [ ] If your PR adds any new Jar binary package, please add a License Notice according to the New License Guide
- [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [ ] If you are contributing the connector code, please check that the following files are updated:
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- Add ci label in label-scope-conf
- Add e2e testcase in seatunnel-e2e
- Update connector plugin_config