netman icon indicating copy to clipboard operation
netman copied to clipboard

高性能轻量级TCP、UDP框架、支持TLS、路由、中间件、WebSocket、事件循环(epoll),百万连接(C1000K)

目录

  • 目录
    • 介绍
    • 优势
    • 安装
    • 开始
    • Websocket
    • 中间件
    • 配置
      • Hooks
      • 心跳
      • 包体最大长度
      • TCP Keepalive
      • TLS
      • 自定义封包解包
      • 组合使用
    • 架构
    • 百万连接

介绍

  • 轻量的高性能TCP网络框架,基于epoll/kqueue,reactor模型实现
  • 简单的API,细节在框架内部实现,几行代码即可构建高性能的Server
  • 支持路由配置,更专注业务需求的处理,无需关心封包解包
  • 支持自定义封包格式,更灵活
  • 支持linux/macos,windows请在docker中运行
  • 支持TLS
  • 支持websocket
  • 中间件

优势

  • 非阻塞IO
  • 底层基于事件循环,在net包中,一个连接需要一个goroutine去维持,但netman基于事件循环则不需要,大大减少了内存的占用,在大量连接的场景下更为明显
  • 基于路由配置,业务层不关心封包解包的实现
  • 全局中间件、分组中间件
  • 经过测试在阿里云服务器(单机)上建立100万个连接(C1000K)的内存消耗在3.8GB左右

安装

  • 下载

    go get -u github.com/ikilobyte/netman
    
  • 导入

    import "github.com/ikilobyte/netman/server"
    

开始

server

  • 基本使用
    package main
    
    import "github.com/ikilobyte/netman/server"
    
    type Hello struct{}
    
    func (h *Hello) Do(request iface.IRequest) {
        message := request.GetMessage()
        connect := request.GetConnect()
        n, err := connect.Send(message.ID(), message.Bytes())
        fmt.Println("conn.send.n", n, "send err", err, "recv len()", message.Len())
    
        // 以下方式都可以获取到所有连接
        // 1、request.GetConnects()
        // 2、connect.GetConnectMgr().GetConnects()
    
        // 主动关闭连接
        // connect.Close()
    }
    
    func main() {
       s := server.New(
           "0.0.0.0",
           6565,
    
           // options 更多配置请看 #配置 文档
           server.WithMaxBodyLength(1024*1024*100), // 包体最大长度限制,0表示不限制
       )
    
       // add router 
    s.AddRouter(0, new(Hello))  // 设置消息ID为0的处理方法
        //s.AddRouter(1, new(xxx))  // ...
    
       s.Start()
    }
    

client

  • 示例

    package main
    
    import (
        "fmt"
        "io"
        "net"
        "os"
        "strings"
        "time"
    
        "github.com/ikilobyte/netman/util"
    )
    
    func main() {
        conn, err := net.Dial("tcp", "127.0.0.1:6565")
        if err != nil {
            panic(err)
        }
    
        // 用于消息的封包和解包,也可以自行实现封包解包规则
        packer := util.NewDataPacker()
    
        // 100MB
        c := strings.Repeat("a", 1024*1024*100)
        bs, err := packer.Pack(0, []byte(c))
    
        if err != nil {
            panic(err)
        }
    
        // 发送消息
        for {
            fmt.Println(conn.Write(bs))
            time.Sleep(time.Second * 1)
        }
    }
    
    
  • 接收消息

    // 备注:以下规则是框架默认实现的规则,你也可以自行实现,使用自己的 Packer 即可
    for {
        header := make([]byte, 8)
        n, err := io.ReadFull(conn, header)
        if n == 0 && err == io.EOF {
        	fmt.Println("连接已断开")
        	os.Exit(0)
        }
    
        if err != nil {
        	fmt.Println("read head bytes err", err)
        	os.Exit(1)
        }
    
        // 解包头部,会返回一个IMessage
        message, err := packer.UnPack(header)
        if err != nil {
        	fmt.Println("unpack err", err)
        	os.Exit(1)
        }
    
        // 创建一个和数据大小一样的bytes并读取
        dataBuff := make([]byte, message.Len())
        n, err = io.ReadFull(conn, dataBuff)
    
        if n == 0 && err == io.EOF {
        	fmt.Println("连接已断开")
        	os.Exit(0)
        }
    
        if err != nil {
        	fmt.Println("read dataBuff err", err, len(dataBuff[:n]))
        	os.Exit(1)
        }
    
        message.SetData(dataBuff)
        fmt.Printf(
            "recv msgID[%d] len[%d] %s\n",
            message.ID(),
            message.Len(),
            time.Now().Format("2006-01-02 15:04:05.000"),
        )
    }
    
    

Websocket

  • server
type Handler struct{}

// 连接建立
func (h *Handler) Open(connect iface.IConnect) {
	fmt.Println("onopen", connect.GetID())
}

// 消息到来时
func (h *Handler) Message(request iface.IRequest) {

	// 消息
	message := request.GetMessage()

	// 来自那个连接的
	connect := request.GetConnect()

	fmt.Printf("recv %s\n", message.String())

	// 普通文本格式
	fmt.Println(connect.Text([]byte(fmt.Sprintf("hi %s", message.Bytes()))))

	// 二进制格式
	//fmt.Println(connect.Binary([]byte("hi")))
}

// 连接关闭
func (h *Handler) Close(connect iface.IConnect) {
	fmt.Println("onclose", connect.GetID())
}

