kyuubi icon indicating copy to clipboard operation
kyuubi copied to clipboard

[Bug] Insert into a jdbc table will cause exception when parse lineage in spark

Open Jack1007 opened this issue 1 year ago • 1 comments

Code of Conduct

Search before asking

  • [X] I have searched in the issues and found no similar issues.

Describe the bug

spark version:3.2.3 kyuubi version:1.8.0 spark lineage compile: mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.2.3

source table :

CREATE TABLE `test`.`ta` (
  `id` INT,
  `name` STRING)
USING text;

destination table:

CREATE TEMPORARY VIEW test1_view 
USING org.apache.spark.sql.jdbc 
OPTIONS 
(url 'jdbc:mysql://xxxx:3306/test', 
    user 'test', 
    password 'xxxxxx', 
    dbtable 'test.test1', 
    columns 'id,name');

sql:

insert into test1_view select id,name from test.ta;

In the driver log, spark lineage parser throw exception like this:

24/04/17 10:21:42 WARN SparkSQLLineageParseHelper: Extract Statement[483] columns lineage failed.
scala.MatchError: (id#20,{id#17}) (of class scala.Tuple2)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.$anonfun$extractColumnsLineage$10(SparkSQLLineageParseHelper.scala:265)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:265)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516)
	at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

The relational code:

      case p if p.nodeName == "InsertIntoDataSourceCommand" =>
        val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
        val table = logicalRelation
          .catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
        extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
          case (k, v) if table.nonEmpty =>
            k.withName(s"$table.${k.name}") -> v
        }

The table val in the code must be None or Empty,so case match failed.But why the table val is empty?

Affects Version(s)

1.8.0

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

24/04/17 10:21:42 WARN SparkSQLLineageParseHelper: Extract Statement[483] columns lineage failed.
scala.MatchError: (id#20,{id#17}) (of class scala.Tuple2)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.$anonfun$extractColumnsLineage$10(SparkSQLLineageParseHelper.scala:265)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:265)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54)
	at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516)
	at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • [ ] No. I cannot submit a PR at this time.

Jack1007 avatar Apr 17 '24 02:04 Jack1007

Hello @Jack1007, Thanks for finding the time to report the issue! We really appreciate the community's efforts to improve Apache Kyuubi.

github-actions[bot] avatar Apr 17 '24 02:04 github-actions[bot]