[Bug] Inserting into a JDBC table causes an exception when parsing lineage in Spark
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
Spark version: 3.2.3
Kyuubi version: 1.8.0
Spark lineage plugin build command: mvn clean package -pl :kyuubi-spark-lineage_2.12 -am -DskipTests -Dspark.version=3.2.3
Source table:
CREATE TABLE `test`.`ta` (
`id` INT,
`name` STRING)
USING text;
Destination table:
CREATE TEMPORARY VIEW test1_view
USING org.apache.spark.sql.jdbc
OPTIONS
(url 'jdbc:mysql://xxxx:3306/test',
user 'test',
password 'xxxxxx',
dbtable 'test.test1',
columns 'id,name');
SQL:
insert into test1_view select id,name from test.ta;
In the driver log, the Spark lineage parser throws an exception like this:
24/04/17 10:21:42 WARN SparkSQLLineageParseHelper: Extract Statement[483] columns lineage failed.
scala.MatchError: (id#20,{id#17}) (of class scala.Tuple2)
at org.apache.kyuubi.plugin.lineage.helper.LineageParser.$anonfun$extractColumnsLineage$10(SparkSQLLineageParseHelper.scala:265)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:265)
at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54)
at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52)
at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510)
at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516)
at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1433)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
The relevant code:
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
  val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
  val table = logicalRelation
    .catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
  extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
    case (k, v) if table.nonEmpty =>
      k.withName(s"$table.${k.name}") -> v
  }
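The table val in this code must be empty (catalogTable is None, so getOrElse("") is used). Since the only case is guarded by table.nonEmpty and there is no fallback case, the match fails with scala.MatchError. But why is table empty here?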
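The failure mode can be reproduced without Spark. The following is a minimal sketch, not Kyuubi code: Attr, Lineage, qualifyStrict and qualifyLenient are hypothetical stand-ins for the attribute type and the mapping step in the snippet above. It shows that a single guarded case with no fallback throws scala.MatchError when table is empty, and that adding a pass-through case avoids the exception. The pass-through is only one possible workaround and is an assumption here, not the project's actual fix. It also does not explain why catalogTable is empty; one plausible but unconfirmed reason is that a temporary view created with USING ... OPTIONS is not backed by a catalog table.

```scala
// Spark-free sketch of the guarded-match failure.
// Attr, Lineage, qualifyStrict and qualifyLenient are hypothetical names.
object GuardedMatchSketch {
  final case class Attr(name: String) {
    def withName(n: String): Attr = copy(name = n)
  }

  type Lineage = Map[Attr, Set[Attr]]

  // Same shape as the reported code: one guarded case, no fallback.
  // Throws scala.MatchError for every entry when `table` is empty.
  def qualifyStrict(lineage: Lineage, table: String): Lineage =
    lineage.map {
      case (k, v) if table.nonEmpty =>
        k.withName(s"$table.${k.name}") -> v
    }

  // Possible workaround (assumption): keep entries unchanged
  // when no table name is available instead of failing.
  def qualifyLenient(lineage: Lineage, table: String): Lineage =
    lineage.map {
      case (k, v) if table.nonEmpty =>
        k.withName(s"$table.${k.name}") -> v
      case other => other
    }

  def main(args: Array[String]): Unit = {
    val lineage: Lineage = Map(Attr("id") -> Set(Attr("id")))
    // Non-empty table name: keys are prefixed with the table name.
    println(qualifyStrict(lineage, "test.test1"))
    // Empty table name: the strict version fails with MatchError.
    println(scala.util.Try(qualifyStrict(lineage, "")).isFailure) // true
    // Empty table name: the lenient version returns the input unchanged.
    println(qualifyLenient(lineage, "") == lineage) // true
  }
}
```

Whether unqualified column names are acceptable in the lineage output when the target has no catalog table is a design decision for the maintainers; the sketch only shows that the exception itself comes from the missing fallback case.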
Affects Version(s)
1.8.0
Kyuubi Server Log Output
No response
Kyuubi Engine Log Output
Same stack trace as the driver log under "Describe the bug" above.
Kyuubi Server Configurations
No response
Kyuubi Engine Configurations
No response
Additional context
No response
Are you willing to submit PR?
- [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- [ ] No. I cannot submit a PR at this time.
Hello @Jack1007, Thanks for finding the time to report the issue! We really appreciate the community's efforts to improve Apache Kyuubi.