gomedia icon indicating copy to clipboard operation
gomedia copied to clipboard

Rtmp Client Fallback Stream

Open hasnhasan opened this issue 1 year ago • 1 comments

I am sending the incoming broadcast to the RTMP Server to another client. If the incoming broadcast to the Server is interrupted, I send the fallback video until the broadcast comes back. I get the following error.

[h264 @ 0x7f79a20414c0] cabac_init_idc 32 overflow [h264 @ 0x7f79a20414c0] decode_slice_header error [h264 @ 0x7f79a20414c0] no frame! [h264 @ 0x7f79b2046c80] co located POCs unavailable [NULL @ 0x7f79617045c0] illegal reordering_of_pic_nums_idc 4 [h264 @ 0x7f7982219d40] illegal modification_of_pic_nums_idc 4 [h264 @ 0x7f7982219d40] decode_slice_header error [h264 @ 0x7f7982219d40] no frame!

package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"net/url"
	"os"
	"sync"

	"github.com/yapingcat/gomedia/go-codec"
	"github.com/yapingcat/gomedia/go-flv"
	"github.com/yapingcat/gomedia/go-rtmp"
)

type RTMPRelay struct {
	conn net.Conn
	server     *rtmp.RtmpServerHandle
	client     *rtmp.RtmpClient
	fallbackFLV string
	mutex      sync.Mutex
	isFallback bool
	lastPts, lastDts uint32
}

type Broadcast map[string]*RTMPRelay

var Broadcasts Broadcast = make(map[string]*RTMPRelay)
var mtx sync.Mutex

func (b *Broadcast) Find(name string) *RTMPRelay {
	mtx.Lock()
	defer mtx.Unlock()
	if p, found := (*b)[name]; found {
		return p
	} else {
		return nil
	}
}

func NewRtmpClient(rtmpUrl string) *rtmp.RtmpClient {
	u, err := url.Parse(rtmpUrl)
	if err != nil {
		panic(err)
	}
	host := u.Host
	if u.Port() == "" {
		host += ":1935"
	}
	clientConn, err := net.Dial("tcp4", host)
	if err != nil {
		fmt.Println("connect failed", err)
	}

	client := rtmp.NewRtmpClient(rtmp.WithComplexHandshake(), rtmp.WithEnablePublish())
	client.SetOutput(func(data []byte) error {
		_,err := clientConn.Write(data)
		return err
	})

	client.Start(rtmpUrl)
	go func ()  {
		buf := make([]byte, 4096)
		n := 0
		for err == nil {
			n, err = clientConn.Read(buf)
			if err != nil {
				continue
			}
			client.Input(buf[:n])
		}
	}()
	return client
}

func NewRTMPRelay(conn net.Conn) (*RTMPRelay, error) {
	relay := &RTMPRelay{}

	handle := rtmp.NewRtmpServerHandle()
	handle.OnPublish(func(app, streamName string) rtmp.StatusCode {
		relay = Broadcasts.Find(streamName)
		if relay == nil {
			relay = &RTMPRelay{
				conn: conn,
				server: handle,
				client: NewRtmpClient("rtmp://127.0.0.1/live/SyXTJSpx1e"),
				isFallback: false,
				fallbackFLV: "videos/test.flv",
				lastPts: uint32(0),
				lastDts: uint32(0),
			}
		} else {
			relay.stopFallback()
			relay.conn = conn
			relay.server = handle
		}
		Broadcasts[streamName] = relay
		return rtmp.NETSTREAM_PUBLISH_START
	})

	handle.SetOutput(func(b []byte) error {
			_, err := conn.Write(b)
			return err
	})

	handle.OnFrame(func(cid codec.CodecID, pts, dts uint32, frame []byte) {
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			relay.lastDts = dts
			relay.lastPts = pts
		}
		relay.client.WriteFrame(cid,frame,pts,dts)
	})

	go func ()  {
		buf := make([]byte, 4096)
		for {
			n, err := conn.Read(buf)
			if err == io.EOF {
				log.Println("Broadcast interrupted, switching to fallback")
				relay.startFallback()
				return
			}
			err = handle.Input(buf[0:n])
			if err != nil {
				log.Printf("Package reading error: %v", err)
				break
			}
		}
		conn.Close()
	}()

	return relay, nil
}

func (r *RTMPRelay) startFallback() {
	r.mutex.Lock()
	r.isFallback = true
	r.mutex.Unlock()
	
	f := flv.CreateFlvReader()

	_lastPts := uint32(0)
	_lastDts := uint32(0)

	f.OnFrame = func(cid codec.CodecID, frame []byte, pts, dts uint32) {
		if !r.isFallback {
			return
		}
		
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			pts += r.lastPts
			dts += r.lastDts
			_lastPts = pts
			_lastDts = dts
		}

		r.client.WriteFrame(cid,frame,pts,dts)
	}

	fd, _ := os.Open(r.fallbackFLV)
	defer fd.Close()
	cache := make([]byte, 4096)
	for {
		if !r.isFallback {
			return
		}
		n, err := fd.Read(cache)
		if err != nil {
			fmt.Println(err)
			break
		}
		f.Input(cache[0:n])
		//err = r.server.Input(cache[0:n])
	}
	r.mutex.Lock()
	r.lastDts = _lastDts
	r.lastPts = _lastPts
	r.mutex.Unlock()
}

func (r *RTMPRelay) stopFallback() {
	r.mutex.Lock()
	r.isFallback = false
	r.mutex.Unlock()
}


func main() {
	listen, _ := net.Listen("tcp4", ":1936")
	for {
		conn, _ := listen.Accept()
		go NewRTMPRelay(conn)
	}
}

hasnhasan avatar Oct 28 '24 17:10 hasnhasan

@yapingcat I think you need to change “picture numbers” in f.OnFrame. I would be very grateful if you could help me with this 🙏

hasnhasan avatar Oct 29 '24 12:10 hasnhasan

If we don't send the metadata, there is no problem. I solved it this way.

hasnhasan avatar Nov 28 '24 12:11 hasnhasan