
[Bug] [file] Caused by: org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed

Open · Alxe1 opened this issue on Nov 23, 2022 · 0 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Using the file connector, the Spark task fails with the error below.

SeaTunnel Version

2.3.0-beta

SeaTunnel Config

env {
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

source {
  file {
    path = "hdfs://master:8020/my_file.txt"
    result_table_name = "local_table"
  }
}

transform {
}

sink {
  Console {}
}
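
A likely cause: the legacy Spark file source parses its input as JSON when no format is given, so a plain .txt file yields only the internal _corrupt_record column, which is exactly the kind of query Spark 2.3+ rejects. A minimal sketch of a config that may avoid this, assuming the connector still honors the legacy format option (option name taken from the old File source docs, not verified against 2.3.0-beta):

source {
  file {
    path = "hdfs://master:8020/my_file.txt"
    format = "text"   # assumed option; the legacy default is json
    result_table_name = "local_table"
  }
}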

Running Command

./bin/start-seatunnel-spark.sh -m local[1] -e client -c test.conf

Error Exception

22/11/23 17:56:28 ERROR Seatunnel: 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.base.exception.CommandExecuteException: Execute Spark task error
	at org.apache.seatunnel.core.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:69)
	at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
	at org.apache.seatunnel.core.spark.SeatunnelSpark.main(SeatunnelSpark.java:33)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:863)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:938)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:947)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:120)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:130)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:200)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:366)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:321)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:428)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:178)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:174)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:202)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:199)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:174)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:294)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:386)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3415)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2553)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2553)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3390)
	at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2553)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2767)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:256)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:293)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:756)
	at org.apache.seatunnel.spark.console.sink.Console.output(Console.scala:38)
	at org.apache.seatunnel.spark.console.sink.Console.output(Console.scala:28)
	at org.apache.seatunnel.spark.SparkEnvironment.sinkProcess(SparkEnvironment.java:186)
	at org.apache.seatunnel.spark.batch.SparkBatchExecution.start(SparkBatchExecution.java:56)
	at org.apache.seatunnel.core.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:67)
	... 14 more
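
For context, the AnalysisException is Spark's guard against querying raw JSON/CSV input whose only referenced column is the internal _corrupt_record column: when every line of my_file.txt fails JSON parsing, the inferred schema contains nothing else, and the Console sink's show() trips the check. A spark-shell style sketch of the failure and a working read of the same path (assuming my_file.txt is plain text, not JSON; this is illustrative, not SeaTunnel code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("corrupt-record-sketch")
  .getOrCreate()

// What the file source effectively does today: parse the .txt file as JSON.
// Every line is corrupt, so the inferred schema is just _corrupt_record and
// any action (show, count) raises the AnalysisException quoted above.
// spark.read.json("hdfs://master:8020/my_file.txt").show()

// Reading the same path with the text reader yields a single string
// column named "value" and displays without error.
val df = spark.read.text("hdfs://master:8020/my_file.txt")
df.show()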

Flink or Spark Version

spark 2.4.8

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct
