DataX
DataX copied to clipboard
并发时 Communication 统计有异常
在项目实践中同时触发两个job任务 datax 打印出日志任务 2021-07-14 19:56:22.531 INFO 14772 --- [job-26] com.alibaba.datax.core.job.JobContainer : 任务启动时刻 : 2021-07-14 19:56:12 任务结束时刻 : 2021-07-14 19:56:22 任务总计耗时 : 10s 任务平均流量 : 2.49KB/s 记录写入速度 : 17rec/s 读出记录总数 : 170 读写失败总数 : 0
: Total 170 records, 25495 bytes | Speed 2.49KB/s, 17 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2021-07-14 19:56:22.763 INFO 14772 --- [job-852] com.alibaba.datax.core.job.JobContainer : 任务启动时刻 : 2021-07-14 19:56:12 任务结束时刻 : 2021-07-14 19:56:22 任务总计耗时 : 10s 任务平均流量 : 2.49KB/s 记录写入速度 : 17rec/s 读出记录总数 : 170 读写失败总数 : 0 其中第一个任务真实交换量是170条数据 第二个任务的交换量不是170条数据。
虽然我也认为统计有点小问题,但是也许你不应该这么用。 如果你想提高读写性能,不是开多个Datax同时跑,而是设置channel多线程跑同一个任务。
我的实际情况是同一时刻开了2个datax同时跑,两个datax的读写表都不一样,知识最后的统计数据有异常
可以关注下:#909 #464,不知道这个能否解决你的问题。
请问一下你们是怎么跑的? 我多任务的情况下,没有用JobContainer,我用的TaskGroupContainer。你们是同事start 两个jobContainer么?
我也碰到这个问题,不知道会不会和运行模式有关,比如standalone,local这些。。
请问一下你们是怎么跑的? 我多任务的情况下,没有用JobContainer,我用的TaskGroupContainer。你们是同事start 两个jobContainer么?
对,我是在多个线程中调用JobContainer。如果是调用TaskGroupContainer会出现吗?
请问一下你们是怎么跑的? 我多任务的情况下,没有用JobContainer,我用的TaskGroupContainer。你们是同事start 两个jobContainer么?
对,我是在多个线程中调用JobContainer。如果是调用TaskGroupContainer会出现吗?
我跑TaskGroupContainer 根本就没有统计信息 还没有实现,可能需要自己去实现
请问一下你们是怎么跑的? 我多任务的情况下,没有用JobContainer,我用的TaskGroupContainer。你们是同事start 两个jobContainer么?
对,我是在多个线程中调用JobContainer。如果是调用TaskGroupContainer会出现吗?
我跑TaskGroupContainer 根本就没有统计信息 还没有实现,可能需要自己去实现
在这个方法中com.alibaba.datax.core.container.util.JobAssignUtil#doAssign taskGroupId被简单重置,出现重复。。
还有,用collect获取Communication,会把所有map里所有的都累加。。。
郁闷,不知道大咖们为什么这么写。。。
我已经改完了。忘记关闭这个issue了。和您说的方案差不多,自己改了一点源码
---原始邮件--- 发件人: @.> 发送时间: 2021年12月2日(周四) 中午11:39 收件人: @.>; 抄送: @.@.>; 主题: Re: [alibaba/DataX] 并发时 Communication 统计有异常 (#1065)
问题出在LocalTGCommunicationManager这个类中,该类使用一个map汇集统计信息,但他的key是taskGroupId,即:用task作为区分而不是用jobid,所以开源的是个单机版。你同时跑2个job任务,则需要用jobid作为区分进行汇集,不然会相互影响,应为两个任务的taskgroupid会重复。在LocalTGCommunicationManager类你加个map,以jobid为key就行,仿照原taskGroupId的代码,改吧改吧就行。改的类大多集中在etl.core.statistics包下。
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe. Triage notifications on the go with GitHub Mobile for iOS or Android.
我已经改完了。忘记关闭这个issue了。和您说的方案差不多,自己改了一点源码 … ---原始邮件--- 发件人: @.> 发送时间: 2021年12月2日(周四) 中午11:39 收件人: @.>; 抄送: @.@.>; 主题: Re: [alibaba/DataX] 并发时 Communication 统计有异常 (#1065) 问题出在LocalTGCommunicationManager这个类中,该类使用一个map汇集统计信息,但他的key是taskGroupId,即:用task作为区分而不是用jobid,所以开源的是个单机版。你同时跑2个job任务,则需要用jobid作为区分进行汇集,不然会相互影响,应为两个任务的taskgroupid会重复。在LocalTGCommunicationManager类你加个map,以jobid为key就行,仿照原taskGroupId的代码,改吧改吧就行。改的类大多集中在etl.core.statistics包下。 — You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe. Triage notifications on the go with GitHub Mobile for iOS or Android.
可以说一下怎么改的吗?
能说下怎么改的么
你可以看看我仓库低下的MyDataX项目 看看,具体哪个版本我都忘了
------------------ 原始邮件 ------------------ 发件人: "alibaba/DataX" @.>; 发送时间: 2022年7月2日(星期六) 下午4:30 @.>; @.@.>; 主题: Re: [alibaba/DataX] 并发时 Communication 统计有异常 (#1065)
我已经改完了。忘记关闭这个issue了。和您说的方案差不多,自己改了一点源码 … ---原始邮件--- 发件人: @.> 发送时间: 2021年12月2日(周四) 中午11:39 收件人: @.>; 抄送: @.@.>; 主题: Re: [alibaba/DataX] 并发时 Communication 统计有异常 (#1065) 问题出在LocalTGCommunicationManager这个类中,该类使用一个map汇集统计信息,但他的key是taskGroupId,即:用task作为区分而不是用jobid,所以开源的是个单机版。你同时跑2个job任务,则需要用jobid作为区分进行汇集,不然会相互影响,应为两个任务的taskgroupid会重复。在LocalTGCommunicationManager类你加个map,以jobid为key就行,仿照原taskGroupId的代码,改吧改吧就行。改的类大多集中在etl.core.statistics包下。 — You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe. Triage notifications on the go with GitHub Mobile for iOS or Android.
可以说一下怎么改的吗?
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you commented.Message ID: @.***>
@joson1205 没有修改统计相关的问题
放一下我的思路: 修改的是core这个模块 第一个修改JobAssignUtil类的doAssign方法:增加一个taskGourpId参数。原来的id是循环增加的。
for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
//原来的
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
//修改的
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, groupTaskId);
result.add(tempTaskGroupConfig);
}
然后修改JobContainer类的logStatistic方法:
//原, 这个方法可以看出是合并所有的统计信息的,这是造成数据统计异常的点
Communication communication = super.getContainerCommunicator().collect();
//修改
Integer taskGroupId = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
Communication communication = super.getContainerCommunicator().getCommunication(taskGroupId);
Engine启动类增加启动参数taskgroupId
options.addOption("taskGroupId", true, "taskGroup id ");
String taskGroupIdString = cl.getOptionValue("taskGroupId");
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, taskGroupIdString);
大致修改的部分就这些