server-sdk-go icon indicating copy to clipboard operation
server-sdk-go copied to clipboard

Severe packet loss with over 200 participants in one room

Open ChenMoGe2 opened this issue 3 years ago • 7 comments

Describe the bug: I used server-sdk-go to write a load-test program. On machine 1, I create one publisher that sends a video track read from a VP8-encoded video file. On machine 2, I create one subscriber that saves the remote video to disk. On machine 3, I create 200 subscribers that do not save the remote video. Finally, I download the video file from machine 2, and it shows severe frame loss.

Server

  • Version: [0.15.1]
  • Environment: [local dev]
  • Cpu: 32 core, 2.10GHz
  • Mem: 32GB
  • Bandwidth: 10Gb

Client

  • SDK: [go]
  • Version: [0.7.4]

To Reproduce Steps to reproduce the behavior:

  1. 200 clients connect to the room concurrently on machine 3.
  2. 1 client connects to the same room on machine 2 and records the video received from the publisher.
  3. 1 client connects to the same room on machine 1 and publishes the video.
  4. Download the recorded video from machine 2 and play it.
  5. The server uses only 5 CPU cores, 2.5 GB of memory and 201 Mb of bandwidth. (With 100 clients on machine 3, bandwidth usage is 146 Mb.)

Expected behavior: The recorded video has no frame loss.

Screenshots image

Additional context: go.mod

module livekit-server-sdk-sample

go 1.16

require (
	github.com/livekit/protocol v0.9.11
	github.com/livekit/server-sdk-go v0.7.4
	github.com/pion/webrtc/v3 v3.0.32
)

Load test program code

package main