s := server.Websocket(
    "0.0.0.0",
    6565,
    new(Handler),   // websocket事件回调处理
)
  • client
  • 各语言的Websocket Client库即可,如Javascript的 new Websocket
  • client.html

中间件

  • 可被定义为全局中间件,和分组中间件,目前websocket只支持全局中间件
  • 配置中间件后,接收到的每条消息都会先经过中间件,再到达对应的消息回调函数
  • 中间件可提前终止执行
  • 定义中间件
    func demo1() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            fmt.Println("Front middleware")
            fmt.Println("ctx data", ctx.GetConnect(), ctx.GetRequest(), ctx.GetMessage())
    
            ctx.Set("key", "value")
            ctx.Set("now", time.Now().UnixNano())
            // 继续往下执行
            resp := next(ctx)
            fmt.Println("Rear middleware")
            return resp
        }
    }
    
    func demo2() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            fmt.Println(ctx.Get("key"),ctx.Get("now"))
            return next(ctx)
        }
    }   
    
    //authentication 这个用来做分组中间件
    var loginStore map[int]time.Time
    func authentication() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            conn := ctx.GetConnect()
            // 判断是否登录过,
            if _, ok := loginStore[conn.GetID()]; !ok {
                // 提前结束执行,不会到对应的Router
                _, _ = conn.Send(1, []byte("Authentication failed"))
                _ = conn.Close()
                return nil
            }
            // 继续执行
            return next(ctx)
        }
    }
    
    
  • 使用
    // 全局中间件
    s.Use(demo1())
    s.Use(demo2())
    
    // 分组,只有对应的路由才会执行
    g := s.Group(authentication())
    
        g.AddRouter(1, new(xxx))
        //g.AddRouter(2,new(xxx))
        //g.AddRouter(3,new(xxx))
        //g.AddRouter(4,new(xxx))
    
    

配置

  • 所有配置对 TcpServer(TLS)Websocket Server 都是生效的
  • 更多配置请查看 options.go

Hooks

  • Websocket也可以生效
type Hooks struct{}

// OnOpen 连接建立后回调
func (h *Hooks) OnOpen(connect iface.IConnect) {
    fmt.Printf("connId[%d] onOpen\n", connect.GetID())
}

// OnClose 连接成功关闭时回调
func (h *Hooks) OnClose(connect iface.IConnect) {
    fmt.Printf("connId[%d] onClose\n", connect.GetID())
}

s := server.New(
    "0.0.0.0",
    6565,
    
    // 配置 Hooks
    server.WithHooks(new(Hooks)),
)

心跳检测

  • 二者需要同时配置才会生效
s := server.New(
    "0.0.0.0",
    6565,
    
    // 表示60秒检测一次
    server.WithHeartbeatCheckInterval(time.Second*60), 
    
    // 表示一个连接如果180秒内未向服务器发送任何数据,此连接将被强制关闭
    server.WithHeartbeatIdleTime(time.Second*180),     
)

包体最大长度

s := server.New(
    "0.0.0.0",
    6565,
    
    // 0表示不限制长度
    // 这里配置的是100MB,当某条消息超过100MB时,会被拒绝处理
    server.WithMaxBodyLength(1024*1024*100),
)

TCP Keepalive

  • 参考:https://zh.wikipedia.org/wiki/Keepalive
s := server.New(
    "0.0.0.0",
    6565,
    
    server.WithTCPKeepAlive(time.Second*30),
)

TLS

tlsConfig := &tls.Config{
    Certificates: []tls.Certificate{...},
}

s := server.New(
    "0.0.0.0",
    6565,
    
    // 传入相关配置后,即可开启TLS
    server.WithTLSConfig(tlsConfig),
)

自定义封包解包

  • 为了更灵活的需求,可自定义封包解包规则,只需要使用IPacker接口即可
  • 配置
// IPacker 定义
type IPacker interface {
    Pack(msgID uint32, data []byte) ([]byte, error) // 封包
    UnPack([]byte) (IMessage, error)                // 解包
    SetMaxBodyLength(uint32)                        // 设置包体最大长度限制
    GetHeaderLength() uint32                        // 获取头部长度
}

type YouPacker struct {
    // implements IPacker
    // ... 
}
    
s := server.New(
    "0.0.0.0",
    6565,
    
    // 自定义Packer
    server.server.WithPacker(new(YouPacker)),
)

组合使用

s := server.New(
    "0.0.0.0",
    6565,
    server.WithNumEventLoop(runtime.NumCPU()*3),
    server.WithHooks(new(Hooks)),            // hook
    server.WithMaxBodyLength(0),             // 配置包体最大长度,默认为0(不限制大小)
    server.WithTCPKeepAlive(time.Second*30), // 设置TCPKeepAlive
    server.WithLogOutput(os.Stdout),         // 框架运行日志保存的地方
    server.WithPacker(new(YouPacker)),       // 可自行实现数据封包解包
    
    // 心跳检测机制,二者需要同时配置才会生效
    server.WithHeartbeatCheckInterval(time.Second*60), // 表示60秒检测一次
    server.WithHeartbeatIdleTime(time.Second*180),     // 表示一个连接如果180秒内未向服务器发送任何数据,此连接将被强制关闭
    
    // 开启TLS
    server.WithTLSConfig(tlsConfig),
)

s.Start()

架构

on

百万连接

  • 如看不到图片可以在examples目录中查看c1000k.png这张图片 c1000k

鸣谢

感谢 JetBrains 为此开源项目提供 GoLand 开发工具支持: