gf icon indicating copy to clipboard operation
gf copied to clipboard

带缓冲的tcp网络通信example

Open ywanbing opened this issue 5 years ago • 1 comment

server.go

package main

import (
	"errors"
	"fmt"
	"github.com/gogf/gf/net/gtcp"
	"net"
	"strings"
	"time"
)

// Connect pairs a gtcp connection with the buffered channel used to hand
// received packages from the Recv goroutine to the connection handler.
type Connect struct {
	// Conn is the underlying connection that Recv reads packages from.
	Conn     *gtcp.Conn
	// recvChan carries the packages read by Recv; Recv closes it when it exits.
	recvChan AnyChan
}

// main starts a TCP server on 127.0.0.1:8999. For every accepted connection
// it spawns a Recv goroutine that feeds a 100-slot buffered channel, then
// prints each package taken from that channel until the channel reports an
// error other than "empty" or "timeout".
func main() {
	handler := func(conn *gtcp.Conn) {
		defer conn.Close()

		c := &Connect{
			Conn:     conn,
			recvChan: make(AnyChan, 100),
		}
		go c.Recv()

		for {
			re, err := c.recvChan.Read(time.Second)
			switch {
			case err == CHERR_EMPTY || err == CHERR_TIMEOUT:
				// Nothing arrived within the wait window; keep polling.
				fmt.Println("not data ...")
				continue
			case err != nil:
				// Any other error (e.g. the channel was closed by Recv)
				// ends the handler, which closes the connection.
				fmt.Println(err)
				return
			}
			fmt.Println(string(re.([]byte)))
		}
	}

	gtcp.NewServer("127.0.0.1:8999", handler).Run()
}

// Recv reads packages from the connection in a loop and forwards each one to
// recvChan. Timeout errors from the connection are logged and retried; any
// other receive error, or a failure to enqueue within one millisecond, ends
// the loop. recvChan is closed on exit so the reader sees CHERR_CLOSED.
func (c *Connect) Recv() {
	// Close the channel when the receive loop ends.
	defer close(c.recvChan)

	for {
		data, err := c.Conn.RecvPkg()
		if err != nil {
			netErr, isNetErr := err.(net.Error)
			if isNetErr && netErr.Timeout() {
				fmt.Println("timeout")
				continue
			}
			fmt.Println(err)
			return
		}

		if err := c.recvChan.Write(data, time.Millisecond); err != nil {
			fmt.Println(err)
			return
		}
	}
}

// Sentinel errors returned by the AnyChan Read and Write methods.
var (
	// CHERR_CLOSED reports that the channel has been closed.
	CHERR_CLOSED = errors.New("channel is closed")
	// CHERR_TIMEOUT reports that the operation did not complete in time.
	CHERR_TIMEOUT = errors.New("channel ops timeout")
	// CHERR_EMPTY reports that a non-blocking read found no data.
	CHERR_EMPTY = errors.New("channel is empty")
)

// AnyChan is a channel of arbitrary values; its Read and Write methods add
// timeout handling and report closure through the CHERR_* sentinel errors.
type AnyChan chan interface{}

// Read receives one value from the channel.
//
// t is the timeout. When t is 0 the call does not block: it returns
// CHERR_EMPTY immediately if no value is ready. Otherwise it waits up to t
// and returns CHERR_TIMEOUT if nothing arrives in time. CHERR_CLOSED is
// returned when the channel is closed and has no remaining values.
func (ch AnyChan) Read(t time.Duration) (re interface{}, err error) {
	defer func() {
		// Use a distinct name for the recovered value so the assignment
		// below reaches the named result instead of a shadowing local.
		if r := recover(); r != nil {
			re = nil
			err = errors.New("read channel panic")
		}
	}()

	if t == 0 {
		select {
		case v, ok := <-ch:
			if !ok {
				// The channel has been closed.
				return nil, CHERR_CLOSED
			}
			return v, nil
		default:
			// No data available right now.
			return nil, CHERR_EMPTY
		}
	}

	// Stop the timer on return so it is released as soon as a value (or the
	// close signal) arrives, rather than lingering until it fires.
	timer := time.NewTimer(t)
	defer timer.Stop()

	select {
	case v, ok := <-ch:
		if !ok {
			// The channel has been closed.
			return nil, CHERR_CLOSED
		}
		return v, nil
	case <-timer.C:
		// The operation timed out.
		return nil, CHERR_TIMEOUT
	}
}

