[Bug] [Kafka Source] format_error_handle_way = skip is not effective

Open Xuzhengz opened this issue 1 year ago • 3 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When the Kafka source is configured with format_error_handle_way = skip, a message in the wrong format is not skipped; the job fails instead.

SeaTunnel Version

2.3.5

SeaTunnel Config

{
  "env": {
    "execution.parallelism": "1",
    "job.mode": "STREAMING",
    "checkpoint.interval": "5000"
  },
  "source": [
    {
      "plugin_name": "Kafka",
      "bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "topic": "ocean",
      "consumer.group": "ocean20240506",
      "schema": {
        "fields": {
          "id": "int",
          "name": "string"
        }
      },
      "field_delimiter": ",",
      "start_mode": "latest",
      "format_error_handle_way": "skip",
      "format": "text",
      "result_table_name": "ocean"
    }
  ],
  "transform": [
  ],
  "sink": [
    {
      "source_table_name": "ocean",
      "plugin_name": "console"
    }
  ]
}

Running Command

bin/seatunnel.sh -c kafka-demo.json

Error Exception

Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.util.concurrent.CompletionException: java.lang.NumberFormatException: For input string: "dasd>dasd"
	at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:412)
	at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2044)
	at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader.lambda$pollNext$4(KafkaSourceReader.java:199)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader.pollNext(KafkaSourceReader.java:117)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
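
To make the report easier to reason about, here is a minimal Java sketch of the behaviour the trace suggests and of what "skip" would be expected to do. With the schema above (id as int, name as string) and "," as the delimiter, the message "dasd>dasd" contains no comma, so the whole string is treated as the id field and the int conversion throws NumberFormatException. The class and method names below (SkipOnFormatError, parse, parseOrSkip, readAll) are hypothetical and are not SeaTunnel APIs; the skip semantics shown are an assumption about the intended behaviour, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical illustration only; none of these names are SeaTunnel classes.
class SkipOnFormatError {

    // Mirrors the schema in the reported config: id (int), name (string).
    static final class Row {
        final int id;
        final String name;

        Row(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Splits on the delimiter and converts the first field to int.
    // Throws NumberFormatException when the first field is not numeric,
    // which is the failure seen in the stack trace above.
    static Row parse(String line, String delimiter) {
        String[] parts = line.split(Pattern.quote(delimiter), -1);
        int id = Integer.parseInt(parts[0]);
        String name = parts.length > 1 ? parts[1] : null;
        return new Row(id, name);
    }

    // Assumed "skip" semantics: a malformed record yields no row
    // instead of propagating the exception and failing the job.
    static Optional<Row> parseOrSkip(String line, String delimiter) {
        try {
            return Optional.of(parse(line, delimiter));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Applies parseOrSkip to a batch and keeps only the rows that parsed.
    static List<Row> readAll(List<String> lines, String delimiter) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            parseOrSkip(line, delimiter).ifPresent(rows::add);
        }
        return rows;
    }
}
```

Under these assumptions, feeding the lines "1,alice", "dasd>dasd" and "2,bob" through readAll with "," as the delimiter would keep the first and third records and drop the second, whereas the reported job fails on the second record.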

Zeta or Flink or Spark Version

zeta

Java or Scala Version

1.8

Screenshots

image

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

Xuzhengz avatar May 06 '24 05:05 Xuzhengz

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Jun 06 '24 00:06 github-actions[bot]

me too

tmby1314 avatar Aug 13 '24 06:08 tmby1314

+1

xuefengshuai avatar Aug 27 '24 08:08 xuefengshuai