easyswoole icon indicating copy to clipboard operation
easyswoole copied to clipboard

easyswoole 使用php-amqplib/php-amqplib 进行自定义消费会出现mysql连接断开

Open 645263 opened this issue 2 years ago • 6 comments

软件版本: easyswoole: 3.5.1 php-amqplib/php-amqplib:3.1.2 swoole: 4.4.23

问题出现时机: 1、项目运行1天样子基本就会出现 SQLSTATE[HY000] [2006] MySQL server has gone away ,一旦自定义进程出现这个报错,必须重启进程才可以解决, 2、在项目消费者里面已经尝试加了 \Co::sleep(0.01);,并未解决mysql 链接断开的问题 一旦自定义进程报 MySQL server has gone away,后面的消费 依然是继续报: Connection reset by peer or Transport endpoint is not connected

群里网名“ 那就这样吧” 的开发者也遇到同样的问题

目前的解决办法: 1、捕获异常,重新入列,然后重启进程

希望官方解决一下这个问题,感谢

自定义进程代码:

<?php
namespace App\Process\RabbitMqConsumer;
use App\HttpController\ShopApi\Service\ClientService;
use App\Models\VipPayOrderModel;
use App\Service\LogService;
use App\Service\MqFanoutService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\RabbitMq\MqJob;
use EasySwoole\RabbitMq\MqQueue;
use EasySwoole\RabbitMq\RabbitMqQueueDriver;

/**
 * mq广播消费进程
 * Class ClientRegisterFinishProcess
 * @package App\Process
 */
class RabbitMqConsumerProcess extends AbstractProcess
{
    protected function run($arg)
    {
        $queueName  = 'WORK_QUEUE';
        $config     = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
        $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
        $channel    = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        $callback = function ($msg) use ($channel){
            $headersObject = $msg->get_properties()['application_headers'];
            $headersArray  = $headersObject->getNativeData();
            $body = json_decode($msg->body, true);
            $tag = $body['desc'];
            $msgId = $body['msgId'];
            $listenerKey = $body['listenerKey'];
            try {
                if ($listenerKey){
                    \Co::sleep(0.01);
                    \EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function () use ($body){
                        WorkQueueService::handler($body);
                    });
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            }catch (\Throwable $e){
                LogService::updateRabbitMqLog($msgId,'RabbitMqQueueConsumerProcess 消费异常:'.$e->getMessage(),2);
                if($headersArray['retry'] > $headersArray['maxRetry']){
                    Logger::getInstance()->error("{$tag}-达到最大重试次数!,停止重试,参数={$msg->body},errMsg=".$e->getMessage());
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    //todo 这里需要写db
                    return;
                }
                Logger::getInstance()->error("{$tag}-工作队列处理异常,开始重试!,参数={$msg->body},errMsg=".$e->getMessage());
                $headersArray['retry']++;
                Logger::getInstance()->waring("{$tag}-消息ID:".$body['msgId'].' 第'.$headersArray['retry'].'次失败,消息重新入队');
                $exchange      = $msg->getExchange();
                $routingKey    = $msg->getRoutingKey();
                $body          = $msg->getBody();
                \Co::sleep(5);
                $msg->delivery_info['channel']->basic_publish(
                    new AMQPMessage($body,['application_headers'=>new AMQPTable($headersArray)]),
                    $exchange,
                    $routingKey
                );
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    protected function onException(\Throwable $throwable, ...$args)
    {
        Logger::getInstance()->error("广播队列异常:".$throwable->getMessage());
        Logger::getInstance()->error("广播队列异常trace:".$throwable->getTraceAsString());
        parent::onException($throwable, $args);
    }

    protected function onShutDown()
    {
        Logger::getInstance()->waring("RabbitMqConsumerProcess 进程退出了");
        parent::onShutDown(); // TODO: Change the autogenerated stub
    }
}

645263 avatar Mar 20 '22 14:03 645263

是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)

整体集成方案可参考此包 https://gitee.com/orange-studio/simple-handle

function-gy avatar Mar 22 '22 06:03 function-gy

是因为rabbitmq 中的订阅消息模式是采用的while true死循环,导致mysql链接池无法回收process中的链接,如果队列中在一定时间内不进行sql操作,导致链接空闲时间达到数据库空闲链接时间上限,mysql会主动断开此链接,由于ORM未进行重连机制,所以会一段时间没有操作后出现此报错 SQLSTATE[HY000] [2006] MySQL server has gone away,需要更换mysql 操作工具,可使用此包,composer require simple-swoole/db(包内部采用了短线重连机制,并实现了swoole自带链接池),使用process的时候尽量不进行调用外部类方法,redis尽量也使用独立的链接(可采用REDIS扩展)

整体集成方案可参考此包 https://gitee.com/orange-studio/simple-handle

谢谢,我目前使用的解决方案是 自己监听异常,当监听到 SQLSTATE 后就直接close rabbitmq 消费者链接,然后再kill 到当前进程,easyswoole 会重启进程,然后继续完成消费,如果现在去改的话 有太多代码要改

645263 avatar Mar 22 '22 06:03 645263

队列注意ACK机制,防止消息丢失,其他问题不大

function-gy avatar Mar 22 '22 06:03 function-gy

队列注意ACK机制,防止消息丢失,其他问题不大

我是采用手动ack,就是必须执行完成后我才提交ack,这样进程下次启动,消息依然会被消费,直到正常执行完成


} catch (\Throwable $e) {
                    $message = $e->getMessage();
                    Logger::getInstance()->error("延时队列消费异常:" . $message);
                    Logger::getInstance()->error("延时队列消费异常:" . $e->getTraceAsString());
                    $isKillProcess = false;
                    if (strrpos(strtoupper($message), 'SQLSTATE') !== false) {
                        \Co::sleep(1);
                        Logger::getInstance()->error("数据库异常:" . $message);
                        $message = $message . ',重启进程';
                        $isKillProcess = true;
                    }
                    // 邮件通知管理员
                    $title = '延时队列消费异常:' . $message;
                    $body = '数据='.$msg->body.',异常信息:'.$e->getTraceAsString();
                    $form  = [
                        'recipient' => '',
                        'title'     => $title,
                        'body'      => $body,
                        'cc'        => '',
                        'bcc'       => ''
                    ];
                    // 重启进程
                    MqComposer::workQueue('SEND_EMAIL', $form, '异常邮件发送');
                    if ($isKillProcess == true) {
                        $channel->close();
                        $connection->close();
                        \Co::sleep(1);
                        $cmd = "php easyswoole process kill --pid={$pid}";
                        Logger::getInstance()->error("已完成进程重启cmd:\r\n" . $cmd);
                        shell_exec($cmd);
                    }
                }

645263 avatar Mar 22 '22 07:03 645263

官方一直没解决这个问题 也没办法

function-gy avatar Mar 22 '22 07:03 function-gy

可以使用非连接池模式,用 easyswoole/mysqli 组件即可,当需要使用数据库时才进行连接mysql并操作数据

XueSiLf avatar May 13 '22 15:05 XueSiLf