porter icon indicating copy to clipboard operation
porter copied to clipboard

如何开发自定义数据处理插件

Open zhangkewei opened this issue 6 years ago • 0 comments

假设我们要将mysql表T_USER同步到目标端Oracle T_USER_2,源端表T_USER表结构与目标端表T_USER_2一致。我们的需求是只保留FLAG字段等于0的用户数据。

需求有了,接下来我们就要实现EventProcessor接口做自定义数据过滤

	package cn.vbill.middleware.porter.plugin;
	public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
    @Override
    public void process(ETLBucket etlBucket) {
        List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
            //第一步 找到表名为T_USER的记录
            boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
            if (!tableMatch) return tableMatch;
            //第二步 找到字段FLAG的值不等于0的记录
            boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
            && (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
            return tableMatch && columnMatch;
        }).collect(Collectors.toList());
        //第三步 清除不符合条件的集合
        etlBucket.getRows().removeAll(rows);
    }
}

在任务中指定自定义数据处理插件:

以下配置文件格式适用配置管理后台"同步管理->高级任务配置(原菜单名:本地任务)->新增" 如果是本地任务配置文件需要增加前缀"porter.task[任务下标,从0开始]"

taskId=任务ID
nodeId=节点1,节点2,节点3
consumer.consumerName=CanalFetch
consumer.converter=canalRow
consumer.source.sourceType=CANAL
consumer.source.slaveId=0
consumer.source.address=127.0.0.1:3306
consumer.source.database=数据库
consumer.source.username=账号
consumer.source.password=密码
consumer.source.filter=*.\.t_user
consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
consumer.eventProcessor.content=/path/UserFilter.class(xxx.jar包,xxx.java类)

loader.loaderName=JdbcBatch #目标端插件
loader.source.sourceType=JDBC
loader.source.dbType=ORACLE
loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
loader.source.userName=demo
loader.source.password=demo

mapper[0].auto=false
mapper[0].table=T_USER,T_USER_2

zhangkewei avatar Jan 21 '19 05:01 zhangkewei