kun-scheduler
[Refactor] Migrate lineage feature from workflow module to metadata
Background
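For historical reasons, we integrated the Lineage feature into the Workflow module. In hindsight, that was a mistake.
First, by definition, lineage is part of metadata and should have been managed by the Metadata module from the start.
Second, in terms of functionality and module design, Workflow is the most fundamental, lowest-level module and should stay as self-contained and minimal as possible. Adding lineage forces Workflow to depend on a graph database such as Neo4j at runtime, which raises deployment cost and makes the module harder to use.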
Migration plan
We briefly analyzed how Lineage is built into Workflow. At present, the classes in the kun-workflow module that directly provide or make up the lineage functionality are:
com.miotech.kun.workflow.common.lineage.service.LineageService
com.miotech.kun.workflow.common.lineage.node.DatasetNode
com.miotech.kun.workflow.common.lineage.node.TaskNode
com.miotech.kun.workflow.web.controller.LineageController
These classes are referenced in:
com.miotech.kun.workflow.common.task.service.TaskService (its updateLineageGraphOnTaskCreate and updateLineageGraphOnTaskDelete methods)
com.miotech.kun.workflow.executor.local.LocalExecutor (its processReport method)
In addition, the following parts of the Workflow API need to change:
```java
// In interface: WorkflowClient

/**
 * Get neighbouring upstream / downstream lineage dataset nodes of a specific dataset
 * @param datasetGid global id of dataset
 * @param direction upstream, downstream or both
 * @param depth query depth
 * @return dataset lineage info
 */
DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth);

/**
 * Get lineage edge info
 * @param upstreamDatasetGid global id of upstream dataset
 * @param downstreamDatasetGid global id of downstream dataset
 * @return edge info object
 */
EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid);

// In class: DefaultWorkflowClient

@Override
public EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid) {
    return wfApi.getLineageEdgeInfo(upstreamDatasetGid, downstreamDatasetGid);
}

@Override
public DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth) {
    return wfApi.getLineageNeighbors(datasetGid, direction, depth);
}
```
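The Javadoc for `getLineageNeighbors` describes its parameters only briefly. As a rough illustration of the query semantics, here is an in-memory sketch that assumes `depth` counts hops away from the starting dataset and that "both" returns the union of the upstream and downstream results. `InMemoryLineageGraph`, `QueryDirection` and `neighbors` are hypothetical names; the real `LineageService` is backed by a graph database such as Neo4j rather than Java maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Demo entry point; declared first so single-file launchers pick it up.
class LineageNeighborsSketch {
    public static void main(String[] args) {
        InMemoryLineageGraph graph = new InMemoryLineageGraph();
        graph.addEdge(1L, 2L); // dataset 1 is upstream of dataset 2
        graph.addEdge(2L, 3L);
        graph.addEdge(3L, 4L);
        System.out.println(graph.neighbors(2L, QueryDirection.DOWNSTREAM, 1)); // [3]
        System.out.println(graph.neighbors(2L, QueryDirection.BOTH, 2));       // [1, 3, 4]
    }
}

// Hypothetical stand-in for LineageQueryDirection.
enum QueryDirection { UPSTREAM, DOWNSTREAM, BOTH }

// Hypothetical in-memory lineage graph keyed by dataset global id.
class InMemoryLineageGraph {
    private final Map<Long, Set<Long>> downstream = new HashMap<>(); // gid -> datasets it feeds
    private final Map<Long, Set<Long>> upstream = new HashMap<>();   // gid -> datasets it reads from

    void addEdge(long fromGid, long toGid) {
        downstream.computeIfAbsent(fromGid, k -> new LinkedHashSet<>()).add(toGid);
        upstream.computeIfAbsent(toGid, k -> new LinkedHashSet<>()).add(fromGid);
    }

    /** Datasets reachable within `depth` hops in the given direction, excluding the start. */
    Set<Long> neighbors(long datasetGid, QueryDirection direction, int depth) {
        Set<Long> result = new LinkedHashSet<>();
        if (direction != QueryDirection.DOWNSTREAM) {
            result.addAll(bfs(datasetGid, upstream, depth));
        }
        if (direction != QueryDirection.UPSTREAM) {
            result.addAll(bfs(datasetGid, downstream, depth));
        }
        return result;
    }

    // Breadth-first search that stops after `depth` levels; `visited` also guards against cycles.
    private static Set<Long> bfs(long start, Map<Long, Set<Long>> edges, int depth) {
        Set<Long> visited = new LinkedHashSet<>();
        visited.add(start);
        List<Long> frontier = List.of(start);
        for (int level = 0; level < depth && !frontier.isEmpty(); level++) {
            List<Long> next = new ArrayList<>();
            for (long node : frontier) {
                for (long neighbor : edges.getOrDefault(node, Set.of())) {
                    if (visited.add(neighbor)) {
                        next.add(neighbor);
                    }
                }
            }
            frontier = next;
        }
        visited.remove(start);
        return visited;
    }
}
```

Bounding the search by level rather than by visited count keeps the result stable for a given `depth`, which is presumably what callers rely on when they expand the lineage graph one ring at a time.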
In the data-discovery LineageController, the following routes need to call the metadata REST API instead of the workflow REST API:
```java
@GetMapping("/lineage/tasks")
@GetMapping("/lineage/graph")
```
Given the small number of classes and call sites listed above, the migration cost is manageable.
Detailed migration steps
Move all packages under com.miotech.kun.workflow.common.lineage to com.miotech.kun.metadata.common.lineage.
Delete the class com.miotech.kun.workflow.web.controller.LineageController and recreate it under com.miotech.kun.metadata.web.controller.
New module:
kun-metadata:kun-metadata-client
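Its directory structure mirrors kun-workflow-client.
It must expose one interface:
com.miotech.kun.metadata.client.MetadataClient, which replaces the corresponding methods in WorkflowClient (those methods must be removed from WorkflowClient).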
```java
// The package declaration names the package only, not the interface.
package com.miotech.kun.metadata.client;

// DatasetLineageInfo, EdgeInfo and LineageQueryDirection must be imported
// from wherever those model classes live after the migration.

public interface MetadataClient {
    /**
     * Get neighbouring upstream / downstream lineage dataset nodes of a specific dataset
     * @param datasetGid global id of dataset
     * @param direction upstream, downstream or both
     * @param depth query depth
     * @return dataset lineage info
     */
    DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth);

    /**
     * Get lineage edge info
     * @param upstreamDatasetGid global id of upstream dataset
     * @param downstreamDatasetGid global id of downstream dataset
     * @return edge info object
     */
    EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid);
}
```
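A minimal sketch of what a default implementation in kun-metadata-client could look like, assuming it mirrors `DefaultWorkflowClient` and simply delegates each call to an API object that handles the HTTP transport (the way `DefaultWorkflowClient` delegates to `wfApi`). `DefaultMetadataClient` and `MetadataApi` are hypothetical names, and the model classes below are stripped-down placeholders with assumed fields, not the real `DatasetLineageInfo`, `EdgeInfo` and `LineageQueryDirection`.

```java
import java.util.List;

// Demo entry point; declared first so single-file launchers pick it up.
class MetadataClientSketch {
    public static void main(String[] args) {
        // Fake transport returning canned data instead of calling the metadata service.
        MetadataApi fakeApi = new MetadataApi() {
            @Override
            public DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth) {
                return new DatasetLineageInfo(datasetGid, List.of(2L, 3L));
            }

            @Override
            public EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid) {
                return new EdgeInfo(upstreamDatasetGid, downstreamDatasetGid, List.of("example_task"));
            }
        };
        MetadataClient client = new DefaultMetadataClient(fakeApi);
        System.out.println(client.getLineageNeighbors(1L, LineageQueryDirection.BOTH, 1).neighborGids); // [2, 3]
        System.out.println(client.getLineageEdgeInfo(1L, 2L).taskNames); // [example_task]
    }
}

// Placeholder model types with assumed shapes (hypothetical, for illustration only).
enum LineageQueryDirection { UPSTREAM, DOWNSTREAM, BOTH }

class DatasetLineageInfo {
    final Long datasetGid;
    final List<Long> neighborGids;

    DatasetLineageInfo(Long datasetGid, List<Long> neighborGids) {
        this.datasetGid = datasetGid;
        this.neighborGids = neighborGids;
    }
}

class EdgeInfo {
    final Long upstreamDatasetGid;
    final Long downstreamDatasetGid;
    final List<String> taskNames;

    EdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid, List<String> taskNames) {
        this.upstreamDatasetGid = upstreamDatasetGid;
        this.downstreamDatasetGid = downstreamDatasetGid;
        this.taskNames = taskNames;
    }
}

// Same two methods as the MetadataClient interface proposed above.
interface MetadataClient {
    DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth);

    EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid);
}

// Hypothetical transport layer; a real one would issue HTTP requests to the metadata web service.
interface MetadataApi {
    DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth);

    EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid);
}

// Thin client that forwards every call unchanged, like DefaultWorkflowClient does with wfApi.
class DefaultMetadataClient implements MetadataClient {
    private final MetadataApi metadataApi;

    DefaultMetadataClient(MetadataApi metadataApi) {
        this.metadataApi = metadataApi;
    }

    @Override
    public DatasetLineageInfo getLineageNeighbors(Long datasetGid, LineageQueryDirection direction, int depth) {
        return metadataApi.getLineageNeighbors(datasetGid, direction, depth);
    }

    @Override
    public EdgeInfo getLineageEdgeInfo(Long upstreamDatasetGid, Long downstreamDatasetGid) {
        return metadataApi.getLineageEdgeInfo(upstreamDatasetGid, downstreamDatasetGid);
    }
}
```

Keeping the transport behind an interface lets callers such as data-discovery depend only on `MetadataClient`, and lets tests swap in a fake API as `main` does here.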
Correspondingly, the code in the classes that call getLineageNeighbors and getLineageEdgeInfo must be updated:
com.miotech.kun.workflow.web.controller.LineageController
com.miotech.kun.workflow.client.DefaultWorkflowClient