webrtc icon indicating copy to clipboard operation
webrtc copied to clipboard

Communication with webrtc native using data channel is slow

Open FH0 opened this issue 2 years ago • 1 comments

We use webrtc native on the iOS and Android platforms. Below is the issue we found. For this, we had to use webrtc native in our Go project.

pion/webrtc/v3 v3.1.43, webrtc native latest (I don't know how to get the version), WSL2 (Ubuntu-20.04)

speed unstable

iperf3 client -> pion/webrtc (go) -> webrtc native (c++) -> iperf3 server

image

go code
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net"

	"github.com/pion/webrtc/v3"
)

/*
iperf3 -s
go run ./build/ -listen=:5555
iperf3 -c 127.0.0.1 -p 5555 -b 500M
*/
// main bridges local TCP connections onto WebRTC data channels.
//
// It creates a single peer connection, prints the base64-encoded JSON offer to
// stdout once the OnICECandidate callback fires with a nil candidate, and reads
// the base64-encoded JSON answer from stdin. It then accepts TCP connections on
// the -listen address; each accepted connection gets its own data channel
// (labelled with the remote address) and bytes are forwarded in both
// directions. Every error is treated as fatal and panics.
func main() {
	listenAddress := flag.String("listen", ":0", "address to listen on")
	flag.Parse()

	// Create the peerConnection.
	config := webrtc.Configuration{}
	var err error
	peerConnection, err := webrtc.NewPeerConnection(config)
	if err != nil {
		panic(err)
	}

	// Register the peerConnection callbacks.
	peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
		fmt.Println("peerConnection.OnICEConnectionStateChange: ", is.String())
	})
	peerConnection.OnNegotiationNeeded(func() {
		fmt.Println("peerConnection.OnNegotiationNeeded")

		offer, err := peerConnection.CreateOffer(nil)
		if err != nil {
			panic(err)
		}
		err = peerConnection.SetLocalDescription(offer)
		if err != nil {
			panic(err)
		}
	})
	peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
		// Skip every candidate; only act on the final (nil) invocation so the
		// printed offer is the complete local description.
		if i != nil {
			return
		}

		// Send the offer (printed for manual copy/paste).
		offer := peerConnection.LocalDescription()
		offerBytes, err := json.Marshal(offer)
		if err != nil {
			panic(err)
		}
		offerBase64 := base64.StdEncoding.EncodeToString(offerBytes)
		fmt.Println(offerBase64)

		// Receive the answer (pasted on stdin).
		fmt.Println("please enter answer")
		var answerBase64 string
		fmt.Scanln(&answerBase64)
		answerBytes, err := base64.StdEncoding.DecodeString(answerBase64)
		if err != nil {
			panic(err)
		}
		var answer webrtc.SessionDescription
		err = json.Unmarshal(answerBytes, &answer)
		if err != nil {
			panic(err)
		}
		err = peerConnection.SetRemoteDescription(answer)
		if err != nil {
			panic(err)
		}
	})

	// Listen on the TCP port.
	listener, err := net.Listen("tcp", *listenAddress)
	if err != nil {
		panic(err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			panic(err)
		}

		// Create one dataChannel per accepted TCP connection.
		dataChan, err := peerConnection.CreateDataChannel(conn.RemoteAddr().String(), nil)
		if err != nil {
			panic(err)
		}
		dataChan.OnClose(func() {
			fmt.Println("dataChan.OnClose")
		})
		dataChan.OnOpen(func() {
			fmt.Printf("Data channel '%s'-'%d' open.\n", dataChan.Label(), dataChan.ID())

			// Forward data: TCP -> data channel.
			buf := make([]byte, 8*1024)
			for {
				nread, err := conn.Read(buf)
				if nread > 0 {
					// Keep the Send result in its own variable: a Reader may
					// return n > 0 together with an error, and reusing err
					// here would silently discard that read error.
					if sendErr := dataChan.Send(buf[:nread]); sendErr != nil {
						panic(sendErr)
					}
				}
				if err != nil {
					if errors.Is(err, io.EOF) {
						return
					}
					panic(err)
				}
			}
		})
		dataChan.OnMessage(func(msg webrtc.DataChannelMessage) {
			// Forward data: data channel -> TCP. The error is scoped to this
			// callback instead of writing to the accept loop's shared err,
			// which other callbacks could touch concurrently.
			if _, err := conn.Write(msg.Data); err != nil {
				panic(err)
			}
		})
	}
}
c++ code
#include <iostream>

