seatunnel
[Bug] [Kafka Source] format_error_handle_way = skip has no effect
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When the Kafka source is configured with format_error_handle_way = skip, a line in an invalid format is not skipped.
SeaTunnel Version
2.3.5
SeaTunnel Config
{
"env": {
"execution.parallelism": "1",
"job.mode": "STREAMING",
"checkpoint.interval": "5000"
},
"source": [
{
"plugin_name": "Kafka",
"bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"topic": "ocean",
"consumer.group": "ocean20240506",
"schema": {
"fields": {
"id": "int",
"name": "string"
}
},
"field_delimiter": ",",
"start_mode": "latest",
"format_error_handle_way": "skip",
"format": "text",
"result_table_name": "ocean"
}
],
"transform": [
],
"sink": [
{
"source_table_name": "ocean",
"plugin_name": "console"
}
]
}
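For reference, the behavior this report expects from `format_error_handle_way = skip` can be sketched in plain Java. This is a minimal sketch assuming a line-oriented text format with the `,` delimiter and the `id` (int) / `name` (string) schema from the config above. `HandleWay`, `Row`, `parseLine` and `parseAll` are hypothetical names, not SeaTunnel classes. The point it illustrates is that the skip branch has to catch unchecked parse errors such as `NumberFormatException`, because `Integer.parseInt` throws no checked exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class SkipOnFormatErrorSketch {

    // Hypothetical stand-in for the connector's handle-way option; not the SeaTunnel enum.
    enum HandleWay { FAIL, SKIP }

    // Hypothetical row type matching the schema in the report: id int, name string.
    static final class Row {
        final int id;
        final String name;

        Row(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Row(" + id + ", " + name + ")";
        }
    }

    // Parses one "id,name" line using the configured delimiter (quoted so it is not treated as a regex).
    static Row parseLine(String line, String delimiter) {
        String[] parts = line.split(Pattern.quote(delimiter), -1);
        // The int field is parsed first, so "dasd>dasd" fails here with a NumberFormatException.
        int id = Integer.parseInt(parts[0].trim());
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 2 fields, got " + parts.length + ": " + line);
        }
        return new Row(id, parts[1]);
    }

    // Parses every line; malformed lines are dropped under SKIP and rethrown under FAIL.
    static List<Row> parseAll(List<String> lines, String delimiter, HandleWay way) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            try {
                rows.add(parseLine(line, delimiter));
            } catch (RuntimeException e) {
                // Unchecked parse errors must be caught too; a catch limited to IOException would miss them.
                if (way == HandleWay.SKIP) {
                    System.err.println("Skipping malformed record: " + line + " (" + e.getMessage() + ")");
                    continue;
                }
                throw e;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1,alice", "dasd>dasd", "2,bob");
        System.out.println(parseAll(lines, ",", HandleWay.SKIP));
    }
}
```

Running `main` prints `[Row(1, alice), Row(2, bob)]` and logs the skipped line to stderr; with `HandleWay.FAIL` the same malformed line rethrows the `NumberFormatException`.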
Running Command
bin/seatunnel.sh -c kafka-demo.json
Error Exception
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.util.concurrent.CompletionException: java.lang.NumberFormatException: For input string: "dasd>dasd"
at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:412)
at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2044)
at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader.lambda$pollNext$4(KafkaSourceReader.java:199)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader.pollNext(KafkaSourceReader.java:117)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
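The trace shows the `NumberFormatException` surfacing through `CompletableFuture.join` inside `KafkaSourceReader.pollNext`, wrapped in a `CompletionException`. The sketch below uses only JDK classes (the class and method names are hypothetical) to reproduce that wrapping with the same input string, and to show that a skip decision has to be made inside the asynchronous task, before `join()` rethrows. Whether the 2.3.5 reader's skip branch sits inside that task but catches a narrower exception type is an assumption here, not something this report confirms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionWrappingSketch {

    // The failing input from the report: no ',' in the line, so the whole line lands in the int field.
    static final String BAD_LINE = "dasd>dasd";

    // No handler inside the task: the NumberFormatException escapes and join() rethrows it
    // wrapped in a CompletionException, as in the reported trace.
    static Throwable failureWithoutHandler() {
        try {
            CompletableFuture.runAsync(() -> Integer.parseInt(BAD_LINE.split(",")[0])).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    // Hypothetical skip path applied inside the task, so join() never sees the parse error.
    static boolean skippedInsideTask() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Integer.parseInt(BAD_LINE.split(",")[0]);
                return false; // parsed fine, nothing skipped
            } catch (NumberFormatException e) {
                return true; // record dropped instead of failing the task
            }
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(failureWithoutHandler());
        System.out.println(skippedInsideTask());
    }
}
```

The first line printed is `java.lang.NumberFormatException: For input string: "dasd>dasd"`, matching the root cause in the trace, and the second is `true`.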
Zeta or Flink or Spark Version
zeta
Java or Scala Version
1.8
Screenshots
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.
me too
+1