
[Bug] [AuthZ] Can't update Iceberg table

Open · lordk911 opened this issue 3 years ago · 5 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the bug

Environment: Spark 3.2.1, AuthZ plugin built from master.

1. Compile the AuthZ plugin from the master branch: clean package -pl :kyuubi-spark-authz_2.12 -DskipTests -Dspark.version=3.2.1 -Dranger.version=1.2.0
2. Put kyuubi-spark-authz_2.12-1.7.0-SNAPSHOT.jar into $SPARK_HOME/jars
3. Configure Spark:

spark.sql.extensions org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.sql.KyuubiSparkSQLExtension

spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type hive

spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type hive

4. Use spark-shell
5. spark.sql("create table test.iceberg919(id bigint, name string) USING iceberg")
6. spark.sql("INSERT INTO test.iceberg919 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
7. spark.sql("select * from test.iceberg919").show

+---+----+                                                                      
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+

8. Run any of the following update statements:

spark.sql("update test.iceberg919 set name='aa' where id=1")
spark.sql("update iceberg_catalog.test.iceberg919 set name='aa' where id=1")
spark.sql("update spark_catalog.test.iceberg919 set name='aa' where id=1")

All of them fail with an AnalysisException ending in "is not an Iceberg table" (the reproduction steps are consolidated into a single snippet below).
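For convenience, here is a sketch that collapses the steps above into one self-contained Scala snippet. The programmatic session construction and the app name are additions for illustration; every setting and statement is taken from the steps above, and the AuthZ/Iceberg jars plus a reachable Hive metastore are assumed.

import org.apache.spark.sql.SparkSession

// Session configured as in steps 1-3 above.
val spark = SparkSession.builder()
  .appName("authz-iceberg-update-repro") // hypothetical app name
  .config("spark.sql.extensions",
    "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension," +
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," +
      "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.type", "hive")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .enableHiveSupport()
  .getOrCreate()

// Steps 5-8: create, populate, read, and then try to update the Iceberg table.
spark.sql("create table test.iceberg919(id bigint, name string) USING iceberg")
spark.sql("INSERT INTO test.iceberg919 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
spark.sql("select * from test.iceberg919").show()
// Fails with "... is not an Iceberg table" when the AuthZ extension is enabled.
spark.sql("update test.iceberg919 set name='aa' where id=1")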

Affects Version(s)

master

Kyuubi Server Log Output

spark.sql("update test.iceberg919 set name='aa' where id=1")
org.apache.spark.sql.AnalysisException: Project [id#25L, name#26]
+- RowFilterAndDataMaskingMarker
   +- RelationV2[id#25L, name#26] spark_catalog.test.iceberg919
 is not an Iceberg table
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$$anonfun$apply$1.applyOrElse(RewriteUpdateTable.scala:71)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$$anonfun$apply$1.applyOrElse(RewriteUpdateTable.scala:53)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$.apply(RewriteUpdateTable.scala:53)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$.apply(RewriteUpdateTable.scala:51)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
  ... 47 elided

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes. I can submit a PR independently to fix.
  • [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • [ ] No. I cannot submit a PR at this time.

lordk911 · Sep 19 '22 04:09

@bowenliang123

lordk911 · Sep 19 '22 04:09

Thanks for reporting. Let me take a look.

bowenliang123 · Sep 19 '22 05:09

In our QA env, the EXPLAIN of a similar update looks like this:

EXPLAIN EXTENDED UPDATE spkui05_1_s3a_mig SET field_one = 'one' WHERE field_three = 'two';

== Parsed Logical Plan ==
'UpdateIcebergTable [assignment('field_one, one)], ('field_three = two)
+- 'UnresolvedRelation [spkui05_1_s3a_mig], [], false

== Analyzed Logical Plan ==
UpdateIcebergTable [assignment(banana_id#779, banana_id#779), assignment(field_one#780, one), assignment(field_two#781, field_two#781), assignment(field_three#782, field_three#782), assignment(date_created#783, date_created#783)], (field_three#782 = two)
:- RelationV2[banana_id#779, field_one#780, field_two#781, field_three#782, date_created#783] spark_catalog.spkui01.spkui05_1_s3a_mig
+- ReplaceData RelationV2[banana_id#779, field_one#780, field_two#781, field_three#782, date_created#783] spark_catalog.spkui01.spkui05_1_s3a_mig
   +- Project [if ((field_three#782 = two)) banana_id#779 else banana_id#779 AS banana_id#788, if ((field_three#782 = two)) one else field_one#780 AS field_one#789, if ((field_three#782 = two)) field_two#781 else field_two#781 AS field_two#790, if ((field_three#782 = two)) field_three#782 else field_three#7...

Maybe the problem is that RowFilterAndDataMaskingMarker replaces UpdateIcebergTable.

@lordk911 can you share your explain statement?

minyk · Sep 19 '22 08:09

@minyk Running EXPLAIN on the update SQL causes the same failure. The exception comes from org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable in Iceberg's extension: RewriteUpdateTable fails to recognize RowFilterAndDataMaskingMarker as an instance of DataSourceV2Relation, because the plan under UpdateIcebergTable has already been rewritten by our marker rule.
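To make the failure mode concrete, here is a minimal sketch of the kind of relation check Iceberg's RewriteUpdateTable performs; it is a simplification, not Iceberg's actual code, and the object/method names are hypothetical. Only a bare DataSourceV2Relation is accepted as "an Iceberg table", so wrapping the relation with an extra node such as RowFilterAndDataMaskingMarker breaks the match and produces the error in the log above.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object RelationCheckSketch {
  // Simplified stand-in for the relation check inside Iceberg's rewrite rule.
  // The real rule throws an AnalysisException; a plain error keeps the sketch
  // self-contained.
  def requireIcebergRelation(table: LogicalPlan): DataSourceV2Relation = table match {
    case r: DataSourceV2Relation => r
    case other => sys.error(s"$other is not an Iceberg table")
  }
}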

bowenliang123 · Sep 19 '22 09:09

case class UpdateIcebergTable(
    table: LogicalPlan,
    assignments: Seq[Assignment],
    condition: Option[Expression],
    rewritePlan: Option[LogicalPlan] = None) extends RowLevelCommand {

  lazy val aligned: Boolean = AssignmentUtils.aligned(table, assignments)

  override def children: Seq[LogicalPlan] = if (rewritePlan.isDefined) {
    table :: rewritePlan.get :: Nil
  } else {
    table :: Nil
  }

  // ... remaining members of the Iceberg class elided ...
}

Spark's built-in v2 command does not expose the table field as a child, so we do not apply our marker to it and everything works. But the Iceberg custom command adds the table to its children, so we apply our marker on top of it and the rewrite fails.
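For illustration only, here is a minimal sketch of one way an authorization rule could avoid wrapping the table child of Iceberg's row-level commands, so that Iceberg's RewriteUpdateTable still sees a bare DataSourceV2Relation underneath. This is not the actual Kyuubi fix; ApplyMarkerSketch and RowFilterMarker are hypothetical stand-ins (the latter for RowFilterAndDataMaskingMarker).

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Hypothetical marker node standing in for RowFilterAndDataMaskingMarker.
case class RowFilterMarker(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): RowFilterMarker =
    copy(child = newChild)
}

object ApplyMarkerSketch extends Rule[LogicalPlan] {
  // Iceberg's custom row-level commands, matched by node name because they are
  // defined in the Iceberg Spark extension rather than in Spark itself.
  private val icebergRowLevelCommands =
    Set("UpdateIcebergTable", "DeleteFromIcebergTable", "MergeIntoIcebergTable")

  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // A top-level UPDATE/DELETE/MERGE is the root of the plan, as in this issue;
    // leave the command untouched so its table child stays a plain relation.
    case cmd if icebergRowLevelCommands.contains(cmd.nodeName) => cmd
    // Otherwise wrap scanned relations with the marker as usual.
    case other => other.resolveOperatorsUp {
      case r: DataSourceV2Relation => RowFilterMarker(r)
    }
  }
}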

yaooqinn · Sep 19 '22 09:09

It should be fixed on the master branch now. Please verify it when you have some time. @lordk911

bowenliang123 · Oct 11 '22 02:10