#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;

class DummySetSessionDescriptionObserver
    : public SetSessionDescriptionObserver {
  public:
    static DummySetSessionDescriptionObserver *Create() {
        return new RefCountedObject<DummySetSessionDescriptionObserver>();
    }
    virtual void OnSuccess() {}
    virtual void OnFailure(RTCError error) { assert(false); }
};

class DataChannelObserver : public webrtc::DataChannelObserver {
  public:
    DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
                        io_context &ioc)
        : dataChannel(dataChannel), ioc(ioc) {}

  protected:
    void OnStateChange() {
        cout << "OnStateChange: " << dataChannel->state() << endl;

        if (dataChannel->state() == DataChannelInterface::kOpen) {
            tcp::socket tcpSocket(ioc);
            tcpSocket.async_connect(
                tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
                [this](boost::system::error_code ec) {
                    if (ec) {
                        cout << ec.message() << endl;
                        return;
                    }
                    doRead();
                });
            this->tcpSocket =
                shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
        } else if (dataChannel->state() == DataChannelInterface::kClosed) {
            delete (this);
        }
    }

    void OnMessage(const DataBuffer &buffer) {
        async_write(*tcpSocket,
                           boost::asio::buffer(buffer.data.data(), buffer.size()),
                           [this](boost::system::error_code ec, size_t bytesTransferred) {
                               if (ec) {
                                   cout << ec.message() << endl;
                                   return;
                               }
                           });
    }

  private:
    scoped_refptr<DataChannelInterface> dataChannel;
    io_context &ioc;
    shared_ptr<tcp::socket> tcpSocket;
    vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);

    void doRead() {
        tcpSocket->async_read_some(
            buffer(tcpSocketReadBuf.data(),
                   tcpSocketReadBuf.size()),
            [this](boost::system::error_code ec, size_t nread) {
                if (ec) {
                    cout << ec.message() << endl;
                    return;
                }
                dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
                doRead();
            });
    }
};