// Write sends v on the channel.
//
// v is the value to send. t is the timeout: when t is 0 the call blocks until
// the send completes; otherwise it waits up to t and returns CHERR_TIMEOUT if
// the send could not be made in time. CHERR_CLOSED is returned when the
// channel has already been closed.
func (ch AnyChan) Write(v interface{}, t time.Duration) (err error) {
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		// Sending on a closed channel panics; translate that panic into
		// CHERR_CLOSED and anything else into a generic error.
		if strings.Contains(fmt.Sprintf("%v", cause), "closed channel") {
			err = CHERR_CLOSED
			return
		}
		err = errors.New("write channel panic")
	}()

	if t == 0 {
		ch <- v
		return nil
	}

	select {
	case ch <- v:
		return nil
	case <-time.After(t):
		return CHERR_TIMEOUT
	}
}

client.go

package main

import (
	"github.com/gogf/gf/net/gtcp"
	"github.com/gogf/gf/os/glog"
	"github.com/gogf/gf/util/gconv"
	"time"
)

// main connects to the server at 127.0.0.1:8999 and sends the packages "0",
// "1" and "2", pausing one second after each send. A failed connection
// panics; a failed send is only logged.
func main() {
	conn, err := gtcp.NewConn("127.0.0.1:8999")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	for n := 0; n < 3; n++ {
		payload := []byte(gconv.String(n))
		if sendErr := conn.SendPkg(payload); sendErr != nil {
			glog.Error(sendErr)
		}
		time.Sleep(time.Second)
	}
}

ywanbing avatar Jan 03 '20 09:01 ywanbing

server.go

package main

import (
	"errors"
	"fmt"
	"github.com/gogf/gf/net/gtcp"
	"net"
	"strings"
	"time"
)

// Connect pairs a gtcp connection with the buffered channel used to hand
// received packages from the Recv goroutine to the connection handler.
type Connect struct {
	// Conn is the underlying connection that Recv reads packages from.
	Conn     *gtcp.Conn
	// recvChan carries the packages read by Recv; Recv closes it when it exits.
	recvChan AnyChan
}

// main starts a TCP server on 127.0.0.1:8999. For every accepted connection
// it spawns a Recv goroutine that feeds a 100-slot buffered channel, then
// prints each package taken from that channel until the channel reports an
// error other than "empty" or "timeout".
func main() {
	handler := func(conn *gtcp.Conn) {
		defer conn.Close()

		c := &Connect{
			Conn:     conn,
			recvChan: make(AnyChan, 100),
		}
		go c.Recv()

		for {
			re, err := c.recvChan.Read(time.Second)
			switch {
			case err == CHERR_EMPTY || err == CHERR_TIMEOUT:
				// Nothing arrived within the wait window; keep polling.
				fmt.Println("not data ...")
				continue
			case err != nil:
				// Any other error (e.g. the channel was closed by Recv)
				// ends the handler, which closes the connection.
				fmt.Println(err)
				return
			}
			fmt.Println(string(re.([]byte)))
		}
	}

	gtcp.NewServer("127.0.0.1:8999", handler).Run()
}

// Recv reads packages from the connection in a loop and forwards each one to
// recvChan. Timeout errors from the connection are logged and retried; any
// other receive error, or a failure to enqueue within one millisecond, ends
// the loop. recvChan is closed on exit so the reader sees CHERR_CLOSED.
func (c *Connect) Recv() {
	// Close the channel when the receive loop ends.
	defer close(c.recvChan)

	for {
		data, err := c.Conn.RecvPkg()
		if err != nil {
			netErr, isNetErr := err.(net.Error)
			if isNetErr && netErr.Timeout() {
				fmt.Println("timeout")
				continue
			}
			fmt.Println(err)
			return
		}

		if err := c.recvChan.Write(data, time.Millisecond); err != nil {
			fmt.Println(err)
			return
		}
	}
}

