GatewayWorker icon indicating copy to clipboard operation
GatewayWorker copied to clipboard

一个奇葩的心跳验证需求

Open YoY1991 opened this issue 4 years ago • 1 comments

公司要求客户端心跳连续成功一定次数之后,想让客户端下次更长时间之后再发送心跳

我看了下gatewayworker的心跳处理,没法实现,然后我在这个类上加上了一个公开属性$pingStepConf,默认值是 [ 0 => 1 ],0表示连续接收客户端发来的消息总数,1表示允许客户端未发送数据包给服务器的总数的倍数,就是不想改之前的心跳逻辑和心跳检测的定时器,直接在判断那里加了一个倍数$step

然后在start文件中,在实例化的Gateway类的对象里面,修改对象的属性pingStepConf, 例如:[ 0 => 1, 10 => 3, 30 => 5, 60 => 10 ] 假设:pingInterval = 10; pingNotResponseLimit = 1; 那么服务端允许客户端心跳时间间隔为: 当客户端连续发送数据给服务器计数为:0到9之间,发送心跳的时间间隔为101 当客户端连续发送数据给服务器计数为:10到29之间,发送心跳的时间间隔为103 当客户端连续发送数据给服务器计数为:30到59之间,发送心跳的时间间隔为105 当客户端连续发送数据给服务器计数为:60及以上,发送心跳的时间间隔为1010

不知道这样可行否

修改了GatewayWorker\Gateway的这个类