/// Answering side of the manual-signaling test.
///
/// Owns a peer connection, reads a base64-encoded JSON offer from stdin,
/// applies it as the remote description, creates an answer and prints the
/// answer as base64-encoded JSON on stdout. Incoming data channels are handed
/// to a ::DataChannelObserver.
class Client : public PeerConnectionObserver,
               public CreateSessionDescriptionObserver {
  public:
    scoped_refptr<PeerConnectionInterface> peerConnection;
    io_context &ioc;
    unique_ptr<Thread> signalingThread;

    /// Starts a signaling thread and creates the peer connection; exits the
    /// process if the peer connection cannot be created.
    Client(io_context &ioc) : ioc(ioc) {
        // Create the peerConnection.
        signalingThread = Thread::Create();
        signalingThread->Start();
        PeerConnectionFactoryDependencies dependencies;
        dependencies.signaling_thread = signalingThread.get();
        auto peerConnectionFactory =
            CreateModularPeerConnectionFactory(move(dependencies));
        PeerConnectionInterface::RTCConfiguration configuration;
        PeerConnectionDependencies connectionDependencies(this);
        auto peerConnectionOrError =
            peerConnectionFactory->CreatePeerConnectionOrError(
                configuration, move(connectionDependencies));
        if (!peerConnectionOrError.ok()) {
            cout << "!peerConnectionOrError.ok()" << endl;
            exit(1);
        }
        peerConnection = peerConnectionOrError.MoveValue();
    }

    /// Reads the offer from stdin, applies it and requests an answer.
    /// Exits the process if the SDP in the offer cannot be parsed.
    void start() {
        // Read the offer.
        cout << "please enter offer" << endl;
        string offerBase64;
        getline(cin, offerBase64);
        string offerStr(base64::decoded_size(offerBase64.size()), ' ');
        auto decodePair = base64::decode(offerStr.data(),
                                         offerBase64.c_str(), offerBase64.size());
        offerStr.resize(decodePair.first);
        stringstream offerStream;
        offerStream << offerStr;
        ptree pt;
        read_json(offerStream, pt);
        auto sdpStr = pt.get<string>("sdp");
        auto sdp = CreateSessionDescription(SdpType::kOffer, sdpStr);
        if (!sdp) {
            // A null description must not be handed to SetRemoteDescription.
            cout << "failed to parse offer sdp" << endl;
            exit(1);
        }
        peerConnection->SetRemoteDescription(
            DummySetSessionDescriptionObserver::Create(), sdp.release());
        PeerConnectionInterface::RTCOfferAnswerOptions options;
        peerConnection->CreateAnswer(this, options);
    }

  protected:
    /*
    PeerConnectionObserver
    */

    void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
        cout << "OnSignalingChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    /// Attaches a bridging observer to every data channel opened by the peer.
    void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
        cout << "OnDataChannel: " << data_channel->label() << endl;
        // The observer deletes itself when the channel reaches kClosed.
        auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc);
        data_channel->RegisterObserver(dataChannelObserver);
    }

    void OnNegotiationNeededEvent(uint32_t event_id) {
        cout << "OnNegotiationNeededEvent: " << event_id << endl;
    }

    void
    OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
        cout << "OnIceConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
        cout << "OnConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
        cout << "OnIceGatheringChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnIceCandidate(const IceCandidateInterface *candidate) {
        cout << "OnIceCandidate" << endl;
    }

    /*
    CreateSessionDescriptionObserver
    */

    /// Applies a freshly created answer as the local description and prints
    /// it as base64-encoded JSON ({"type":"answer","sdp":...}).
    void OnSuccess(SessionDescriptionInterface *desc) {
        cout << "OnSuccess" << endl;

        if (desc->GetType() == SdpType::kAnswer) {
            // desc is handed over to the peer connection here; from now on
            // only the connection's own local_description() is read.
            peerConnection->SetLocalDescription(
                DummySetSessionDescriptionObserver::Create(), desc);
            const SessionDescriptionInterface *localDesc =
                peerConnection->local_description();
            if (localDesc == nullptr) {
                cout << "local description is not available" << endl;
                return;
            }
            string sdpStr;
            localDesc->ToString(&sdpStr);
            stringstream answerStream;
            ptree pt;
            pt.add("type", "answer");
            pt.add("sdp", sdpStr);
            write_json(answerStream, pt, false);
            // Materialize the JSON once instead of calling str() repeatedly.
            const string answerJson = answerStream.str();
            string answerBase64(base64::encoded_size(answerJson.size()), ' ');
            auto nwrite = base64::encode(answerBase64.data(),
                                         answerJson.data(),
                                         answerJson.size());
            answerBase64.resize(nwrite);
            cout << answerBase64 << endl;
        }
    }

    void OnFailure(RTCError error) {
        cout << "OnFailure: " << error.message() << endl;
    }
};

/*
clear
g++ -o server -g -ggdb \
    server.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
g++ -o server -O2 -s \
    server.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
./server

*/
int main(int argc, char const *argv[]) {
    io_context ioc(4);
    scoped_refptr<Client> client{new RefCountedObject<Client>(ioc)};
    client->start();

    boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
    ioc.run();

    return 0;
}

speed fast

iperf3 client -> pion/webrtc (go) -> pion/webrtc (go) -> iperf3 server

I tested it but the code is missing.

speed fast

iperf3 client -> webrtc native (c++) -> webrtc native (c++) -> iperf3 server

image

c++ code
#include <iostream>

#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;

class DummySetSessionDescriptionObserver
    : public SetSessionDescriptionObserver {
  public:
    static DummySetSessionDescriptionObserver *Create() {
        return new RefCountedObject<DummySetSessionDescriptionObserver>();
    }
    virtual void OnSuccess() {}
    virtual void OnFailure(RTCError error) { assert(false); }
};

