server-sdk-go icon indicating copy to clipboard operation
server-sdk-go copied to clipboard

bug msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport

Open brucel11qwe opened this issue 9 months ago • 0 comments

2024/05/15 19:34:47 "level"=0 "msg"="resuming connection..." "reconnectCount"=0 2024/05/15 19:34:50 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:50 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:50 "level"=0 "msg"="resuming connection..." "reconnectCount"=1 2024/05/15 19:34:58 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:58 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:34:58 "level"=0 "msg"="resuming connection..." "reconnectCount"=2 2024/05/15 19:35:06 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:06 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:07 "level"=0 "msg"="resuming connection..." "reconnectCount"=3 2024/05/15 19:35:14 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:14 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:17 "level"=0 "msg"="resuming connection..." "reconnectCount"=4 2024/05/15 19:35:22 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:22 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:26 "level"=0 "msg"="resuming connection..." "reconnectCount"=5 2024/05/15 19:35:29 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:29 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:37 "level"=0 "msg"="resuming connection..." "reconnectCount"=6 2024/05/15 19:35:37 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:37 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:48 "level"=0 "msg"="resuming connection..." "reconnectCount"=7 2024/05/15 19:35:48 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:35:48 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:03 "level"=0 "msg"="resuming connection..." "reconnectCount"=8 2024/05/15 19:36:03 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:03 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:22 "level"=0 "msg"="resuming connection..." "reconnectCount"=9 2024/05/15 19:36:22 "msg"="could not negotiate" "error"="ICEAgent does not exist: unable to restart ICETransport" 2024/05/15 19:36:22 "msg"="resume connection failed" "error"="ICEAgent does not exist: unable to restart ICETransport"

`package main

import ( "fmt" "github.com/google/gopacket" "github.com/google/gopacket/layers" lksdk "github.com/livekit/server-sdk-go/v2" "github.com/livekit/server-sdk-go/v2/common" "github.com/pkg/errors" "github.com/vishvananda/netlink" "gopkg.in/yaml.v2" "io" "os" "os/signal" "strings" "syscall" )

var ( device io.ReadWriteCloser topic string cidr string mtu int dst string identity string tunName string prefix string )

type Config struct { LiveKit struct { Host string yaml:"host" APIKey string yaml:"apiKey" APISecret string yaml:"apiSecret" RoomName string yaml:"roomName" } yaml:"livekit" Participant struct { Identity string yaml:"identity" TunName string yaml:"tunName" MTU int yaml:"mtu" CIDR string yaml:"cidr" Topic string yaml:"topic" Dst string yaml:"dst" } yaml:"participant" }

func main() {

dir, _ := os.Getwd()
path := fmt.Sprintf("%s/%s", dir, "config.yaml")

file, err := os.ReadFile(path)
if err != nil {
	panic(err)

}
t := Config{}
err = yaml.Unmarshal(file, &t)
if err != nil {
	panic(err)

}
mtu = t.Participant.MTU
tunName = t.Participant.TunName
cidr = t.Participant.CIDR
topic = t.Participant.Topic
dst = t.Participant.Dst
identity = t.Participant.Identity
prefix = getPrefix(cidr)

//创建tun设备
device, _, err = common.CreateTunDevice(tunName, mtu)
if err != nil {
	panic(err)

}
err = prepareInterface(&tunConfig{
	InterfaceName:    tunName,
	InterfaceAddress: cidr,
	InterfaceMTU:     mtu,
})
if err != nil {
	panic(err)

}

room, err := lksdk.ConnectToRoom(t.LiveKit.Host, lksdk.ConnectInfo{
	APIKey:              t.LiveKit.APIKey,
	APISecret:           t.LiveKit.APISecret,
	RoomName:            t.LiveKit.RoomName,
	ParticipantIdentity: identity,
}, &lksdk.RoomCallback{
	ParticipantCallback: lksdk.ParticipantCallback{
		OnDataPacket: onDataPacket,
	},
	OnDisconnected: func() {
		fmt.Println("disconnected")

	},
})
if err != nil {
	panic(err)
}

//传package
packets := make(chan []byte, mtu)
go func() {
	for v := range packets {
		data, err1 := filterPacket(v, prefix)
		if err1 != nil {
			fmt.Println("Error handling packet:", err1)
			continue
		}
		err1 = room.LocalParticipant.PublishDataPacket(&lksdk.UserDataPacket{
			Payload: data,
			Topic:   topic,
		}, lksdk.WithDataPublishReliable(true),
			lksdk.WithDataPublishDestination([]string{dst}))

		if err1 != nil {
			fmt.Println("error publishing data", err1)
			continue
		}
	}
}()

//tun流量读取
go func() {
	for {
		frame := make([]byte, mtu)
		_, err1 := device.Read(frame)
		if err1 != nil {
			fmt.Println("Error reading from TUN device:", err)
			continue
		}

		packets <- frame
	}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-sigs
room.Disconnect()

}

func filterPacket(packet []byte, prefix string) ([]byte, error) { var ip layers.IPv4

if err := ip.DecodeFromBytes(packet, gopacket.NilDecodeFeedback); err != nil {

	return nil, errors.Wrap(err, "could not parse IP packet")
}
dst1 := ip.DstIP.String()

if !strings.Contains(dst1, prefix) {
	return nil, errors.New("invalid destination")
}
//改成取cidr前2位

fmt.Println("dst1", dst1)

return packet, nil

}

// 收到的包都写入网卡里面 func onDataPacket(data lksdk.DataPacket, params lksdk.DataReceiveParams) { if data.ToProto().GetUser().GetTopic() == topic { a := data.ToProto().GetUser().GetPayload() if device == nil { fmt.Println("TUN device not initialized") return }

	_, err := device.Write(a)
	if err != nil {
		fmt.Println("Error writing to TUN device:", err)
	}
}

}

type tunConfig struct { InterfaceName string InterfaceAddress string InterfaceMTU int }

func prepareInterface(c *tunConfig) error { link, err := netlink.LinkByName(c.InterfaceName) if err != nil { return err }

addr, err := netlink.ParseAddr(c.InterfaceAddress)
if err != nil {
	return err
}

err = netlink.LinkSetMTU(link, c.InterfaceMTU)
if err != nil {
	return err
}

err = netlink.AddrAdd(link, addr)
if err != nil {
	return err
}

err = netlink.LinkSetUp(link)
if err != nil {
	return err
}
return nil

}

func getPrefix(cidr string) string {

ipParts := strings.Split(cidr, ".")

return strings.Join(ipParts[:2], ".")

} ` livekit-server v1.6.1 docker run --rm
-p 7880:7880
-p 7881:7881
-p 7882:7882/udp
-v $PWD/livekit.yaml:/livekit.yaml
livekit/livekit-server
--config /livekit.yaml
--node-ip=xxxx

brucel11qwe avatar May 15 '24 17:05 brucel11qwe