easyswoole
easyswoole copied to clipboard
easyswoole 使用php-amqplib/php-amqplib 进行自定义消费会出现mysql连接断开
软件版本: easyswoole: 3.5.1 php-amqplib/php-amqplib:3.1.2 swoole: 4.4.23
问题出现时机: 1、项目运行1天样子基本就会出现 SQLSTATE[HY000] [2006] MySQL server has gone away ,一旦自定义进程出现这个报错,必须重启进程才可以解决, 2、在项目消费者里面已经尝试加了 \Co::sleep(0.01);,并未解决mysql 链接断开的问题 一旦自定义进程报 MySQL server has gone away,后面的消费 依然是继续报: Connection reset by peer or Transport endpoint is not connected
群里网名“ 那就这样吧” 的开发者也遇到同样的问题
目前的解决办法: 1、捕获异常,重新入列,然后重启进程
希望官方解决一下这个问题,感谢
自定义进程代码:
<?php
namespace App\Process\RabbitMqConsumer;
use App\HttpController\ShopApi\Service\ClientService;
use App\Models\VipPayOrderModel;
use App\Service\LogService;
use App\Service\MqFanoutService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\RabbitMq\MqJob;
use EasySwoole\RabbitMq\MqQueue;
use EasySwoole\RabbitMq\RabbitMqQueueDriver;
/**
* mq广播消费进程
* Class ClientRegisterFinishProcess
* @package App\Process
*/
class RabbitMqConsumerProcess extends AbstractProcess
{
protected function run($arg)
{
$queueName = 'WORK_QUEUE';
$config = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
$callback = function ($msg) use ($channel){
$headersObject = $msg->get_properties()['application_headers'];
$headersArray = $headersObject->getNativeData();
$body = json_decode($msg->body, true);
$tag = $body['desc'];
$msgId = $body['msgId'];
$listenerKey = $body['listenerKey'];
try {
if ($listenerKey){
\Co::sleep(0.01);
\EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function () use ($body){
WorkQueueService::handler($body);
});
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}catch (\Throwable $e){
LogService::updateRabbitMqLog($msgId,'RabbitMqQueueConsumerProcess 消费异常:'.$e->getMessage(),2);
if($headersArray['retry'] > $headersArray['maxRetry']){
Logger::getInstance()->error("{$tag}-达到最大重试次数!,停止重试,参数={$msg->body},errMsg=".$e->getMessage());
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//todo 这里需要写db
return;
}
Logger::getInstance()->error("{$tag}-工作队列处理异常,开始重试!,参数={$msg->body},errMsg=".$e->getMessage());
$headersArray['retry']++;
Logger::getInstance()->waring("{$tag}-消息ID:".$body['msgId'].' 第'.$headersArray['retry'].'次失败,消息重新入队');
$exchange = $msg->getExchange();
$routingKey = $msg->getRoutingKey();
$body = $msg->getBody();
\Co::sleep(5);
$msg->delivery_info['channel']->basic_publish(
new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
$exchange,
$routingKey
);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
protected function onException(\Throwable $throwable, ...$args)
{
Logger::getInstance()->error("广播队列异常:".$throwable->getMessage());
Logger::getInstance()->error("广播队列异常trace:".$throwable->getTraceAsString());
parent::onException($throwable, $args);
}
protected function onShutDown()
{
Logger::getInstance()->waring("RabbitMqConsumerProcess 进程退出了");
parent::onShutDown(); // TODO: Change the autogenerated stub
}
}
是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)
整体集成方案可参考此包 https://gitee.com/orange-studio/simple-handle
是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)
整体集成方案可参考此包 https://gitee.com/orange-studio/simple-handle
谢谢,我目前使用的解决方案是 自己监听异常,当监听到 SQLSTATE 后就直接close rabbitmq 消费者链接,然后再kill 到当前进程,easyswoole 会重启进程,然后继续完成消费,如果现在去改的话 有太多代码要改
队列注意ACK机制,防止消息丢失,其他问题不大
队列注意ACK机制,防止消息丢失,其他问题不大
我是采用手动ack,就是必须执行完成后我才提交ack,这样进程下次启动,消息依然会被消费,直到正常执行完成
} catch (\Throwable $e) {
$message = $e->getMessage();
Logger::getInstance()->error("延时队列消费异常:" . $message);
Logger::getInstance()->error("延时队列消费异常:" . $e->getTraceAsString());
$isKillProcess = false;
if (strrpos(strtoupper($message), 'SQLSTATE') !== false) {
\Co::sleep(1);
Logger::getInstance()->error("数据库异常:" . $message);
$message = $message . ',重启进程';
$isKillProcess = true;
}
// 邮件通知管理员
$title = '延时队列消费异常:' . $message;
$body = '数据='.$msg->body.',异常信息:'.$e->getTraceAsString();
$form = [
'recipient' => '',
'title' => $title,
'body' => $body,
'cc' => '',
'bcc' => ''
];
// 重启进程
MqComposer::workQueue('SEND_EMAIL', $form, '异常邮件发送');
if ($isKillProcess == true) {
$channel->close();
$connection->close();
\Co::sleep(1);
$cmd = "php easyswoole process kill --pid={$pid}";
Logger::getInstance()->error("已完成进程重启cmd:\r\n" . $cmd);
shell_exec($cmd);
}
}
官方一直没解决这个问题 也没办法
可以使用非连接池模式,用 easyswoole/mysqli 组件即可,当需要使用数据库时才进行连接mysql并操作数据