skynet icon indicating copy to clipboard operation
skynet copied to clipboard

cluster 集群组网

Open fanyh opened this issue 3 years ago • 31 comments

云大侠,我在某业务场景中需要实现 cluster 组网 skynet 进程都是等价的slave(不是使用master-slave模式) 目前cluser的配置只能声明

    nodename = "nodeaddress"

我期望的输入方式

    slave = {
        "address",
        "address"
    }

loadconfig中config已经规定了cluser的配置格式

    for name,address in pairs(tmp) do
        if name:sub(1,2) == "__" then
            name = name:sub(3)
            config[name] = address
            skynet.error(string.format("Config %s = %s", name, address))
        else
            assert(address == false or type(address) == "string")
            if node_address[name] ~= address then
                -- address changed
                if node_sender[name] then
                    -- reset connection if node_sender[name] exist
                    node_channel[name] = nil 
                    table.insert(reload, name)
                end 
                node_address[name] = address
            end 
            local ct = connecting[name]
            if ct and ct.namequery and not config.nowaiting then
                skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
                skynet.wakeup(ct.namequery)
            end 
        end 
    end 

#Question: 我想问的是: 业务层不关心目标node在哪的情况下

可以对cluster进行扩展 支持 array吗?

还是自行将cluster包装一次 在wrapper中调度cluster (wrapper 中可以实现load level) ,例如: wrapper.lua:

local config = {
	slave = {
		"slave1",	--cluster config key
		"slave2"
	}
}

local wrapper = {}