class DataChannelObserver : public webrtc::DataChannelObserver {
  public:
    string label;
    DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
                        io_context &ioc,
                        shared_ptr<tcp::socket> tcpSocket,
                        string label)
        : dataChannel(dataChannel), ioc(ioc), tcpSocket(tcpSocket), label(label) {}

  protected:
    void OnStateChange() {
        cout << "OnStateChange: " << label << ": " << DataChannelInterface::DataStateString(dataChannel->state()) << endl;

        if (dataChannel->state() == DataChannelInterface::kOpen) {
            if (tcpSocket == nullptr) {
                tcp::socket tcpSocket(ioc);
                tcpSocket.async_connect(
                    tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
                    [this](boost::system::error_code ec) {
                        if (ec) {
                            cout << ec.message() << endl;
                            return;
                        }
                        doRead();
                    });
                this->tcpSocket =
                    shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
            } else {
                doRead();
            }
        } else if (dataChannel->state() == DataChannelInterface::kClosed) {
            delete (this);
        }
    }

    void OnMessage(const DataBuffer &buffer) {
        async_write(*tcpSocket,
                    boost::asio::buffer(buffer.data.data(), buffer.size()),
                    [this](boost::system::error_code ec, size_t bytesTransferred) {
                        if (ec) {
                            cout << ec.message() << endl;
                            return;
                        }
                    });
    }

  private:
    scoped_refptr<DataChannelInterface> dataChannel;
    io_context &ioc;
    shared_ptr<tcp::socket> tcpSocket;
    vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);

    void doRead() {
        tcpSocket->async_read_some(
            buffer(tcpSocketReadBuf.data(),
                   tcpSocketReadBuf.size()),
            [this](boost::system::error_code ec, size_t nread) {
                if (ec) {
                    cout << ec.message() << endl;
                    return;
                }
                dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
                doRead();
            });
    }
};

