flink
flink copied to clipboard
Field-level lineage inference
This pr introduces an explainLineage function to get field-level lineage relationship about Flink SQL task. For example,
create table src (a String,b String, c String) with ( 'connector' = 'COLLECTION');
create table des (a String,b String, c String) with ( 'connector' = 'COLLECTION');
insert into des select * from src;
The above SQL statements can infer field lineage like
{
"tables": [
{
"name": "`default_catalog`.`default_database`.`src`",
"columns": [
{
"name": "a",
"type": "STRING"
},
{
"name": "b",
"type": "STRING"
},
{
"name": "c",
"type": "STRING"
}
],
"options": {
"connector": "COLLECTION"
}
},
{
"name": "`default_catalog`.`default_database`.`des`",
"columns": [
{
"name": "a",
"type": "STRING"
},
{
"name": "b",
"type": "STRING"
},
{
"name": "c",
"type": "STRING"
}
],
"options": {
"connector": "COLLECTION"
}
}
],
"relations": [
{
"srcTable": "`default_catalog`.`default_database`.`src`",
"tgtTable": "`default_catalog`.`default_database`.`des`",
"srcTableColName": "a",
"tgtTableColName": "a"
},
{
"srcTable": "`default_catalog`.`default_database`.`src`",
"tgtTable": "`default_catalog`.`default_database`.`des`",
"srcTableColName": "b",
"tgtTableColName": "b"
},
{
"srcTable": "`default_catalog`.`default_database`.`src`",
"tgtTable": "`default_catalog`.`default_database`.`des`",
"srcTableColName": "c",
"tgtTableColName": "c"
}
]
}
CI report:
- ec7e21b3fd37e0eb69852b2c0354edcd14116499 UNKNOWN
Bot commands
The @flinkbot bot supports the following commands:@flinkbot run azurere-run the last Azure build