// Sentinel errors returned by the AnyChan Read and Write methods.
var (
	// CHERR_CLOSED reports that the channel has been closed.
	CHERR_CLOSED = errors.New("channel is closed")
	// CHERR_TIMEOUT reports that the operation did not complete in time.
	CHERR_TIMEOUT = errors.New("channel ops timeout")
	// CHERR_EMPTY reports that a non-blocking read found no data.
	CHERR_EMPTY = errors.New("channel is empty")
)

// AnyChan is a channel of arbitrary values; its Read and Write methods add
// timeout handling and report closure through the CHERR_* sentinel errors.
type AnyChan chan interface{}

// Read receives one value from the channel.
//
// t is the timeout. When t is 0 the call does not block: it returns
// CHERR_EMPTY immediately if no value is ready. Otherwise it waits up to t
// and returns CHERR_TIMEOUT if nothing arrives in time. CHERR_CLOSED is
// returned when the channel is closed and has no remaining values.
func (ch AnyChan) Read(t time.Duration) (re interface{}, err error) {
	defer func() {
		// Use a distinct name for the recovered value so the assignment
		// below reaches the named result instead of a shadowing local.
		if r := recover(); r != nil {
			re = nil
			err = errors.New("read channel panic")
		}
	}()

	if t == 0 {
		select {
		case v, ok := <-ch:
			if !ok {
				// The channel has been closed.
				return nil, CHERR_CLOSED
			}
			return v, nil
		default:
			// No data available right now.
			return nil, CHERR_EMPTY
		}
	}

	// Stop the timer on return so it is released as soon as a value (or the
	// close signal) arrives, rather than lingering until it fires.
	timer := time.NewTimer(t)
	defer timer.Stop()

	select {
	case v, ok := <-ch:
		if !ok {
			// The channel has been closed.
			return nil, CHERR_CLOSED
		}
		return v, nil
	case <-timer.C:
		// The operation timed out.
		return nil, CHERR_TIMEOUT
	}
}

// Write sends v on the channel.
//
// v is the value to send. t is the timeout: when t is 0 the call blocks until
// the send completes; otherwise it waits up to t and returns CHERR_TIMEOUT if
// the send could not be made in time. CHERR_CLOSED is returned when the
// channel has already been closed.
func (ch AnyChan) Write(v interface{}, t time.Duration) (err error) {
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		// Sending on a closed channel panics; translate that panic into
		// CHERR_CLOSED and anything else into a generic error.
		if strings.Contains(fmt.Sprintf("%v", cause), "closed channel") {
			err = CHERR_CLOSED
			return
		}
		err = errors.New("write channel panic")
	}()

	if t == 0 {
		ch <- v
		return nil
	}

	select {
	case ch <- v:
		return nil
	case <-time.After(t):
		return CHERR_TIMEOUT
	}
}

client.go

package main

import (
	"github.com/gogf/gf/net/gtcp"
	"github.com/gogf/gf/os/glog"
	"github.com/gogf/gf/util/gconv"
	"time"
)

// main connects to the server at 127.0.0.1:8999 and sends the packages "0",
// "1" and "2", pausing one second after each send. A failed connection
// panics; a failed send is only logged.
func main() {
	conn, err := gtcp.NewConn("127.0.0.1:8999")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	for n := 0; n < 3; n++ {
		payload := []byte(gconv.String(n))
		if sendErr := conn.SendPkg(payload); sendErr != nil {
			glog.Error(sendErr)
		}
		time.Sleep(time.Second)
	}
}

ywanbing avatar Jan 03 '20 09:01 ywanbing