import (
	"context"
	"flag"
	"fmt"
	livekit "github.com/livekit/protocol/proto"
	lksdk "github.com/livekit/server-sdk-go"
	"github.com/pion/webrtc/v3"
	"github.com/pion/webrtc/v3/pkg/media/ivfwriter"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

// Package-level state shared by main and the worker goroutines.
var (
	// Progress counters, incremented with atomic.AddInt64 by the workers.
	subCount   int64 // subscribers whose ConnectToRoom call succeeded
	onSubCount int64 // OnTrackSubscribed callbacks that have fired

	// Command-line settings, filled in by the flags registered in init.
	videoPath     string // IVF file published by -pub workers
	savePath      string // directory that receives recorded IVF files
	pub           bool   // run publishers instead of subscribers
	save          bool   // subscribers record received RTP to disk
	roomNum       int    // number of rooms to create
	pubNum        int    // publishers per room
	subNum        int    // subscribers per room
	retryNum      int    // extra ConnectToRoom attempts after a failure
	retrySleepNum int    // seconds to wait between ConnectToRoom attempts
)

// init seeds math/rand and registers the command-line flags consumed by main
// and the worker goroutines.
func init() {
	// Participant identities are "pub-<rand>" / "sub-<rand>". Before Go 1.20
	// (this module targets go 1.16) the global math/rand source behaves as if
	// seeded with 1 unless Seed is called. Every process would then produce
	// the same identity sequence, and subscribers started on different
	// machines would join the same room with identical identities. Seeding
	// from the clock keeps identities distinct across processes.
	rand.Seed(time.Now().UnixNano())

	flag.StringVar(&videoPath, "videoPath", "./video.ivf", "video file path")
	flag.StringVar(&savePath, "savePath", "./save", "save video file path")
	flag.BoolVar(&pub, "pub", false, "is publisher")
	flag.BoolVar(&save, "save", false, "save video")
	flag.IntVar(&roomNum, "roomNum", 1, "number of create room")
	flag.IntVar(&pubNum, "pubNum", 1, "number of publisher in per room")
	flag.IntVar(&subNum, "subNum", 1, "number of subscriber in per room")
	flag.IntVar(&retryNum, "retryNum", 5, "retry number when connect to room failed")
	flag.IntVar(&retrySleepNum, "retrySleepNum", 10, "retry sleep when connect to room failed")
}

// main parses the flags and creates roomNum rooms through the RoomService API.
// In each room that was created successfully, it starts pubNum publisher
// goroutines (with -pub) or subNum subscriber goroutines (without it). It
// then blocks until SIGINT, SIGTERM or SIGQUIT arrives. The process exits
// without disconnecting the workers.
//
// Room names come out as "room::<n>" because the prefix handed to Sprintf
// already ends in a colon.
//
// NOTE(review): the server URLs and API credentials are hard-coded here;
// consider reading them from flags or the environment instead of committing
// a secret to source control.
func main() {
	flag.Parse()

	const (
		httpHost = "https://xxxx.xxxx.com"
		wsHost   = "wss://xxxx.xxxx.com"
	)
	apiKey, apiSecret := "API4TNGcc8K8KXt", "qDTSklbwfwpEv3y8vhq6emRdnud5hLvbUilmxvz4Bva"
	roomClient := lksdk.NewRoomServiceClient(httpHost, apiKey, apiSecret)

	for r := 1; r <= roomNum; r++ {
		roomName := fmt.Sprintf("%s:%d", "room:", r)
		req := &livekit.CreateRoomRequest{Name: roomName}
		if _, err := roomClient.CreateRoom(context.Background(), req); err != nil {
			log.Println("CreateRoom failed ", err)
			continue
		}

		switch {
		case pub:
			for i := 1; i <= pubNum; i++ {
				go pubWorker(wsHost, apiKey, apiSecret, roomName, r, i, retryNum)
			}
		default:
			for i := 1; i <= subNum; i++ {
				go subWorker(wsHost, apiKey, apiSecret, roomName, r, i, retryNum)
			}
		}
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sigs
}

// pubWorker joins roomName under a fresh "pub-<rand>" identity and publishes
// the IVF file at videoPath as a local file track, one frame every 33ms
// (roughly 30 fps).
//
// If ConnectToRoom fails, it sleeps retrySleepNum seconds and tries again with
// a new identity, up to retry more times. r and p (room and publisher index)
// are accepted but not used.
//
// NOTE(review): a static-file publisher does not react to PLIs or NACKs, so
// any packet loss corrupts the received video until the next keyframe in the
// file.
func pubWorker(host string, apiKey string, apiSecret string, roomName string, r int, p int, retry int) {
	identity := fmt.Sprintf("%s-%d", "pub", rand.Int63())
	room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
		APIKey:              apiKey,
		APISecret:           apiSecret,
		RoomName:            roomName,
		ParticipantIdentity: identity,
	})
	if err != nil {
		if retry > 0 {
			log.Println("ConnectToRoom failed and retry ", err)
			time.Sleep(time.Duration(retrySleepNum) * time.Second)
			// The retry owns the rest of the work. Return so that a successful
			// retry is not followed by a spurious failure log below.
			pubWorker(host, apiKey, apiSecret, roomName, r, p, retry-1)
			return
		}
		log.Println("ConnectToRoom failed", err)
		return
	}

	file := videoPath
	track, err := lksdk.NewLocalFileTrack(file,
		lksdk.FileTrackWithFrameDuration(33*time.Millisecond),
		lksdk.FileTrackWithOnWriteComplete(func() {
			fmt.Println("track finished")
		}),
	)
	if err != nil {
		log.Println("NewLocalFileTrack failed ", err)
		// Do not leave an idle participant in the room.
		room.Disconnect()
		return
	}

	if _, err = room.LocalParticipant.PublishTrack(track, file); err != nil {
		log.Println("PublishTrack failed ", err)
		room.Disconnect()
		return
	}
}