function wrapper.call(name, ...)
	local slave = assert(config[name], name)
	local name = slave[math.random(1, #slave)] --load level
	cluserd.call(name, ...)
end

wrapper.call( slave,  ...)

这样就存在一个问题 cluster配置会变得臃肿 name将变成一个无意义的key cluser

    slave1 = "address"
    slave2 = "address"

fanyh avatar Jan 14 '22 07:01 fanyh

cluster 的配置不一定是文件,可以是一个 table 。你要做的是按你的需求去生成这个 table 。

name 是一个字符串,你想拼出 "name[42]" 或 "name.42" 都是没问题的。

cloudwu avatar Jan 14 '22 09:01 cloudwu

我明白你的意思 这样一来可能随着slave更换会出现一些问题

            if node_address[name] ~= address then
                -- address changed
                if node_sender[name] then
                    -- reset connection if node_sender[name] exist
                    node_channel[name] = nil 
                    table.insert(reload, name)
                end 
                node_address[name] = address
            end 

例如:

  1. 删除n slave name.n -> name.#slave 的channel都将重连
  2. cache清理不完全 删除数组尾步slave.-n -n -> #old_slave的channel 都将无法释放(除非出现更替)

第一反应想到这2种可能性

  1. 虽然可以通过替换的方式操作来避免这个问题 but 业务交付给运维后,这个表很大可能是运维在维护, so... 实际操作上极可能出现这个问题

或者是否考虑将open_channel中的bind实现改成如下

c = skynet.newservice("clustersender", key, nodename, host, port)
node_sender[address] = c

当然这将改写cache规则

fanyh avatar Jan 14 '22 10:01 fanyh

其实我也向希望cluster可以支持同类服务多个实例的情况下,自动路由,自动替换,特别是在某个节点不可用的情况下,类似微服务的容错处理。

terry8210 avatar Jan 25 '22 08:01 terry8210

因为 cluster 没有任何底层设施,所以你完全可以不用 cluster 根据需求自己实现组网的需求。cluster 可以作为一个实现的参考,它本身也没有太多代码。

skynet 作为公共代码,我觉得不应该提供、规范太多细节。不然会有很大的维护负担。

cloudwu avatar Jan 25 '22 08:01 cloudwu

我自己写了个cluster wrapper 来实现 route 和 load level 节点不可用这个比较麻烦 各种功能都支持 那差不多就等于弄个etcd 我目前使用的http来承担这些职责

另外 云大侠 cluserd 中 对应

assert(address == false or type(address) == "string")
if node_address[name] ~= address then
    -- address changed
    if node_sender[name] then
        -- reset connection if node_sender[name] exist
        node_channel[name] = nil 
        table.insert(reload, name)
    end 
    node_address[name] = address
end 

我理解为 可以通过将node[name] = false 来释放 clusterd 本地已建立的缓存节点

但是在 open_channel 中 没有释放 newservice 对应的sender

if address then
    ... 
else
    if node_sender[key] then
        skynet.kill(node_sender[key]) --free sender when address in down
    end   --这个条件语句是我认为需要添加的释放代码
    err = string.format("cluster node [%s] is %s.", key,  address == false and "down" or "absent")
end

在 address not ture 中直接去释放sender 是否正确 在对比测试中发现 如果不加skynet.kill 尽管 node 被设置成flase 但是在 lualib/skynet/cluster.d中

function cluster.call(node, address, ...)                                                            
    -- skynet.pack(...) will free by cluster.core.packrequest                                        
    local s = sender[node]                                                                           
    if not s then                                                                                    
        local task = skynet.packstring(address, ...)                                                 
        return skynet.call(get_sender(node), "lua", "req", repack(skynet.unpack(task)))              
    end                                                                                              
    return skynet.call(s, "lua", "req", address, skynet.pack(...))                                   
end                                                                                                  
         

sender存在cache 所以仍然能访问

我不太清楚 档 node == false 时 sender 仍然可以继续工作是故意为之(比如存在一些特殊状态 例如:call被挂起等等 )
还是 ... (bug?)

最后: 关于为啥不自行实现一套组网机制 个人觉得维护差异性版本太痛苦了 项目中我们一直跟踪最新的skynet,不想在魔改一份skynet so,情愿添加wrapper来绕回到 skynet

fanyh avatar Jan 26 '22 08:01 fanyh

我记得 sender 应该不能随便删的,因为服务可能会缓存地址,所以让那个 sender 负责拒绝服务更好。

我觉得再实现一套组网机构,应该不需要修改什么已有的代码,只是不用 cluster 模块就好了。skynet 现有的代码,也没有用到 cluster 。这个我觉得是可以独立出去的。

cloudwu avatar Jan 26 '22 10:01 cloudwu

仔细思考了下 sender拒绝服务 可能结果会更糟糕 如果业务持有sender (应该很少有具体业务持有sender;所以持有还是在 cluster.lua:get_sender上)

  • 如果在dispatch时判断sender是否异常 抛出skynet.error 这样跟kill node 后调用cluster.call 抛出node不存在 也没差了
  • 如果在dispatch返回服务拒绝 作为一个异常信号,这还需要业务层感知这个信息 但按cluser目前的设计来说,业务层不需要感知此内容 (ps:按当前版本设计思路的话,我觉得应该修改cluster.call/send 和 释放clust

在最近的cluster设计结构中 我觉得业务不应当持有sender 如果业务持有 那业务在reload时也应当自行释放sender 引用

cluster目前的设计思路对于业务层来说是1个很好的闭环 (业务层不需要了解对端服务器部署位置 仅需要向自己感兴趣的service 投递消息) 实现一套新的组网机制也只是重新组装一套 clusterd liker;并不需要修改skynet 部分的代码

但是就目前我们团队来说可能引入新的组网模式 填坑的能力略弱(ps:没用社区来帮忙验证/发现新的组网模式的error)

fanyh avatar Jan 27 '22 02:01 fanyh

其实我也向希望cluster可以支持同类服务多个实例的情况下,自动路由,自动替换,特别是在某个节点不可用的情况下,类似微服务的容错处理。

我们当前是玩家从wlogin登录,然后获取实际玩家代理所在服务wgame的地址;各节点配置放在etcd里面的,可以使用etcd来做是否服务可用,我们当前是wgame ping wlogin来确定是否可用。

JieTrancender avatar Jan 27 '22 03:01 JieTrancender

@fanyh @JieTrancender @cloudwu 我有一个想法打算这样实现,引入etcd作为服务发现,每个skynet有自己的唯一命名,例如game,login,common(好友,公会,支付充值)等等,特别针对一些无状态的服务,可以提供多份实例,然后每个进程都有这样一个服务master node用于把自身的服务名字(地址,端口,服务名,所属功能)注册到etcd,然后通过,cluster.register同时使用方启动一个slave node主要是监听那些节点服务有效,然后把提供相同功能的服务汇总一起,生成一个cluster.reload需要的table,然后上层业务根据需求进行(hash,失败retry)调用,针对进程内的服务想调用集群中的无状态服务则只需要通过一个功能名就可以发送业务,slave node会自动根据名字找到那些相同的服务,从而进行简单的微服务弱支持,全部都是在cluster基础上做一层业务包装,另外如果节点发生变化会通过监听来cluster.reload目前的服务,初步想法,请教各位是否可行。

terry8210 avatar Feb 09 '22 09:02 terry8210

我觉得既然参考了微服务设计方案;就应该淡化指向性定义(取消skynet name定义); 我们游戏的模式是一套skynet集群中运行一个项目(游戏) 现在的作法淡化skynet name;然后吧所有的(无法指定game上的;如:跨服聊天等)业务放在若干个skynet进程中,进程本身只关心启动的service和开放的ip:端口 服务发现时可以使用ip:port来做进程标记(这个是唯一的) 因为skynet本身不会告诉业务那些service过载 同时考虑到load level 的问题,于是我们重度依赖了debug 获取stat;于是采用save db来代替etcd(服务汇总&发现)

上层业务实现1个cluster wrapper 来服务 来代替cluster的直接调度(调度方式仅需保持和skynet.call/send 一致) wrapper中维持node valid(可以阻止node 出现过载时,业务持续输入导致情况恶化) & register route trick

目前尚存不足:

  1. 上层业务无法感知目标service是否存在,可能存在特定service 无任何可用节点;(如:特定节点过载后 无备用节点)
  2. 关联性service消息传输 是否也设计为跨节点(如:guild, guild_union)
  3. 消息回传不好实现(不期望上层业务判定回传是否正确;)

我们现在是结构是 玩家主体业务进程game(大/小服) + 玩家间需要共享的信息slave nodes(搭载如果个service 如:聊天,好友,公会,等等) 我们认为小服跨服玩法其实就是一个路由策略(game 制定策略保证相同slave node 即可) @cloudwu 对于 目前我遇到的问题2,3有什么比较好的实现建议嘛? 特别是消息回传,如果业务切换节点后(a节点过载),出现多个回传 b先至a后至这种情况时 如何较好的保证消息回传语义正确

fanyh avatar Feb 10 '22 03:02 fanyh

我觉得消息回传你可以考虑实现成 message queue 服务。在 mq 服务上创建一个 queue ,然后需要回传的消息 pub 回复消息进去,接收方可以 sub 这个 queue 。

cloudwu avatar Feb 10 '22 06:02 cloudwu

我使用集群的方式

  1. 采用原生的cluster集群
  2. 根据不同集群需求添加的不同集群路由管理服务, 如 分片集群(friend1,friend2), 均衡集群(gamemgr1,gamemgr2)
  3. api 如xxcluster.call("friend",...) 节点类型访问,及一些定制接口
  4. 调用消息 call->{cluster_type}_{node_type}_sender->clustersender
  5. 服务过载由源头和目标服务处理 a. 目标服务消息过载mqlen回复繁忙,拒绝请求 b. 源头对繁忙、错误的降级等策略处理
  6. agent -> cluster(node1, node2) 使用方式为agent的req/resp模式 即使有notify也转为req a. 如邮件,node1有新邮件 通知 agent b. agent收到通知,合适时机再发起拉邮件请求(1. 1个玩家同时一个拉邮件请求 2.拉不到业务结束, 3.错误则合适时机再请求)

问下

  1. 节点负载等级如何计算的啊? a. 通过shell ps 获取 cpu? b. 使用量化指标?如在线人数+最大人数 c. 关键服务的mqlen?

zeroatom avatar Feb 10 '22 07:02 zeroatom

我觉得消息回传你可以考虑实现成 message queue 服务。在 mq 服务上创建一个 queue ,然后需要回传的消息 pub 回复消息进去,接收方可以 sub 这个 queue 。

目前实现方案是按照mq的方式实现的;可能存在某些极限/边界情况下出现多个 slave node pub相同的消息到上层业务; 目前我想到的2种解决方案:

  1. zeroatom提到的sub响应的每个msg仅能作为notify,需要上层业务发起一次(上层业务执行判断实现需要发起)rpc才能取得最终数据
  2. 直接在上层业务的cluster wrapper里将pass掉的node 的 mq 监听 直接关掉;这个node 在他重启前将不在启用(有点简单粗暴)

但这样一来一些具备时效性的service就不太好设计(btw:谁知道策划会提出怎样的需求,难免会出现跨服务信息还具备时效性) 所有想问问有没有更好的解决方案(有点像并发事务冲突,但又无法使用加锁的等待事务处理完毕)

关于过载我这边实现策略大概是这样: 我单独启动了1个monitor 定时监听所有service 的stat(launcher.lua:STAT[, ti] 很好的帮我们避免了服务已过载的情况);同时将stat简单转换成1个ping信息(你期望的任何形式)保存下来 可以简单的设置ti来试图发现服务是否已过载 同时如果整个node连monitor的调度都无法执行(肯定已过载);那么存盘ping信息一定是一个超时信息

3层判断过载策略

  1. 判断mqlen是否过高 (这里可简单计算负载等级)
  2. 判断service stat 是否已经异常(即:timeout;error等)
  3. 判断上一次ping信息时间(即:判断node 是否假死)

过载和雪崩等都是一个主观的判断;没想到比较合适的量化标准; 如果是人数负载的话;我建议是加1个路由 通过连接数/权重 分配game(当然你也可以用你提到的人数/max的方式) service过载和role 过载处理完全不一样,我认为不因混为一谈 node 未codedump/oom前 只要服务能降级 一般自理都可以恢复(btw:恢复后也不建议复用了,maybe service很多中间状态都不对了,除非完全无状态业务,否则引入新问题更麻烦)

fanyh avatar Feb 10 '22 08:02 fanyh

这几天又想了想消息回传的问题 我最大的痛点在于 构架的调度器设计为 业务逻辑无需知晓节点调度情况 实际业务环境又可能需要干涉调度逻辑(例如:某些情况下就算node 假死也不切换node) 最终我还是打算给调度器开放一个register接口来注册路由策略;最终来决定是否熔断/retry 熔断后也就不考虑node restore的这种复杂情形;除非node restarted 这样一来我的很多问题也将将变得简单很多,对于使用者来说也不会造成太大额外负担(至少比处理预期外的pub msg简单不少)

fanyh avatar Feb 12 '22 04:02 fanyh

游戏业务大多带状态的 不建议设计这么复杂 越简单越好 互相影响的放一个进程比较好

sniper00 avatar Feb 12 '22 04:02 sniper00

我对集群包装没你那么重,暂时没有你消息回传的问题。我这边用集群是因为是全球服,方便业务扩展。

均衡集群为无状态节点集群 (login1..n) (gamemgr1...n) ...

  1. 多节点高可用
  2. 扩展功能负载

分片集群(类redis)为有状态节点集群 (friend1...n) ...

  1. 静态路由节点
  2. 节点down了也不切换节点, 除非由仲裁节点改静态配置
  3. friendx down了直接拉起
  4. firendx 启动过程中未完毕前,节点对外部节点的api都error
  5. 集群负载不够,添加节点,迁移slot

我这边req/resp的使用方式主要为了

  1. 集群节点业务独立,不与其他节点有关联,由请求方驱动业务。
  2. agent 与 战斗节点业务 a. 战斗节点推送 agent通知(send), agent down了后,cluster会疯狂connect。 b. 用 req/resp 战斗节点通知暂存战斗节点业务队列,agent req的时候推送,队列过长则认为agent down了。

问题:

  1. 像 agent和 战斗节点这种问题,有没有其他合适的处理方案? 如 a. 战斗对agent的cluster_sender 连接断开时派发节点内事件,让业务感知agent down了。(消息频率高,等心跳感知还是有大量connect) b. cluster_sender 发起connect的添加冷却时间?

zeroatom avatar Feb 12 '22 08:02 zeroatom

我们需要同时支持 全球服项目/分服项目 多种业务环境 目前这样做是为了能够更好的把项目运行在k8s中,尽可能多的利用k8s的特性同时减少开发组的制作难度

异常对开这种处理措施比较简单啊 在战斗节点侧 像云大侠说那样建立msg queue ,你吧msg queue 前面包一个heart_monitor; agent 侧保持ping ;这样逻辑就顺了,

我有点不太明白的是什么叫 等心跳感知还有大量的connect? 你的2个节点互发消息是建立的2个tcp? 如: agent call

   cluster.call("battle node", "battle service", ...)

battle push msg

   cluster.call("agent node", "agent service", ...)

这样?

fanyh avatar Feb 12 '22 08:02 fanyh

嗯,是两个tcp agent 侧 cluster.send("battle_node", "battle service", session_id, "client_battle_pkg") -- 请求 放技能 移动等

战斗 侧 cluster.send("agent node", "agent service", session_id, "battle_event_pkg") -- 实体状态变化 这样裸调,cluster底层 cluster_sender会不停connect agent node, 如心跳30,agent down了,战斗侧心跳到了会踢掉agent, 过程中还是不停send。

zeroatom avatar Feb 12 '22 08:02 zeroatom

这种我还是建议你实现个simple msg queue; battle service 需要向agent 投递消息 调 skynet.send("msg queue"); msg queue 再push 将消息push 到 agent (这样可以减少一些你调度1次就产生1个connected faild的情况) 在msg queue 里面处理链接异常 包括不限于retry 但是在heart timeout 前 connect 是 没办法停止的,只能说可以减少一点

当然我更推荐你 用一条tcp + msg queue 来解决这个问题 你仅需要修改下agent侧 监听消息的方式而已

fanyh avatar Feb 12 '22 08:02 fanyh

嗯,目前我就是用队列,因为队列是针对玩家的,不是节点的,当有agent节点有很多玩家在战斗,每个玩家都有send,感觉会狂刷日志。 所以我在考虑要不要在cluster_sender做些拦截及让cluster_sender抛出节点断开让业务感知。

zeroatom avatar Feb 12 '22 09:02 zeroatom

我明白你的意思了;但我觉得你始终需要再包装一次 或者你魔改一个你自己的cluster_sender; 你如果用云大侠提供的原神调度,你这个问题没办法避免

fanyh avatar Feb 12 '22 09:02 fanyh

嗯,是两个tcp agent 侧 cluster.send("battle_node", "battle service", session_id, "client_battle_pkg") -- 请求 放技能 移动等

战斗 侧 cluster.send("agent node", "agent service", session_id, "battle_event_pkg") -- 实体状态变化 这样裸调,cluster底层 cluster_sender会不停connect agent node, 如心跳30,agent down了,战斗侧心跳到了会踢掉agent, 过程中还是不停send。

建议魔改一下clusterd和cluster_sender,让 战斗 节点可以感知到 agent节点挂了

RockyDong avatar Mar 14 '22 03:03 RockyDong

借助cluster + etcd就可以很好实现了,不需要修改然后底层代码,上层做一个服务,做转发,能够达到同类服务的自动随机选择和或者负载选择等等,然后节点上线下线就自动去reload即可。

terry8210 avatar Apr 01 '22 10:04 terry8210

一个简单的业务状态的不建议设计比较复杂的越多过程

这个深有体会,我们的项目就是像楼主那样使用集群模式拆分了好多节点(类似微服务)导致了好多因为rpc调用结果异常的BUG,最后无奈重构成单节点的模式。

gtrs426 avatar Jul 08 '22 08:07 gtrs426

借助cluster + etcd就可以很好实现了,不需要修改然后底层代码,上层做一个服务,做转发,能够达到同类服务的自动随机选择和或者负载选择等等,然后节点上线下线就自动去reload即可。

节点上线下线自动去reload 是不是需要有一个中心来 广播 给他节点reload

CMencius avatar Oct 17 '22 08:10 CMencius

cluster 每次连其它节点时 先httpclient 请求 ip:port

sniper00 avatar Oct 17 '22 08:10 sniper00

借助cluster + etcd就可以很好实现了,不需要修改然后底层代码,上层做一个服务,做转发,能够达到同类服务的自动随机选择和或者负载选择等等,然后节点上线下线就自动去reload即可。

节点上线下线自动去reload 是不是需要有一个中心来 广播 给他节点reload

节点本身自己监听etcd的更新来reload即可,如果你说是中心广播,etcd就是中心

terry8210 avatar Oct 18 '22 04:10 terry8210

一个简单的业务状态的不建议设计比较复杂的越多过程

这个深有体会,我们的项目就是像楼主那样使用集群模式拆分了好多节点(类似微服务)导致了好多因为rpc调用结果异常的BUG,最后无奈重构成单节点的模式。

游戏不像互联网产品,高度集成的产品,基本游戏业务都在单节点处理,针对好友聊天需要用到的跨节点操作可以不处理异常行为,针对一些全区全服的跨服玩法,匹配后无非还是进入一个独立的空间进行,至于是本节点转发还是客户端主动连接独立空间,可以根据需求来确定。

terry8210 avatar Oct 18 '22 04:10 terry8210

目前采用cluster+etcd liker 组网已经过去几个月了,已经推广到多个项目中使用,虽然仍然存在部分问题,但在可接受范围内 考虑到很多业务场景实际上不期望知晓组网总节点数,仅关心与自己业务息息相关的node, 于是我们把etcd作为service并入了工程内,cluser server <->game server 通过etcd service的落地信息来同步node的上下线; 这里存在一个延时信息,我们在game etcd service中尽量的磨平了 延时对业务的负面影响;业务统一使用包装过的cluster proxyd 访问组网节点,当然也可以自己定向,不过需要自行处理掉线等问题。

在设计过程遇到最大的问题是还是cluster proxyd 和 etcd的维持上,整体来说这种组网模式对整个项目人员的要求会比直接使用定向业务组网这种方式高一些

表述得略显凌乱,终归来说这种组网模式的还是有可行性的,目前我们应用层产生的问题还是比较少;反而是cluster proxyd还有优化空间,对过载的保护做法略显粗暴,将就能用

待项目顺利上线一段时候后再将我们的组网结构呈现出来接受大家批评。

fanyh avatar Oct 18 '22 04:10 fanyh

一个简单的业务状态的不建议设计比较复杂的越多过程

这个深有体会,我们的项目就是像楼主那样使用集群模式拆分了好多节点(类似微服务)导致了好多因为rpc调用结果异常的BUG,最后无奈重构成单节点的模式。

游戏不像互联网产品,高度集成的产品,基本游戏业务都在单节点处理,针对好友聊天需要用到的跨节点操作可以不处理异常行为,针对一些全区全服的跨服玩法,匹配后无非还是进入一个独立的空间进行,至于是本节点转发还是客户端主动连接独立空间,可以根据需求来确定。

我们更多的是遇到了需要扁平处理的场景,同时不希望整个环境存在太多了,a,b,c...z功能性的服务器

fanyh avatar Oct 18 '22 05:10 fanyh