[SPARK-48100][SQL] Fix issues in skipping nested structure fields not selected in schema
What changes were proposed in this pull request?
Previously, the XML parser could not effectively skip nested structure fields that were not selected in the schema. For instance, in the example below, df.select("struct2").collect() returns Seq(null) because struct1 was not effectively skipped. This PR fixes the issue.
<ROW>
  <struct1>
    <innerStruct><field1>1</field1></innerStruct>
  </struct1>
  <struct2>
    <field2>2</field2>
  </struct2>
</ROW>
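For context, here is a minimal, self-contained sketch of how the behavior described above can be reproduced. It is an illustration only: the object name, temporary-file handling, the local[*] master, and the hand-written schema (which deliberately omits struct1) are assumptions, not code from this PR.

import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructType}

object XmlSkipReproSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-48100 repro sketch")
      .master("local[*]")
      .getOrCreate()

    // Write the record from the description to a temporary XML file.
    val xml =
      """<ROW>
        |  <struct1><innerStruct><field1>1</field1></innerStruct></struct1>
        |  <struct2><field2>2</field2></struct2>
        |</ROW>""".stripMargin
    val path = Files.createTempFile("spark-48100-", ".xml")
    Files.write(path, xml.getBytes(StandardCharsets.UTF_8))

    // The read schema deliberately omits struct1, so the parser must skip
    // that whole nested element while materializing struct2.
    val schema = new StructType()
      .add("struct2", new StructType().add("field2", LongType))

    val df = spark.read
      .option("rowTag", "ROW")
      .schema(schema)
      .xml(path.toString)

    // Before the fix this could come back as Seq([null]); with the fix the
    // unselected struct1 is skipped and struct2 is populated as [[2]].
    println(df.select("struct2").collect().toSeq)

    spark.stop()
  }
}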
This PR also adds more tests for projection.
Why are the changes needed?
Fix a bug.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests; see the illustrative sketch below.
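As a rough illustration of what such a projection test can look like (the suite name, file handling, and expected row are assumptions written in the style of XmlSuite, not the actual tests added by this PR):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite; the real tests live in
// org.apache.spark.sql.execution.datasources.xml.XmlSuite.
class XmlProjectionSketchSuite extends QueryTest with SharedSparkSession {

  test("select a nested struct while another struct is pruned") {
    withTempDir { dir =>
      val path = s"${dir.getCanonicalPath}/rows.xml"
      Files.write(Paths.get(path),
        """<ROW>
          |  <struct1><innerStruct><field1>1</field1></innerStruct></struct1>
          |  <struct2><field2>2</field2></struct2>
          |</ROW>""".stripMargin.getBytes(StandardCharsets.UTF_8))

      val df = spark.read.option("rowTag", "ROW").xml(path)
      // struct2 must still be returned even though struct1 is dropped by the
      // projection and has to be skipped by the parser.
      checkAnswer(df.select("struct2"), Row(Row(2L)))
    }
  }
}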
Was this patch authored or co-authored using generative AI tooling?
No
- select with string xml object *** FAILED *** (14 milliseconds)
Failed to analyze query: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `John` cannot be resolved. Did you mean one of the following? [`name`, `metadata`]. SQLSTATE: 42703;
'Project ['John]
+- LogicalRDD [name#674349, metadata#674350], false
'Project ['John]
+- LogicalRDD [name#674349, metadata#674350], false
org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `John` cannot be resolved. Did you mean one of the following? [`name`, `metadata`]. SQLSTATE: 42703;
'Project ['John]
+- LogicalRDD [name#674349, metadata#674350], false
at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:330)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:142)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7(CheckAnalysis.scala:304)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$7$adapted(CheckAnalysis.scala:302)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:302)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:302)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:302)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:216)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:216)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:198)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:190)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:161)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:192)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:393)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:212)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:92)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:225)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:225)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:919)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:73)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:94)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:919)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:92)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4468)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1592)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1609)
at org.apache.spark.sql.execution.datasources.xml.XmlSuite.$anonfun$new$288(XmlSuite.scala:3060)
at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:136)
at org.apache.spark.sql.execution.datasources.xml.XmlSuite.$anonfun$new$287(XmlSuite.scala:3060)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840) (QueryTest.scala:139)
org.scalatest.exceptions.TestFailedException:
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
at org.scalatest.Assertions.fail(Assertions.scala:933)
at org.scalatest.Assertions.fail$(Assertions.scala:929)
at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1564)
at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:139)
at org.apache.spark.sql.execution.datasources.xml.XmlSuite.$anonfun$new$287(XmlSuite.scala:3060)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:334)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Merged to master.