// subWorker joins roomName under a fresh "sub-<rand>" identity and reads RTP
// from every track it is subscribed to. With -save, the packets are written
// to "<savePath>/<identity>.ivf".
//
// If ConnectToRoom fails, it sleeps retrySleepNum seconds and tries again with
// a new identity, up to retry more times. r and s (room and subscriber index)
// are accepted but not used.
func subWorker(host string, apiKey string, apiSecret string, roomName string, r int, s int, retry int) {
	identity := fmt.Sprintf("%s-%d", "sub", rand.Int63())
	room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
		APIKey:              apiKey,
		APISecret:           apiSecret,
		RoomName:            roomName,
		ParticipantIdentity: identity,
	})
	if err != nil {
		if retry > 0 {
			log.Println("ConnectToRoom failed and retry ", err)
			time.Sleep(time.Duration(retrySleepNum) * time.Second)
			// The retry owns the rest of the work. Return so that a successful
			// retry is not followed by a spurious failure log below.
			subWorker(host, apiKey, apiSecret, roomName, r, s, retry-1)
			return
		}
		log.Println("ConnectToRoom failed", err)
		return
	}

	// NOTE(review): the callback is installed only after ConnectToRoom has
	// returned. Confirm the SDK cannot fire OnTrackSubscribed before this
	// assignment; otherwise early subscriptions are missed.
	room.Callback.OnTrackSubscribed = func(track *webrtc.TrackRemote, publication lksdk.TrackPublication, rp *lksdk.RemoteParticipant) {
		// Use the value returned by AddInt64. Reading onSubCount directly
		// would race with the callbacks of the other subscribers.
		n := atomic.AddInt64(&onSubCount, 1)
		fmt.Println("track subscribed", "pub", rp.Identity(), "sub", identity, "onSubCount", n)

		var ivfFile *ivfwriter.IVFWriter
		if save {
			// NOTE(review): the file name depends only on savePath and
			// identity, so several tracks received by this subscriber would
			// all target the same file.
			ivfName := fmt.Sprintf("%s/%s.%s", savePath, identity, "ivf")
			w, err := ivfwriter.New(ivfName)
			if err != nil {
				log.Println("IVFWriter New failed", err)
				return
			}
			ivfFile = w
			// Release the output file once this track stops delivering packets.
			defer func() {
				if err := ivfFile.Close(); err != nil {
					log.Println("IVFWriter Close failed", err)
				}
			}()
		}

		for {
			rtpPacket, _, err := track.ReadRTP()
			if err != nil {
				// Once the track is closed, ReadRTP presumably fails on every
				// call (io.EOF). Retrying would spin a CPU core, so stop
				// reading this track instead.
				log.Println("ReadRTP stopped", "sub", identity, err)
				return
			}

			if save {
				// Best effort: a packet the writer rejects is skipped.
				_ = ivfFile.WriteRTP(rtpPacket)
			}
		}
	}

	// Same reasoning as above: print the value AddInt64 returned.
	joined := atomic.AddInt64(&subCount, 1)
	fmt.Println("JoinRoom success", "subCount", joined)
}

Command on the server machine:

livekit-server --dev --node-ip=xxxx.xxxx.com

machine 1

./loadTest -pub -pubNum=1

(requires video.ivf in the working directory)

machine 2

./loadTest -save -subNum=1

(requires a ./save directory in the working directory)

machine 3

./loadTest -subNum=200

ChenMoGe2 avatar Dec 28 '21 09:12 ChenMoGe2

I would not recommend load testing this way. When publishing from a static file, you are not dealing with PLIs or NACKs effectively. This means your video streams will become corrupted when there's any packet loss.

To get an accurate simulation of load, please refer to our chrometester for publishing, which can react to retransmission requests (NACKs).

With that said, I think our SDK on the subscriber side could handle packet loss/NACKs better. I'll file an issue for that.

davidzhao avatar Dec 28 '21 21:12 davidzhao

Filed issue #26, which would improve handling of loss on the receiver side.

davidzhao avatar Dec 28 '21 21:12 davidzhao

@davidzhao Thank you very much,I will try it

ChenMoGe2 avatar Dec 29 '21 03:12 ChenMoGe2

@davidzhao Hello, I used chrometester successfully, but it has two new problems: 1. Chrometester uses far more CPU than the server, so how can I start 2000+ subscriber instances? 2. When there are more than 100 subscribers, the live stream loses frames severely (audio is fine). I'm on a LAN with 10 Gb bandwidth, yet only 100 Mb is used, and the server's CPU usage is low.

ChenMoGe2 avatar Dec 29 '21 09:12 ChenMoGe2

@ChenMoGe2

Chrometester should be run on a container orchestrator, like Kubernetes. In a cloud environment it should be straightforward to spin up many instances of it.

What do you mean by "frame loss"? If it's video corruption, then it's likely related to the client/Chrome side. If you exhaust the CPUs on Chrome, packet loss will increase and video will suffer. I would recommend 0.5 CPU per chrometester instance.

davidzhao avatar Dec 30 '21 07:12 davidzhao

@davidzhao Thank you. I just want to start more chrometesters. If each chrometester uses 0.5 CPU, starting 2000 chrometesters would need 1000 CPUs. Does each subscriber need its own chrometester instance? Or, if one chrometester can run multiple subscribers, how do I configure it?

ChenMoGe2 avatar Dec 30 '21 07:12 ChenMoGe2

I think there's a way to make it create multiple tabs. Take a look at the repo; it's a Puppeteer script. Feel free to make a PR if you are up for it!

davidzhao avatar Jan 03 '22 04:01 davidzhao