swoft
swoft copied to clipboard
chunk 分块获取数据中增加子协程部分数据丢失
StatsCommand 代码块
$queue = $input->getInt('queue');
$startId = $input->getInt('start_id');
$endId = $input->getInt('end_id');
$this->StartTime = time();
$count = $queue??40;
unset($args);
unset($input);
echo "开始执行,并发数量: ".$count."\n";
if ($startId && $endId) {
$where = [
'total_count'=>0,
'pass_push_count'=>0,
['id', '>=', $startId],
['id', '<=', $endId]
];
} else {
$where = [
'total_count'=>0,
'pass_push_count'=>0
];
}
$channel = new Channel(1000);
Stats::where($where)->orderBy('id','asc')->chunk($count, function ($words) use ($channel){
echo count($words)." while\n";
foreach ($words as $key=>$wordObj) {
sgo(function () use ($channel, $wordObj) {
$word = ArrayHelper::toArray($wordObj);
if ($channel->isFull()) {
\co::sleep(1);
}
$result = $channel->push([
'id' => $word['id'],
'total' => 1
]);
if ($result == false) {
echo $channel->errCode."\n";
}
});
}
$count = count($words);
$i = 1;
while(true) {
if($i>=$count) {
break;
} else {
$data = $channel->pop();
if ($data) {
echo "ID:".$data['id']."start \n";
$i++;
}
if ($data['total']>0) {
$co = sgo(function () use ($data) {
$flag = Stats::find($data['id'])->update([
'total_count' => $data['total'],
]);
if ($flag == false) {
echo "ID:".$data['id']." total:".($data['total'])." update失败! \n";
} else {
echo "ID:".$data['id']." total:".($data['total'])."\n";
}
});
if ($co ==false) {
echo "ID:".$data['id']." 协程创建失败 \n";
}
}
}
}
echo "使用内存: ".$this->getFilesize(memory_get_usage())."\n";
});
命令行执行
php bin/swoft stats:run queue=50 start_id=1 end_id=500
问题描述
如果执行这样的代码发现会少update近200条数据,也没有报错
$flag = Stats::find($data['id'])->update([
'total_count' => $data['total'],
]);
如果把上面的这段注释掉就日志就会echo出500条记录
我不清楚是因为协程的问题还是生成的实体连接池问题
版本
swoft2.0 swoole 4.4.7
打开S QL事件监听看看 有没有执行 sql
打开S QL事件监听看看 有没有执行 sql 是swoft里面的事件吗
打开S QL事件监听看看 有没有执行 sql
class ModelSavedListener implements EventHandlerInterface
{
/**
* @param EventInterface $event
*/
public function handle(EventInterface $event): void
{
/** @var Model $modelStatic */
$modelStatic = $event->getTarget();
if ($modelStatic instanceof KeywordStats) {
$data = $modelStatic->getModelOriginal();
$modify = $modelStatic->getModelChanges();
echo $data['id']."修改内容为".json_encode($modify)."\n";
}
}
}
我添加了事件监听发现没有检测到丢失数据的日志,我在下面这段代码中打上了断点测试
if ($data['total']>0) {
$co = sgo(function () use ($data) {
--------------start---------------
$flag = Stats::find($data['id'])->update([
'total_count' => $data['total'],
]);
----------------end--------------
if ($flag == false) {
echo "ID:".$data['id']." total:".($data['total'])." update失败! \n";
} else {
echo "ID:".$data['id']." total:".($data['total'])."\n";
}
});
if ($co ==false) {
echo "ID:".$data['id']." 协程创建失败 \n";
}
}
发现 chunk($count, function (\Swoft\Stdlib\Collection $words)
的count输入多少这个数据就执行到哪,比如我输入的是50(每次返回50条),id范围在1-55,id 51就已经断掉了但是也不报错。如果我把这个model的代码注释掉,51-55的id就能打印出来
打开S QL事件监听看看 有没有执行 sql
您好 我还要等几天
🥺 这个问题不好查询 不是连接问题吧 这种可以考虑使用 生产消费模型 去处理不要在 chunk 里面去直接消费