* @copyright walkor * @link http://www.workerman.net/ * @license http://www.opensource.org/licenses/mit-license.php MIT License */ namespace GatewayWorker; use GatewayWorker\Lib\Context; use Workerman\Connection\TcpConnection; use Workerman\Worker; use Workerman\Lib\Timer; use Workerman\Autoloader; use Workerman\Connection\AsyncTcpConnection; use GatewayWorker\Protocols\GatewayProtocol; /** * * Gateway,基于Worker 开发 * 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端 * * @author walkor * */ class Gateway extends Worker { /** * 版本 * * @var string */ const VERSION = '3.0.12'; /** * 本机 IP * 单机部署默认 127.0.0.1,如果是分布式部署,需要设置成本机 IP * * @var string */ public $lanIp = '127.0.0.1'; /** * 本机端口 * * @var string */ public $lanPort = 0; /** * gateway 内部通讯起始端口,每个 gateway 实例应该都不同,步长1000 * * @var int */ public $startPort = 2000; /** * 注册服务地址,用于注册 Gateway BusinessWorker,使之能够通讯 * * @var string|array */ public $registerAddress = '127.0.0.1:1236'; /** * 是否可以平滑重启,gateway 不能平滑重启,否则会导致连接断开 * * @var bool */ public $reloadable = false; /** * 心跳时间间隔 * * @var int */ public $pingInterval = 0; /** * $pingNotResponseLimit * $pingInterval 时间内,客户端未发送任何数据,断开客户端连接 * * @var int */ public $pingNotResponseLimit = 0; /** * 服务端向客户端发送的心跳数据 * * @var string */ public $pingData = ''; /* YoY Add 新增针对不同情况设置心跳机制 */ // 心跳间隔区间设置 public $pingStepConf = array( '0' => 1, // 连续发送心跳成功的次数大于0,允许特定时间内允许的发送心跳次数的倍数是1 ); /** * 秘钥 * * @var string */ public $secretKey = ''; /** * 路由函数 * * @var callback */ public $router = null; /** * gateway进程转发给businessWorker进程的发送缓冲区大小 * * @var int */ public $sendToWorkerBufferSize = 10240000; /** * gateway进程将数据发给客户端时每个客户端发送缓冲区大小 * * @var int */ public $sendToClientBufferSize = 1024000; /** * 协议加速 * * @var bool */ public $protocolAccelerate = false; /** * 保存客户端的所有 connection 对象 * * @var array */ protected $_clientConnections = array(); /** * uid 到 connection 的映射,一对多关系 */ protected $_uidConnections = array(); /** * group 到 connection 的映射,一对多关系 * * @var array */ protected $_groupConnections = array(); /** * 保存所有 worker 的内部连接的 connection 对象 * * @var array */ protected $_workerConnections = array(); /** * gateway 内部监听 worker 内部连接的 worker * * @var Worker */ protected $_innerTcpWorker = null; /** * 当 worker 启动时 * * @var callback */ protected $_onWorkerStart = null; /** * 当有客户端连接时 * * @var callback */ protected $_onConnect = null; /** * 当客户端发来消息时 * * @var callback */ protected $_onMessage = null; /** * 当客户端连接关闭时 * * @var callback */ protected $_onClose = null; /** * 当 worker 停止时 * * @var callback */ protected $_onWorkerStop = null; /** * 进程启动时间 * * @var int */ protected $_startTime = 0; /** * gateway 监听的端口 * * @var int */ protected $_gatewayPort = 0; /** * connectionId 记录器 * @var int */ protected static $_connectionIdRecorder = 0; /** * 用于保持长连接的心跳时间间隔 * * @var int */ const PERSISTENCE_CONNECTION_PING_INTERVAL = 25; /** * 构造函数 * * @param string $socket_name * @param array $context_option */ public function __construct($socket_name, $context_option = array()) { parent::__construct($socket_name, $context_option); $this->_gatewayPort = substr(strrchr($socket_name,':'),1); $this->router = array("\\GatewayWorker\\Gateway", 'routerBind'); $backtrace = debug_backtrace(); $this->_autoloadRootPath = dirname($backtrace[0]['file']); } /** * {@inheritdoc} */ public function run() { // 保存用户的回调,当对应的事件发生时触发 $this->_onWorkerStart = $this->onWorkerStart; $this->onWorkerStart = array($this, 'onWorkerStart'); // 保存用户的回调,当对应的事件发生时触发 $this->_onConnect = $this->onConnect; $this->onConnect = array($this, 'onClientConnect'); // onMessage禁止用户设置回调 $this->onMessage = array($this, 'onClientMessage'); // 保存用户的回调,当对应的事件发生时触发 $this->_onClose = $this->onClose; $this->onClose = array($this, 'onClientClose'); // 保存用户的回调,当对应的事件发生时触发 $this->_onWorkerStop = $this->onWorkerStop; $this->onWorkerStop = array($this, 'onWorkerStop'); if (!is_array($this->registerAddress)) { $this->registerAddress = array($this->registerAddress); } // 记录进程启动的时间 $this->_startTime = time(); // 运行父方法 parent::run(); } /** * 当客户端发来数据时,转发给worker处理 * * @param TcpConnection $connection * @param mixed $data */ public function onClientMessage($connection, $data) { $connection->pingNotResponseCount = -1; /** * YoY Add 累积客户端发送给服务端消息的次数 */ $connection->clientSendMessageCount++; $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data); } /** * 当客户端连接上来时,初始化一些客户端的数据 * 包括全局唯一的client_id、初始化session等 * * @param TcpConnection $connection */ public function onClientConnect($connection) { $connection->id = self::generateConnectionId(); // 保存该连接的内部通讯的数据包报头,避免每次重新初始化 $connection->gatewayHeader = array( 'local_ip' => ip2long($this->lanIp), 'local_port' => $this->lanPort, 'client_ip' => ip2long($connection->getRemoteIp()), 'client_port' => $connection->getRemotePort(), 'gateway_port' => $this->_gatewayPort, 'connection_id' => $connection->id, 'flag' => 0, ); // 连接的 session $connection->session = ''; // 该连接的心跳参数 $connection->pingNotResponseCount = -1; /** * YoY Add * YoY 针对心跳优化做的功能新增,根据不同的心跳成功次数,提高心跳间隔允许的未发送次数, * 这样间接的提高心跳时间,减少稳定下面的socket心跳频率 */ // 初始化设置客户端连续成功给服务端发送消息的次数为0 $connection->clientSendMessageCount = 0; // 初始化设置客户端连续成功给服务器发送消息的次数对应在心跳时间间隔内,允许的最大未发送次数 $connection->pingStepConf = $this->pingStepConf; // 该链接发送缓冲区大小 $connection->maxSendBufferSize = $this->sendToClientBufferSize; // 保存客户端连接 connection 对象 $this->_clientConnections[$connection->id] = $connection; // 如果用户有自定义 onConnect 回调,则执行 if ($this->_onConnect) { call_user_func($this->_onConnect, $connection); } elseif ($connection->protocol === '\Workerman\Protocols\Websocket') { $connection->onWebSocketConnect = array($this, 'onWebsocketConnect'); } $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECT, $connection); } /** * websocket握手时触发 * * @param $connection * @param $http_buffer */ public function onWebsocketConnect($connection, $http_buffer) { $this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE)); } /** * 生成connection id * @return int */ protected function generateConnectionId() { $max_unsigned_int = 4294967295; if (self::$_connectionIdRecorder >= $max_unsigned_int) { self::$_connectionIdRecorder = 0; } while(++self::$_connectionIdRecorder _clientConnections[self::$_connectionIdRecorder])) { break; } } return self::$_connectionIdRecorder; } /** * 发送数据给 worker 进程 * * @param int $cmd * @param TcpConnection $connection * @param mixed $body * @return bool */ protected function sendToWorker($cmd, $connection, $body = '') { $gateway_data = $connection->gatewayHeader; $gateway_data['cmd'] = $cmd; $gateway_data['body'] = $body; $gateway_data['ext_data'] = $connection->session; if ($this->_workerConnections) { // 调用路由函数,选择一个worker把请求转发给它 /** @var TcpConnection $worker_connection */ $worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body); if (false === $worker_connection->send($gateway_data)) { $msg = "SendBufferToWorker fail. May be the send buffer are overflow. See http://wiki.workerman.net/Error2"; static::log($msg); return false; } } // 没有可用的 worker else { // gateway 启动后 1-2 秒内 SendBufferToWorker fail 是正常现象,因为与 worker 的连接还没建立起来, // 所以不记录日志,只是关闭连接 $time_diff = 2; if (time() - $this->_startTime >= $time_diff) { $msg = 'SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://wiki.workerman.net/Error3'; static::log($msg); } $connection->destroy(); return false; } return true; } /** * 随机路由,返回 worker connection 对象 * * @param array $worker_connections * @param TcpConnection $client_connection * @param int $cmd * @param mixed $buffer * @return TcpConnection */ public static function routerRand($worker_connections, $client_connection, $cmd, $buffer) { return $worker_connections[array_rand($worker_connections)]; } /** * client_id 与 worker 绑定 * * @param array $worker_connections * @param TcpConnection $client_connection * @param int $cmd * @param mixed $buffer * @return TcpConnection */ public static function routerBind($worker_connections, $client_connection, $cmd, $buffer) { if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) { $client_connection->businessworker_address = array_rand($worker_connections); } return $worker_connections[$client_connection->businessworker_address]; } /** * 当客户端关闭时 * * @param TcpConnection $connection */ public function onClientClose($connection) { // 尝试通知 worker,触发 Event::onClose $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection); unset($this->_clientConnections[$connection->id]); // 清理 uid 数据 if (!empty($connection->uid)) { $uid = $connection->uid; unset($this->_uidConnections[$uid][$connection->id]); if (empty($this->_uidConnections[$uid])) { unset($this->_uidConnections[$uid]); } } // 清理 group 数据 if (!empty($connection->groups)) { foreach ($connection->groups as $group) { unset($this->_groupConnections[$group][$connection->id]); if (empty($this->_groupConnections[$group])) { unset($this->_groupConnections[$group]); } } } // 触发 onClose if ($this->_onClose) { call_user_func($this->_onClose, $connection); } } /** * 当 Gateway 启动的时候触发的回调函数 * * @return void */ public function onWorkerStart() { // 分配一个内部通讯端口 $this->lanPort = $this->startPort + $this->id; // 如果有设置心跳,则定时执行 if ($this->pingInterval > 0) { $timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval; Timer::add($timer_interval, array($this, 'ping')); } // 如果BusinessWorker ip不是127.0.0.1,则需要加gateway到BusinessWorker的心跳 if ($this->lanIp !== '127.0.0.1') { Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker')); } if (!class_exists('\Protocols\GatewayProtocol')) { class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol'); } // 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据 $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerTcpWorker->listen(); $this->_innerTcpWorker->name = 'GatewayInnerWorker'; // 重新设置自动加载根目录 Autoloader::setRootPath($this->_autoloadRootPath); // 设置内部监听的相关回调 $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect'); $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose'); // 注册 gateway 的内部通讯地址,worker 去连这个地址,以便 gateway 与 worker 之间建立起 TCP 长连接 $this->registerAddress(); if ($this->_onWorkerStart) { call_user_func($this->_onWorkerStart, $this); } } /** * 当 worker 通过内部通讯端口连接到 gateway 时 * * @param TcpConnection $connection */ public function onWorkerConnect($connection) { $connection->maxSendBufferSize = $this->sendToWorkerBufferSize; $connection->authorized = $this->secretKey ? false : true; } /** * 当 worker 发来数据时 * * @param TcpConnection $connection * @param mixed $data * @throws \Exception * * @return void */ public function onWorkerMessage($connection, $data) { $cmd = $data['cmd']; if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) { self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort()); $connection->close(); return; } switch ($cmd) { // BusinessWorker连接Gateway case GatewayProtocol::CMD_WORKER_CONNECT: $worker_info = json_decode($data['body'], true); if ($worker_info['secret_key'] !== $this->secretKey) { self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey)); $connection->close(); return; } $key = $connection->getRemoteIp() . ':' . $worker_info['worker_key']; // 在一台服务器上businessWorker->name不能相同 if (isset($this->_workerConnections[$key])) { self::log("Gateway: Worker->name conflict. Key:{$key}"); $connection->close(); return; } $connection->key = $key; $this->_workerConnections[$key] = $connection; $connection->authorized = true; return; // GatewayClient连接Gateway case GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT: $worker_info = json_decode($data['body'], true); if ($worker_info['secret_key'] !== $this->secretKey) { self::log("Gateway: GatewayClient key does not match ".var_export($this->secretKey, true)." !== ".var_export($this->secretKey, true)); $connection->close(); return; } $connection->authorized = true; return; // 向某客户端发送数据,Gateway::sendToClient($client_id, $message); case GatewayProtocol::CMD_SEND_TO_ONE: if (isset($this->_clientConnections[$data['connection_id']])) { $this->_clientConnections[$data['connection_id']]->send($data['body']); } return; // 踢出用户,Gateway::closeClient($client_id, $message); case GatewayProtocol::CMD_KICK: if (isset($this->_clientConnections[$data['connection_id']])) { $this->_clientConnections[$data['connection_id']]->close($data['body']); } return; // 立即销毁用户连接, Gateway::destroyClient($client_id); case GatewayProtocol::CMD_DESTROY: if (isset($this->_clientConnections[$data['connection_id']])) { $this->_clientConnections[$data['connection_id']]->destroy(); } return; // 广播, Gateway::sendToAll($message, $client_id_array) case GatewayProtocol::CMD_SEND_TO_ALL: $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE); $body = $data['body']; if (!$raw && $this->protocolAccelerate && $this->protocol) { $body = $this->preEncodeForClient($body); $raw = true; } $ext_data = $data['ext_data'] ? json_decode($data['ext_data'], true) : ''; // $client_id_array 不为空时,只广播给 $client_id_array 指定的客户端 if (isset($ext_data['connections'])) { foreach ($ext_data['connections'] as $connection_id) { if (isset($this->_clientConnections[$connection_id])) { $this->_clientConnections[$connection_id]->send($body, $raw); } } } // $client_id_array 为空时,广播给所有在线客户端 else { $exclude_connection_id = !empty($ext_data['exclude']) ? $ext_data['exclude'] : null; foreach ($this->_clientConnections as $client_connection) { if (!isset($exclude_connection_id[$client_connection->id])) { $client_connection->send($body, $raw); } } } return; case GatewayProtocol::CMD_SELECT: $client_info_array = array(); $ext_data = json_decode($data['ext_data'], true); if (!$ext_data) { echo 'CMD_SELECT ext_data=' . var_export($data['ext_data'], true) . '\r\n'; $buffer = serialize($client_info_array); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; } $fields = $ext_data['fields']; $where = $ext_data['where']; if ($where) { $connection_box_map = array( 'groups' => $this->_groupConnections, 'uid' => $this->_uidConnections ); // $where = ['groups'=>[x,x..], 'uid'=>[x,x..], 'connection_id'=>[x,x..]] foreach ($where as $key => $items) { if ($key !== 'connection_id') { $connections_box = $connection_box_map[$key]; foreach ($items as $item) { if (isset($connections_box[$item])) { foreach ($connections_box[$item] as $connection_id => $client_connection) { if (!isset($client_info_array[$connection_id])) { $client_info_array[$connection_id] = array(); // $fields = ['groups', 'uid', 'session'] foreach ($fields as $field) { $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null; } } } } } } else { foreach ($items as $connection_id) { if (isset($this->_clientConnections[$connection_id])) { $client_connection = $this->_clientConnections[$connection_id]; $client_info_array[$connection_id] = array(); // $fields = ['groups', 'uid', 'session'] foreach ($fields as $field) { $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null; } } } } } } else { foreach ($this->_clientConnections as $connection_id => $client_connection) { foreach ($fields as $field) { $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null; } } } $buffer = serialize($client_info_array); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 获取在线群组列表 case GatewayProtocol::CMD_GET_GROUP_ID_LIST: $buffer = serialize(array_keys($this->_groupConnections)); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 重新赋值 session case GatewayProtocol::CMD_SET_SESSION: if (isset($this->_clientConnections[$data['connection_id']])) { $this->_clientConnections[$data['connection_id']]->session = $data['ext_data']; } return; // session合并 case GatewayProtocol::CMD_UPDATE_SESSION: if (!isset($this->_clientConnections[$data['connection_id']])) { return; } else { if (!$this->_clientConnections[$data['connection_id']]->session) { $this->_clientConnections[$data['connection_id']]->session = $data['ext_data']; return; } $session = Context::sessionDecode($this->_clientConnections[$data['connection_id']]->session); $session_for_merge = Context::sessionDecode($data['ext_data']); $session = array_replace_recursive($session, $session_for_merge); $this->_clientConnections[$data['connection_id']]->session = Context::sessionEncode($session); } return; case GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID: if (!isset($this->_clientConnections[$data['connection_id']])) { $session = serialize(null); } else { if (!$this->_clientConnections[$data['connection_id']]->session) { $session = serialize(array()); } else { $session = $this->_clientConnections[$data['connection_id']]->session; } } $connection->send(pack('N', strlen($session)) . $session, true); return; // 获得客户端sessions case GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS: $client_info_array = array(); foreach ($this->_clientConnections as $connection_id => $client_connection) { $client_info_array[$connection_id] = $client_connection->session; } $buffer = serialize($client_info_array); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 判断某个 client_id 是否在线 Gateway::isOnline($client_id) case GatewayProtocol::CMD_IS_ONLINE: $buffer = serialize((int)isset($this->_clientConnections[$data['connection_id']])); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 将 client_id 与 uid 绑定 case GatewayProtocol::CMD_BIND_UID: $uid = $data['ext_data']; if (empty($uid)) { echo "bindUid(client_id, uid) uid empty, uid=" . var_export($uid, true); return; } $connection_id = $data['connection_id']; if (!isset($this->_clientConnections[$connection_id])) { return; } $client_connection = $this->_clientConnections[$connection_id]; if (isset($client_connection->uid)) { $current_uid = $client_connection->uid; unset($this->_uidConnections[$current_uid][$connection_id]); if (empty($this->_uidConnections[$current_uid])) { unset($this->_uidConnections[$current_uid]); } } $client_connection->uid = $uid; $this->_uidConnections[$uid][$connection_id] = $client_connection; return; // client_id 与 uid 解绑 Gateway::unbindUid($client_id, $uid); case GatewayProtocol::CMD_UNBIND_UID: $connection_id = $data['connection_id']; if (!isset($this->_clientConnections[$connection_id])) { return; } $client_connection = $this->_clientConnections[$connection_id]; if (isset($client_connection->uid)) { $current_uid = $client_connection->uid; unset($this->_uidConnections[$current_uid][$connection_id]); if (empty($this->_uidConnections[$current_uid])) { unset($this->_uidConnections[$current_uid]); } $client_connection->uid_info = ''; $client_connection->uid = null; } return; // 发送数据给 uid Gateway::sendToUid($uid, $msg); case GatewayProtocol::CMD_SEND_TO_UID: $uid_array = json_decode($data['ext_data'], true); foreach ($uid_array as $uid) { if (!empty($this->_uidConnections[$uid])) { foreach ($this->_uidConnections[$uid] as $connection) { /** @var TcpConnection $connection */ $connection->send($data['body']); } } } return; // 将 $client_id 加入用户组 Gateway::joinGroup($client_id, $group); case GatewayProtocol::CMD_JOIN_GROUP: $group = $data['ext_data']; if (empty($group)) { echo "join(group) group empty, group=" . var_export($group, true); return; } $connection_id = $data['connection_id']; if (!isset($this->_clientConnections[$connection_id])) { return; } $client_connection = $this->_clientConnections[$connection_id]; if (!isset($client_connection->groups)) { $client_connection->groups = array(); } $client_connection->groups[$group] = $group; $this->_groupConnections[$group][$connection_id] = $client_connection; return; // 将 $client_id 从某个用户组中移除 Gateway::leaveGroup($client_id, $group); case GatewayProtocol::CMD_LEAVE_GROUP: $group = $data['ext_data']; if (empty($group)) { echo "leave(group) group empty, group=" . var_export($group, true); return; } $connection_id = $data['connection_id']; if (!isset($this->_clientConnections[$connection_id])) { return; } $client_connection = $this->_clientConnections[$connection_id]; if (!isset($client_connection->groups[$group])) { return; } unset($client_connection->groups[$group], $this->_groupConnections[$group][$connection_id]); if (empty($this->_groupConnections[$group])) { unset($this->_groupConnections[$group]); } return; // 解散分组 case GatewayProtocol::CMD_UNGROUP: $group = $data['ext_data']; if (empty($group)) { echo "leave(group) group empty, group=" . var_export($group, true); return; } if (empty($this->_groupConnections[$group])) { return; } foreach ($this->_groupConnections[$group] as $client_connection) { unset($client_connection->groups[$group]); } unset($this->_groupConnections[$group]); return; // 向某个用户组发送消息 Gateway::sendToGroup($group, $msg); case GatewayProtocol::CMD_SEND_TO_GROUP: $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE); $body = $data['body']; if (!$raw && $this->protocolAccelerate && $this->protocol) { $body = $this->preEncodeForClient($body); $raw = true; } $ext_data = json_decode($data['ext_data'], true); $group_array = $ext_data['group']; $exclude_connection_id = $ext_data['exclude']; foreach ($group_array as $group) { if (!empty($this->_groupConnections[$group])) { foreach ($this->_groupConnections[$group] as $connection) { if(!isset($exclude_connection_id[$connection->id])) { /** @var TcpConnection $connection */ $connection->send($body, $raw); } } } } return; // 获取某用户组成员信息 Gateway::getClientSessionsByGroup($group); case GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP: $group = $data['ext_data']; if (!isset($this->_groupConnections[$group])) { $buffer = serialize(array()); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; } $client_info_array = array(); foreach ($this->_groupConnections[$group] as $connection_id => $client_connection) { $client_info_array[$connection_id] = $client_connection->session; } $buffer = serialize($client_info_array); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 获取用户组成员数 Gateway::getClientCountByGroup($group); case GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP: $group = $data['ext_data']; $count = 0; if ($group !== '') { if (isset($this->_groupConnections[$group])) { $count = count($this->_groupConnections[$group]); } } else { $count = count($this->_clientConnections); } $buffer = serialize($count); $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; // 获取与某个 uid 绑定的所有 client_id Gateway::getClientIdByUid($uid); case GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID: $uid = $data['ext_data']; if (empty($this->_uidConnections[$uid])) { $buffer = serialize(array()); } else { $buffer = serialize(array_keys($this->_uidConnections[$uid])); } $connection->send(pack('N', strlen($buffer)) . $buffer, true); return; default : $err_msg = "gateway inner pack err cmd=$cmd"; echo $err_msg; } } /** * 当worker连接关闭时 * * @param TcpConnection $connection */ public function onWorkerClose($connection) { if (isset($connection->key)) { unset($this->_workerConnections[$connection->key]); } } /** * 存储当前 Gateway 的内部通信地址 * * @return bool */ public function registerAddress() { $address = $this->lanIp . ':' . $this->lanPort; foreach ($this->registerAddress as $register_address) { $register_connection = new AsyncTcpConnection("text://{$register_address}"); $secret_key = $this->secretKey; $register_connection->onConnect = function($register_connection) use ($address, $secret_key, $register_address){ $register_connection->send('{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}'); // 如果Register服务器不在本地服务器,则需要保持心跳 if (strpos($register_address, '127.0.0.1') !== 0) { $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) { $register_connection->send('{"event":"ping"}'); }); } }; $register_connection->onClose = function ($register_connection) { if(!empty($register_connection->ping_timer)) { Timer::del($register_connection->ping_timer); } $register_connection->reconnect(1); }; $register_connection->connect(); } } /** * YoY Add * 获取心跳倍率 * @param $con * @return int */ private function getPingStep( $con ) { // 如果传过来的参数不是一个连接对象,就返回0 if( !is_object($con) ) { return 0; } // 如果传过来的连接,没有设置步长,默认是1倍,按照之前的方式处理 if( empty($con->pingStepConf) || !is_array($con->pingStepConf) ) { return 1; } // 如果传过来的连接,没有设置客户端与服务端通讯的连续成功次数,那么就按照之前的心跳方式处理 if( !isset($con->clientSendMessageCount) ) { return 1; } // 按照 key 降序排序 krsort( $con->pingStepConf ); foreach ( $con->pingStepConf as $min_client_send_message_count => $multiple_num ) { if( $con->clientSendMessageCount >= $min_client_send_message_count ) { // 返回对应的倍率,转int return intval($multiple_num); } } // 如果还是没有找找满足条件的配置,那么就使用默认的1倍的倍率 return 1; } /** * 心跳逻辑 * * @return void */ public function ping() { $ping_data = $this->pingData ? (string)$this->pingData : null; $raw = false; if ($this->protocolAccelerate && $ping_data && $this->protocol) { $ping_data = $this->preEncodeForClient($ping_data); $raw = true; } // 遍历所有客户端连接 foreach ($this->_clientConnections as $connection) { // 获取对应连接的心跳倍率 $pingStep = $this->getPingStep( $connection ); var_dump('====== 连接'.$connection->id.' start ======'); var_dump('连接'.$connection->id.'的累积收到消息:'.$connection->clientSendMessageCount); var_dump('连接'.$connection->id.'倍率:'.$pingStep); var_dump('连接'.$connection->id.'未收到客户端消息累积:'.$connection->pingNotResponseCount); var_dump('连接'.$connection->id.'允许最大未发消息数:'.$this->pingNotResponseLimit * 2 * $pingStep); var_dump('====== 连接'.$connection->id.' end ======'); // 上次发送的心跳还没有回复次数大于限定值就断开 if ($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount >= $this->pingNotResponseLimit * 2 * $pingStep ) { $connection->destroy(); continue; } // $connection->pingNotResponseCount 为 -1 说明最近客户端有发来消息,则不给客户端发送心跳 $connection->pingNotResponseCount++; if ($ping_data) { if ($connection->pingNotResponseCount === 0 || ($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount % 2 === 1) ) { continue; } $connection->send($ping_data, $raw); } } } /** * 向 BusinessWorker 发送心跳数据,用于保持长连接 * * @return void */ public function pingBusinessWorker() { $gateway_data = GatewayProtocol::$empty; $gateway_data['cmd'] = GatewayProtocol::CMD_PING; foreach ($this->_workerConnections as $connection) { $connection->send($gateway_data); } } /** * @param mixed $data * * @return string */ protected function preEncodeForClient($data) { foreach ($this->_clientConnections as $client_connection) { return call_user_func(array($client_connection->protocol, 'encode'), $data, $client_connection); } } /** * 当 gateway 关闭时触发,清理数据 * * @return void */ public function onWorkerStop() { // 尝试触发用户设置的回调 if ($this->_onWorkerStop) { call_user_func($this->_onWorkerStop, $this); } } /** * Log. * @param string $msg */ public static function log($msg){ Timer::add(1, function() use ($msg) { Worker::log($msg); }, null, false); } }

YoY1991 avatar Sep 10 '19 10:09 YoY1991

收到心跳了可以自己统计,很简单

dignfei avatar Apr 30 '21 10:04 dignfei