server-sdk-go
server-sdk-go copied to clipboard
bug: "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport"
2024/05/15 19:34:47 "level"=0 "msg"="resuming connection..." "reconnectCount"=0 2024/05/15 19:34:50 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:50 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:50 "level"=0 "msg"="resuming connection..." "reconnectCount"=1 2024/05/15 19:34:58 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:58 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:58 "level"=0 "msg"="resuming connection..." "reconnectCount"=2 2024/05/15 19:35:06 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:06 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:07 "level"=0 "msg"="resuming connection..." "reconnectCount"=3 2024/05/15 19:35:14 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:14 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:17 "level"=0 "msg"="resuming connection..." "reconnectCount"=4 2024/05/15 19:35:22 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:22 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:26 "level"=0 "msg"="resuming connection..." "reconnectCount"=5 2024/05/15 19:35:29 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:29 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:37 "level"=0 "msg"="resuming connection..." 
"reconnectCount"=6 2024/05/15 19:35:37 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:37 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:48 "level"=0 "msg"="resuming connection..." "reconnectCount"=7 2024/05/15 19:35:48 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:48 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:03 "level"=0 "msg"="resuming connection..." "reconnectCount"=8 2024/05/15 19:36:03 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:03 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:22 "level"=0 "msg"="resuming connection..." "reconnectCount"=9 2024/05/15 19:36:22 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:22 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport"
`package main
import ( "fmt" "github.com/google/gopacket" "github.com/google/gopacket/layers" lksdk "github.com/livekit/server-sdk-go/v2" "github.com/livekit/server-sdk-go/v2/common" "github.com/pkg/errors" "github.com/vishvananda/netlink" "gopkg.in/yaml.v2" "io" "os" "os/signal" "strings" "syscall" )
// Package-level runtime state. main fills these in from config.yaml (and from
// the created TUN device) before any goroutine or callback reads them.
var (
	device   io.ReadWriteCloser // TUN device handle; nil until main creates it
	topic    string             // data-packet topic used for publishing and for filtering received packets
	cidr     string             // interface address in CIDR notation, assigned to the TUN interface
	mtu      int                // MTU applied to the TUN interface; also the read-buffer size
	dst      string             // identity of the participant that published packets are addressed to
	identity string             // identity this process joins the room with
	tunName  string             // name of the TUN interface
	prefix   string             // dotted prefix derived from cidr; used to filter outgoing packets
)
// Config mirrors the structure of config.yaml, which main unmarshals with
// yaml.Unmarshal. The struct tags give the YAML key for each field.
type Config struct {
	// LiveKit holds the connection settings passed to lksdk.ConnectToRoom.
	LiveKit struct {
		Host      string `yaml:"host"`
		APIKey    string `yaml:"apiKey"`
		APISecret string `yaml:"apiSecret"`
		RoomName  string `yaml:"roomName"`
	} `yaml:"livekit"`
	// Participant holds the local identity and the TUN / forwarding settings.
	Participant struct {
		Identity string `yaml:"identity"`
		TunName  string `yaml:"tunName"`
		MTU      int    `yaml:"mtu"`
		CIDR     string `yaml:"cidr"`
		Topic    string `yaml:"topic"`
		Dst      string `yaml:"dst"`
	} `yaml:"participant"`
}
// main reads config.yaml from the current working directory, creates and
// configures the TUN interface, connects to the LiveKit room, and then relays
// packets read from the TUN device to the configured destination participant
// until one of SIGHUP, SIGINT, SIGTERM or SIGQUIT is received.
//
// Any setup failure is fatal and reported via panic.
func main() {
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	path := fmt.Sprintf("%s/%s", dir, "config.yaml")
	file, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	t := Config{}
	err = yaml.Unmarshal(file, &t)
	if err != nil {
		panic(err)
	}
	mtu = t.Participant.MTU
	tunName = t.Participant.TunName
	cidr = t.Participant.CIDR
	topic = t.Participant.Topic
	dst = t.Participant.Dst
	identity = t.Participant.Identity
	prefix = getPrefix(cidr)

	// Create the TUN device.
	device, _, err = common.CreateTunDevice(tunName, mtu)
	if err != nil {
		panic(err)
	}
	err = prepareInterface(&tunConfig{
		InterfaceName:    tunName,
		InterfaceAddress: cidr,
		InterfaceMTU:     mtu,
	})
	if err != nil {
		panic(err)
	}

	room, err := lksdk.ConnectToRoom(t.LiveKit.Host, lksdk.ConnectInfo{
		APIKey:              t.LiveKit.APIKey,
		APISecret:           t.LiveKit.APISecret,
		RoomName:            t.LiveKit.RoomName,
		ParticipantIdentity: identity,
	}, &lksdk.RoomCallback{
		ParticipantCallback: lksdk.ParticipantCallback{
			OnDataPacket: onDataPacket,
		},
		OnDisconnected: func() {
			fmt.Println("disconnected")
		},
	})
	if err != nil {
		panic(err)
	}

	// Forward packets: everything that passes the destination filter is
	// published reliably to the dst participant on the configured topic.
	packets := make(chan []byte, mtu)
	go func() {
		for v := range packets {
			data, err1 := filterPacket(v, prefix)
			if err1 != nil {
				fmt.Println("Error handling packet:", err1)
				continue
			}
			err1 = room.LocalParticipant.PublishDataPacket(&lksdk.UserDataPacket{
				Payload: data,
				Topic:   topic,
			}, lksdk.WithDataPublishReliable(true),
				lksdk.WithDataPublishDestination([]string{dst}))
			if err1 != nil {
				fmt.Println("error publishing data", err1)
				continue
			}
		}
	}()

	// Read traffic from the TUN device.
	go func() {
		for {
			// A fresh buffer per read: the slice is handed to the other
			// goroutine through the channel, so it must not be reused here.
			frame := make([]byte, mtu)
			n, err1 := device.Read(frame)
			if err1 != nil {
				fmt.Println("Error reading from TUN device:", err1)
				continue
			}
			// Only the first n bytes belong to the packet; the rest of the
			// buffer is zero padding and must not be published.
			packets <- frame[:n]
		}
	}()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sigs
	room.Disconnect()
}
// filterPacket decodes packet as an IPv4 packet and checks its destination
// address against prefix, a dotted-decimal prefix such as "10.8".
//
// The destination is accepted when it equals prefix or starts with prefix
// followed by a dot, so the match is anchored at the first octet and ends on
// an octet boundary. An empty prefix accepts every destination.
//
// It returns the unmodified packet on success, a wrapped decode error when
// the bytes are not a valid IPv4 packet, or an "invalid destination" error
// when the destination does not match.
func filterPacket(packet []byte, prefix string) ([]byte, error) {
	var ip layers.IPv4
	if err := ip.DecodeFromBytes(packet, gopacket.NilDecodeFeedback); err != nil {
		return nil, errors.Wrap(err, "could not parse IP packet")
	}
	dst1 := ip.DstIP.String()
	if prefix != "" {
		// A substring test would let prefix "10.0" match "110.0.1.1" or
		// "1.10.0.1"; require the prefix at the start, ending on a dot.
		p := strings.TrimSuffix(prefix, ".")
		if dst1 != p && !strings.HasPrefix(dst1, p+".") {
			return nil, errors.New("invalid destination")
		}
	}
	// The prefix is derived from the first two octets of the CIDR.
	fmt.Println("dst1", dst1)
	return packet, nil
}
// 收到的包都写入网卡里面 func onDataPacket(data lksdk.DataPacket, params lksdk.DataReceiveParams) { if data.ToProto().GetUser().GetTopic() == topic { a := data.ToProto().GetUser().GetPayload() if device == nil { fmt.Println("TUN device not initialized") return }
_, err := device.Write(a)
if err != nil {
fmt.Println("Error writing to TUN device:", err)
}
}
}
// tunConfig carries the settings prepareInterface applies to a network
// interface.
type tunConfig struct {
	InterfaceName    string // name used to look the interface up
	InterfaceAddress string // address to assign, in the form accepted by netlink.ParseAddr
	InterfaceMTU     int    // MTU to set on the interface
}
func prepareInterface(c *tunConfig) error { link, err := netlink.LinkByName(c.InterfaceName) if err != nil { return err }
addr, err := netlink.ParseAddr(c.InterfaceAddress)
if err != nil {
return err
}
err = netlink.LinkSetMTU(link, c.InterfaceMTU)
if err != nil {
return err
}
err = netlink.AddrAdd(link, addr)
if err != nil {
return err
}
err = netlink.LinkSetUp(link)
if err != nil {
return err
}
return nil
}
// getPrefix returns the first two dot-separated components of cidr joined by
// a dot, e.g. "10.8" for "10.8.0.1/24".
//
// When cidr contains no dot there are not two components to take, so the
// input is returned unchanged instead of panicking on the slice expression.
func getPrefix(cidr string) string {
	ipParts := strings.Split(cidr, ".")
	if len(ipParts) < 2 {
		return cidr
	}
	return strings.Join(ipParts[:2], ".")
}
`
livekit-server v1.6.1
docker run --rm \
  -p 7880:7880 \
  -p 7881:7881 \
  -p 7882:7882/udp \
  -v $PWD/livekit.yaml:/livekit.yaml \
  livekit/livekit-server \
  --config /livekit.yaml \
  --node-ip=xxxx