
[SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour

Open peter-toth opened this pull request 2 years ago • 6 comments

What changes were proposed in this pull request?

When converting Spark's native table metadata representation (CatalogTable) to a HiveTable, make sure that the bucket spec references an existing table column.
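
A minimal sketch of the idea, using a hypothetical helper name (the actual conversion happens in HiveClientImpl.toHiveTable, per the stacktrace below): resolve each bucket column against the table's columns case-insensitively and prefer the column name as stored.

  // Hypothetical sketch, not the PR's actual code: resolve bucket spec
  // column names against the (possibly lower-cased) stored column names.
  def normalizedBucketCols(bucketCols: Seq[String], tableCols: Seq[String]): Seq[String] =
    bucketCols.map { bucketCol =>
      tableCols
        .find(_.equalsIgnoreCase(bucketCol)) // the metastore may have lower-cased it
        .getOrElse(bucketCol)                // leave unmatched names untouched
    }

  // With columns stored as Seq("c", "b_c") and a bucket spec of Seq("B_C"),
  // this returns Seq("b_c"), which passes Hive's case-sensitive check.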

Does this PR introduce any user-facing change?

The Hive metastore seems not to be case preserving with column names, but it is case preserving with the bucket spec, which means the following table creation:

  CREATE TABLE t(
    c STRING,
    B_C STRING
  )
  PARTITIONED BY (p_c STRING)
  CLUSTERED BY (B_C) INTO 4 BUCKETS
  STORED AS PARQUET

followed by a query:

  SELECT * FROM t

fails with:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]

Why are the changes needed?

Bug fix.

How was this patch tested?

Added new UT.
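
The regression test plausibly follows the reproduction above; a sketch using the suite's standard helpers (withTable, sql, checkAnswer), hypothetical and possibly differing from the PR's actual UT:

  test("SPARK-38717: Handle Hive's bucket spec case preserving behaviour") {
    withTable("t") {
      sql(
        """CREATE TABLE t(c STRING, B_C STRING)
          |PARTITIONED BY (p_c STRING)
          |CLUSTERED BY (B_C) INTO 4 BUCKETS
          |STORED AS PARQUET""".stripMargin)
      sql("INSERT INTO t PARTITION (p_c='a') VALUES ('hello', 'world')")
      // Before the fix, the SELECT failed in toHiveTable with the
      // "Bucket columns B_C is not part of the table columns" HiveException.
      checkAnswer(sql("SELECT * FROM t"), Row("hello", "world", "a"))
    }
  }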

peter-toth avatar Mar 31 '22 16:03 peter-toth

@peter-toth mind rebasing or retriggering the build? it should be fixed now.

HyukjinKwon avatar Apr 01 '22 04:04 HyukjinKwon

> @peter-toth mind rebasing or retriggering the build? it should be fixed now.

Thanks. Done.

peter-toth avatar Apr 01 '22 07:04 peter-toth

Full stacktrace for reference:

org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
	at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(HiveExternalCatalog.scala:1282)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionsByFilter(ExternalCatalogWithListener.scala:262)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:1275)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.listPartitionsByFilter(ExternalCatalogUtils.scala:144)
	at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:74)
	at org.apache.spark.sql.execution.datasources.CatalogFileIndex.listFiles(CatalogFileIndex.scala:59)
	at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:254)
	at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:249)
	at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:284)
	at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:265)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:463)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:448)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:547)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:221)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:217)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:521)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:221)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:217)
	at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:205)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:340)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:421)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3863)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3119)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3854)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3852)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3119)
	at org.apache.spark.sql.hive.execution.SQLQuerySuiteBase.$anonfun$new$546(SQLQuerySuite.scala:2711)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:306)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:304)
	at org.apache.spark.sql.hive.execution.SQLQuerySuiteBase.withTable(SQLQuerySuite.scala:72)
	at org.apache.spark.sql.hive.execution.SQLQuerySuiteBase.$anonfun$new$545(SQLQuerySuite.scala:2699)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$5(AdaptiveTestUtils.scala:65)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
	at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
	at org.apache.spark.sql.hive.execution.SQLQuerySuiteBase.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLQuerySuite.scala:72)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:247)
	at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:245)
	at org.apache.spark.sql.hive.execution.SQLQuerySuiteBase.withSQLConf(SQLQuerySuite.scala:72)
	at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$4(AdaptiveTestUtils.scala:65)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:203)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
	at org.apache.hadoop.hive.ql.metadata.Table.setBucketCols(Table.java:552)
	at org.apache.spark.sql.hive.client.HiveClientImpl$.toHiveTable(HiveClientImpl.scala:1098)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:764)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
	at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:763)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1287)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)

The error doesn't show up if the column used for bucketing is defined in lowercase, because PreprocessTableCreation normalizes the columns in the bucket spec to the defined column names. A small illustration of the mismatch follows below.
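
To make this concrete, a plain Scala illustration (hypothetical values, paraphrasing the observed behaviour rather than Hive's source):

  // The metastore lower-cases column names but preserves the bucket spec.
  val definedCols = Seq("c", "B_C")
  val storedCols  = definedCols.map(_.toLowerCase) // Seq("c", "b_c")
  val bucketSpec  = Seq("B_C")                     // original case kept

  // Hive's Table.setBucketCols matches case-sensitively, hence the exception:
  bucketSpec.forall(storedCols.contains)           // false

  // Defining the column as "b_c" makes PreprocessTableCreation normalize the
  // bucket spec to "b_c" at creation time, so the check passes on read.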

peter-toth avatar Apr 01 '22 07:04 peter-toth

@HyukjinKwon, @dongjoon-hyun could you please take a look at this PR?

peter-toth avatar Apr 26 '22 11:04 peter-toth

cc @cloud-fan too

peter-toth avatar Sep 07 '22 08:09 peter-toth

@HyukjinKwon, @dongjoon-hyun this is an old, small bugfix PR, but it fell through the cracks...

peter-toth avatar Sep 07 '22 08:09 peter-toth

thanks, merging to master!

cloud-fan avatar Sep 23 '22 09:09 cloud-fan

@peter-toth can you open a backport PR for 3.3? There are conflicts. Thanks!

cloud-fan avatar Sep 23 '22 09:09 cloud-fan

Thanks for the review! Sure, I will open a 3.3 backport PR today...

peter-toth avatar Sep 23 '22 10:09 peter-toth