/// One end of an in-process peer connection pair.
///
/// Two Client objects are wired to each other with SetOtherClient(); each
/// forwards the session descriptions it creates straight to the other one, so
/// no external signaling is needed. The listening client accepts TCP
/// connections on port 5555 and opens a data channel for each.
class Client : public PeerConnectionObserver,
               public CreateSessionDescriptionObserver {
  public:
    scoped_refptr<PeerConnectionInterface> peerConnection;
    io_context &ioc;
    unique_ptr<Thread> signalingThread;
    // Peer that receives our descriptions; set via SetOtherClient().
    Client *otherClient = nullptr;
    // Created by doListen(); null until then.
    tcp::acceptor *acceptor = nullptr;

    /// Starts a signaling thread and creates the peer connection; exits the
    /// process if the peer connection cannot be created.
    Client(io_context &ioc) : ioc(ioc) {
        // Create the peerConnection.
        signalingThread = Thread::Create();
        signalingThread->Start();
        PeerConnectionFactoryDependencies dependencies;
        dependencies.signaling_thread = signalingThread.get();
        auto peerConnectionFactory =
            CreateModularPeerConnectionFactory(move(dependencies));
        PeerConnectionInterface::RTCConfiguration configuration;
        PeerConnectionDependencies connectionDependencies(this);
        auto peerConnectionOrError =
            peerConnectionFactory->CreatePeerConnectionOrError(
                configuration, move(connectionDependencies));
        if (!peerConnectionOrError.ok()) {
            cout << "!peerConnectionOrError.ok()" << endl;
            exit(1);
        }
        peerConnection = peerConnectionOrError.MoveValue();
    }

    /// Opens the TCP acceptor on port 5555 (IPv4) and starts accepting.
    void doListen() {
        acceptor = new tcp::acceptor(ioc, tcp::endpoint(tcp::v4(), 5555));
        doAccept();
    }

    /// Accepts one TCP connection, creates a data channel for it and re-arms
    /// the accept. Stops accepting after the first accept error.
    void doAccept() {
        acceptor->async_accept([this](boost::system::error_code ec, tcp::socket socket) {
            cout << "acceptor->async_accept" << endl;

            if (ec) {
                cout << ec.message() << endl;
                return;
            }

            // NOTE(review): `socket` is captured by reference, which is only
            // safe if Invoke runs the functor to completion before returning
            // - confirm against the rtc::Thread documentation.
            signalingThread->Invoke<void>(RTC_FROM_HERE, [this, &socket]() {
                stringstream srcAddr;
                srcAddr << socket.remote_endpoint();
                auto dataChannelOrError = peerConnection->CreateDataChannelOrError(srcAddr.str(), nullptr);
                if (!dataChannelOrError.ok()) {
                    cout << "!dataChannelOrError.ok()" << endl;
                    exit(1);
                }
                auto dataChannel = dataChannelOrError.MoveValue();
                // The observer deletes itself when the channel reaches kClosed.
                auto dataChannelObserver = new ::DataChannelObserver(dataChannel, ioc, shared_ptr<tcp::socket>(new tcp::socket(move(socket))), "doAccept");
                dataChannel->RegisterObserver(dataChannelObserver);
            });

            doAccept();
        });
    }

    /// Sets the peer that receives the descriptions created by this client.
    void SetOtherClient(Client *client) {
        otherClient = client;
    }

  protected:
    /*
    PeerConnectionObserver
    */

    void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
        cout << "OnSignalingChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    /// Attaches a bridging observer (without a socket) to every data channel
    /// opened by the peer.
    void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
        cout << "OnDataChannel: " << data_channel->label() << endl;
        auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc, nullptr, "OnDataChannel");
        data_channel->RegisterObserver(dataChannelObserver);
    }

    /// Starts an offer whenever negotiation is required.
    void OnNegotiationNeededEvent(uint32_t event_id) {
        cout << "OnNegotiationNeededEvent: " << event_id << endl;

        PeerConnectionInterface::RTCOfferAnswerOptions options;
        peerConnection->CreateOffer(this, options);
    }

    void
    OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
        cout << "OnIceConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
        cout << "OnConnectionChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void
    OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
        cout << "OnIceGatheringChange: "
             << PeerConnectionInterface::AsString(new_state) << endl;
    }

    void OnIceCandidate(const IceCandidateInterface *candidate) {
        cout << "OnIceCandidate" << endl;
    }

    /*
    CreateSessionDescriptionObserver
    */

    /// Applies the created description locally, gives the peer a copy as its
    /// remote description and, for an offer, asks the peer to create the
    /// answer.
    void OnSuccess(SessionDescriptionInterface *desc) {
        cout << "OnSuccess" << endl;

        // Read the type up front: desc is handed over to the peer connection
        // by SetLocalDescription and is not touched afterwards.
        const bool isOffer = desc->GetType() == SdpType::kOffer;
        peerConnection->SetLocalDescription(DummySetSessionDescriptionObserver::Create(), desc);

        const SessionDescriptionInterface *localSDP = peerConnection->local_description();
        if (localSDP == nullptr || otherClient == nullptr) {
            cout << "OnSuccess: local description or other client is not available" << endl;
            return;
        }

        // The peer takes ownership of the description it is given, so it must
        // get its own copy; passing the pointer that this connection already
        // owns would leave one object with two owners.
        auto remoteSDP = localSDP->Clone();
        if (!remoteSDP) {
            cout << "OnSuccess: failed to clone local description" << endl;
            return;
        }
        otherClient->peerConnection->SetRemoteDescription(DummySetSessionDescriptionObserver::Create(), remoteSDP.release());

        if (isOffer) {
            PeerConnectionInterface::RTCOfferAnswerOptions options;
            otherClient->peerConnection->CreateAnswer(otherClient, options);
        }
    }

    void OnFailure(RTCError error) {
        cout << "OnFailure: " << error.message() << endl;
    }
};

/*
clear
g++ -o double_side -g -ggdb \
    double_side.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Debug-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
g++ -o double_side -O2 -s \
    double_side.cpp \
    -I/usr/local/include/webrtc \
    -I/usr/local/include/webrtc/third_party/abseil-cpp \
    -std=c++17 \
    -DWEBRTC_POSIX \
    /home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
    -ldl -lpthread -lX11
./double_side

*/
int main(int argc, char const *argv[]) {
    io_context ioc(1);
    scoped_refptr<Client> client1{new RefCountedObject<Client>(ioc)};
    scoped_refptr<Client> client2{new RefCountedObject<Client>(ioc)};
    client1->SetOtherClient(client2.get());
    client2->SetOtherClient(client1.get());
    client2->doListen();

    boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
    ioc.run();

    return 0;
}

FH0 avatar Jul 28 '22 03:07 FH0

I did further tests. You can find the code used here.

Pure pion/webrtc took less time while transferring the same amount of data, and it used fewer packets (11808 vs 29838).

image

image

image

FH0 avatar Sep 03 '22 09:09 FH0