[KYUUBI #2282] Add lineage records for sql statement execution in Kyuubi engine logs
Why are the changes needed?
Set `kyuubi.operation.lineage.enabled=true` to enable lineage logging. SQL lineage information will be written to `SparkOperationEvent` and output to the engine log. `kyuubi.operation.lineage.enabled` defaults to `false`.
Column lineage definition:
col0 -> (table.a, table.b)
col1 -> (table.c, table.a)
Example of the SQL lineage logger's JSON output. SQL:
select a as col0, b as col1 from test_table0
Lineage:
{
  "inputTables": [
    "default.test_table0"
  ],
  "outputTables": [],
  "columnLineage": [
    ["col0", ["default.test_table0.a"]],
    ["col1", ["default.test_table0.b"]]
  ]
}
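The payload above can be modeled in a few lines of plain Scala (a sketch; the class and field names are illustrative, not the plugin's actual types):

```scala
object LineageSketch {
  // Mirrors the JSON structure: input/output tables plus per-column lineage,
  // where each output column maps to the source columns it is derived from.
  case class Lineage(
      inputTables: List[String],
      outputTables: List[String],
      columnLineage: List[(String, List[String])])

  // The lineage of: select a as col0, b as col1 from test_table0
  val lineage: Lineage = Lineage(
    inputTables = List("default.test_table0"),
    outputTables = Nil,
    columnLineage = List(
      "col0" -> List("default.test_table0.a"),
      "col1" -> List("default.test_table0.b")))
}
```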
Currently supported column lineage for Spark Command and Query TreeNodes:

Query:
- Select

Command:
- CreateDataSourceTableAsSelectCommand
- CreateHiveTableAsSelectCommand
- OptimizedCreateHiveTableAsSelectCommand
- CreateTableAsSelect
- ReplaceTableAsSelect
- InsertIntoDataSourceCommand
- InsertIntoHadoopFsRelationCommand
- InsertIntoDataSourceDirCommand
- InsertIntoHiveDirCommand
- InsertIntoHiveTable
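As a rough illustration of the Select case, here is a dependency-free Scala sketch that propagates source columns through a projection like `select a as col0, b as col1 from test_table0` (all names are illustrative; the real implementation walks Spark's logical plan):

```scala
object ColumnLineageSketch {
  // Source columns exposed by the scanned table, keyed by attribute name.
  val sourceColumns: Map[String, Set[String]] = Map(
    "a" -> Set("default.test_table0.a"),
    "b" -> Set("default.test_table0.b"))

  // The projection: each output alias with the input attribute(s) it references.
  val projection: List[(String, Set[String])] =
    List("col0" -> Set("a"), "col1" -> Set("b"))

  // Each output column's lineage is the union of its inputs' source columns.
  val columnLineage: List[(String, Set[String])] =
    projection.map { case (out, refs) => out -> refs.flatMap(sourceColumns) }
}
```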
How was this patch tested?
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] Run tests locally before making a pull request
Codecov Report
Merging #3185 (e2eed8e) into master (c02ff48) will increase coverage by 0.06%. The diff coverage is 57.44%.
@@ Coverage Diff @@
## master #3185 +/- ##
============================================
+ Coverage 51.44% 51.50% +0.06%
Complexity 13 13
============================================
Files 470 474 +4
Lines 26244 26432 +188
Branches 3629 3688 +59
============================================
+ Hits 13500 13615 +115
- Misses 11464 11490 +26
- Partials 1280 1327 +47
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...bi/plugin/lineage/helper/SparkListenerHelper.scala | 33.33% <33.33%> (ø) | |
| ...in/lineage/helper/SparkSQLLineageParseHelper.scala | 57.61% <57.61%> (ø) | |
| .../plugin/lineage/events/OperationLineageEvent.scala | 63.63% <63.63%> (ø) | |
| .../SparkOperationLineageQueryExecutionListener.scala | 66.66% <66.66%> (ø) | |
| ...n/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 82.60% <0.00%> (-0.63%) | :arrow_down: |
| ...in/scala/org/apache/kyuubi/config/KyuubiConf.scala | 96.97% <0.00%> (-0.09%) | :arrow_down: |
| ...mon/src/main/scala/org/apache/kyuubi/Logging.scala | 53.94% <0.00%> (+9.21%) | :arrow_up: |
| ...a/org/apache/spark/kyuubi/SparkContextHelper.scala | 64.70% <0.00%> (+11.76%) | :arrow_up: |
@ulysses-you I refactored some code, could you help review it? Thanks
I suggest this feature goes to extensions.
also, this feature needs a complete usage doc
yeah, +1 move to extension
Will keep following this feature. I am looking for an Atlas hook as a lineage plugin for Spark SQL or Kyuubi.
I suggest this feature goes to extensions.
The original design was implemented in the extension layer, extending QueryExecutionListener, so we can take this approach, right?
yes, we can create a new module like the kyuubi-spark-authz module
and maybe the lineage info should have its own output logic
Two approaches are currently implemented:
- Spark SQL Engine: set `kyuubi.operation.lineage.enabled=true` to enable lineage logging. The lineage information will be added to `SparkOperationEvent` and output together through the operation event JSON logger.
{
  "Event": "org.apache.kyuubi.engine.spark.events.SparkOperationEvent",
  "statementId": "ea7b2d7a-1301-4f82-aa79-797ccab43d4a",
  "statement": "select a as col0, b as col1 from test_table0;",
  "shouldRunAsync": false,
  "state": "FINISHED_STATE",
  "eventTime": 1660650146416,
  "createTime": 1660650145261,
  "startTime": 1660650145311,
  "completeTime": 1660650146415,
  "exception": null,
  "sessionId": "e8f06b0f-5c5c-40ef-90de-c77fb4432d93",
  "sessionUser": "work",
  "executionId": 3,
  "lineage": {
    "inputTables": ["default.test_table0"],
    "outputTables": [],
    "columnLineage": [
      ["col0", ["default.test_table0.a"]],
      ["col1", ["default.test_table0.b"]]
    ]
  },
  "eventType": "spark_operation"
}
- Spark Listener Extension Plugin: not strongly bound to the Kyuubi engine, so it can also be tried on plain Spark. It has its own independent lineage information output, also emitted as JSON.
{
  "executionId": 2,
  "eventTime": 1660650415849,
  "lineage": {
    "inputTables": ["default.test_table0"],
    "outputTables": [],
    "columnLineage": [
      ["col0", ["default.test_table0.a"]],
      ["col1", ["default.test_table0.b"]]
    ]
  },
  "exception": null,
  "eventType": "operation_lineage"
}
In way 1, the lineage information is combined with the SQL event information, which makes collection and statistics easy.
In way 2, the SQL operation event logger and the lineage event logger are independent of each other and can only be associated by executionId, so after collection we need to correlate them ourselves.
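The executionId correlation needed in way 2 can be sketched in plain Scala (the event shapes here are illustrative, not the plugin's actual classes):

```scala
object CorrelateSketch {
  // Simplified stand-ins for the two independently logged event streams.
  case class OperationEvent(executionId: Long, statement: String)
  case class LineageEvent(executionId: Long, inputTables: Seq[String])

  // Join operation events with their lineage events on executionId.
  def correlate(
      ops: Seq[OperationEvent],
      lineages: Seq[LineageEvent]): Seq[(OperationEvent, LineageEvent)] = {
    val byId = lineages.map(l => l.executionId -> l).toMap
    ops.flatMap(op => byId.get(op.executionId).map(l => (op, l)))
  }
}
```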
Do we need to keep all these two ways? @yaooqinn @ulysses-you
It seems Kyuubi can provide a way for Spark extensions to inject events into Kyuubi events, so we can always unify the Spark event into the Kyuubi event. Maybe something like letting users get a KyuubiSparkSession via KyuubiSparkSession.get in a Spark extension. What do you think? @iodone @yaooqinn @pan3793 @turboFei
I do not get your point @ulysses-you
Let's clean the way 1 first, I think 2 is better and can still be improved
I mean we can reuse the functionality of the Kyuubi engine event in the extension, for example ENGINE_EVENT_LOGGERS, so we do not need additional config and code to do a similar thing. To approach this, we can provide a global static instance like KyuubiSparkSession which provides some methods for injecting events, so that Spark extensions can use it.
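A dependency-free sketch of that "global static instance" idea, where extensions register handlers and inject events back into the engine's event loggers (all names are hypothetical, not an actual Kyuubi API):

```scala
object EventInjectionSketch {
  trait Event

  // Handlers registered by the engine side (e.g. the configured event loggers).
  private val handlers = scala.collection.mutable.Buffer.empty[Event => Unit]

  def registerHandler(h: Event => Unit): Unit = handlers += h

  // A Spark extension would call this to inject its event (e.g. a lineage event)
  // into whatever loggers the engine has configured.
  def post(e: Event): Unit = handlers.foreach(h => h(e))
}
```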
Let's clean the way 1 first, I think 2 is better and can still be improved
In some scenarios the user submits SQL only to get the lineage information, without executing it. Just like PlanOnlyMode, we can output only the lineage information. In this case we have to implement the lineage extraction capability in the Kyuubi Spark engine, and this implementation can reuse way 1.
way 2 can be reused too, and it is clearer
@ulysses-you can't the current event framework achieve such a goal, with a custom event and custom event handler?
@ulysses-you can't the current event framework achieve such a goal, with a custom event and custom event handler?
class SparkHistoryLoggingEventHandler(sc: SparkContext) extends EventHandler[KyuubiEvent] {
  override def apply(event: KyuubiEvent): Unit = {
    SparkContextHelper.postEventToSparkListenerBus(event, sc)
  }
}
Here's the corresponding implementation that writes a custom SparkOperationKyuubiEvent to the SparkListenerBus; I think the event can then be caught by an extra Spark listener.
@iodone the failed style check job can be ignored, since we do not install the newly added module before the style check
@iodone can you create an issue to support tracking lineage from subqueries and v2 writes? We can improve that in the future
@iodone can you create an issue to support tracking lineage from subqueries and v2 writes? We can improve that in the future
OK
thanks @iodone for the great work, merged to master