webrtc
webrtc copied to clipboard
Communication with webrtc native using data channel is slow
We use webrtc native on ios and android platforms. Below are the issue we found. For this, we had to use webrtc native in our go project.
pion/webrtc/v3 v3.1.43 webrtc native latest (I don't know how to get the version) WSL2(Utuntu-20.04)
speed unstable
iperf3 client -> pion/webrtc (go) -> webrtc native (c++) -> iperf3 server
go code
package main
import (
"encoding/base64"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
"github.com/pion/webrtc/v3"
)
/*
iperf3 -s
go run ./build/ -listen=:5555
iperf3 -c 127.0.0.1 -p 5555 -b 500M
*/
func main() {
listenAddress := flag.String("listen", ":0", "address to listen on")
flag.Parse()
// 创建 peerConnection
config := webrtc.Configuration{}
var err error
peerConnection, err := webrtc.NewPeerConnection(config)
if err != nil {
panic(err)
}
// 注册 peerConnection 回调
peerConnection.OnICEConnectionStateChange(func(is webrtc.ICEConnectionState) {
fmt.Println("peerConnection.OnICEConnectionStateChange: ", is.String())
})
peerConnection.OnNegotiationNeeded(func() {
fmt.Println("peerConnection.OnNegotiationNeeded")
offer, err := peerConnection.CreateOffer(nil)
if err != nil {
panic(err)
}
err = peerConnection.SetLocalDescription(offer)
if err != nil {
panic(err)
}
})
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
// 跳过,直到最后一次
if i != nil {
return
}
// 发送 offer
offer := peerConnection.LocalDescription()
offerBytes, err := json.Marshal(offer)
if err != nil {
panic(err)
}
offerBase64 := base64.StdEncoding.EncodeToString(offerBytes)
fmt.Println(offerBase64)
// 接收 answer
fmt.Println("please enter answer")
var answerBase64 string
fmt.Scanln(&answerBase64)
answerBytes, err := base64.StdEncoding.DecodeString(answerBase64)
if err != nil {
panic(err)
}
var answer webrtc.SessionDescription
err = json.Unmarshal(answerBytes, &answer)
if err != nil {
panic(err)
}
err = peerConnection.SetRemoteDescription(answer)
if err != nil {
panic(err)
}
})
// 监听端口
listener, err := net.Listen("tcp", *listenAddress)
if err != nil {
panic(err)
}
for {
conn, err := listener.Accept()
if err != nil {
panic(err)
}
// 创建 dataChannel
dataChan, err := peerConnection.CreateDataChannel(conn.RemoteAddr().String(), nil)
if err != nil {
panic(err)
}
dataChan.OnClose(func() {
fmt.Println("dataChan.OnClose")
})
dataChan.OnOpen(func() {
fmt.Printf("Data channel '%s'-'%d' open.\n", dataChan.Label(), dataChan.ID())
// 转发数据
buf := make([]byte, 8*1024)
for {
nread, err := conn.Read(buf)
if nread > 0 {
err = dataChan.Send(buf[:nread])
if err != nil {
panic(err)
}
}
if err != nil {
if errors.Is(err, io.EOF) {
return
}
panic(err)
}
}
})
dataChan.OnMessage(func(msg webrtc.DataChannelMessage) {
// 转发数据
_, err = conn.Write(msg.Data)
if err != nil {
panic(err)
}
})
}
}
c++ code
#include <iostream>
#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;
class DummySetSessionDescriptionObserver
: public SetSessionDescriptionObserver {
public:
static DummySetSessionDescriptionObserver *Create() {
return new RefCountedObject<DummySetSessionDescriptionObserver>();
}
virtual void OnSuccess() {}
virtual void OnFailure(RTCError error) { assert(false); }
};
class DataChannelObserver : public webrtc::DataChannelObserver {
public:
DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
io_context &ioc)
: dataChannel(dataChannel), ioc(ioc) {}
protected:
void OnStateChange() {
cout << "OnStateChange: " << dataChannel->state() << endl;
if (dataChannel->state() == DataChannelInterface::kOpen) {
tcp::socket tcpSocket(ioc);
tcpSocket.async_connect(
tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
[this](boost::system::error_code ec) {
if (ec) {
cout << ec.message() << endl;
return;
}
doRead();
});
this->tcpSocket =
shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
} else if (dataChannel->state() == DataChannelInterface::kClosed) {
delete (this);
}
}
void OnMessage(const DataBuffer &buffer) {
async_write(*tcpSocket,
boost::asio::buffer(buffer.data.data(), buffer.size()),
[this](boost::system::error_code ec, size_t bytesTransferred) {
if (ec) {
cout << ec.message() << endl;
return;
}
});
}
private:
scoped_refptr<DataChannelInterface> dataChannel;
io_context &ioc;
shared_ptr<tcp::socket> tcpSocket;
vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);
void doRead() {
tcpSocket->async_read_some(
buffer(tcpSocketReadBuf.data(),
tcpSocketReadBuf.size()),
[this](boost::system::error_code ec, size_t nread) {
if (ec) {
cout << ec.message() << endl;
return;
}
dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
doRead();
});
}
};
class Client : public PeerConnectionObserver,
public CreateSessionDescriptionObserver {
public:
scoped_refptr<PeerConnectionInterface> peerConnection;
io_context &ioc;
unique_ptr<Thread> signalingThread;
Client(io_context &ioc) : ioc(ioc) {
// 创建 peerConnection
signalingThread = Thread::Create();
signalingThread->Start();
PeerConnectionFactoryDependencies dependencies;
dependencies.signaling_thread = signalingThread.get();
auto peerConnectionFactory =
CreateModularPeerConnectionFactory(move(dependencies));
PeerConnectionInterface::RTCConfiguration configuration;
PeerConnectionDependencies connectionDependencies(this);
auto peerConnectionOrError =
peerConnectionFactory->CreatePeerConnectionOrError(
configuration, move(connectionDependencies));
if (!peerConnectionOrError.ok()) {
cout << "!peerConnectionOrError.ok()" << endl;
exit(1);
}
peerConnection = peerConnectionOrError.MoveValue();
}
void start() {
// 读取 offer
cout << "please enter offer" << endl;
string offerBase64;
getline(cin, offerBase64);
string offerStr(base64::decoded_size(offerBase64.size()), ' ');
auto decodePair = base64::decode((void *)offerStr.data(),
offerBase64.c_str(), offerBase64.size());
offerStr.resize(decodePair.first);
stringstream offerStream;
offerStream << offerStr;
ptree pt;
read_json(offerStream, pt);
auto sdpStr = pt.get<string>("sdp");
auto sdp = CreateSessionDescription(SdpType::kOffer, sdpStr);
peerConnection->SetRemoteDescription(
DummySetSessionDescriptionObserver::Create(), sdp.release());
PeerConnectionInterface::RTCOfferAnswerOptions options;
peerConnection->CreateAnswer(this, options);
}
protected:
/*
PeerConnectionObserver
*/
void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
cout << "OnSignalingChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
cout << "OnDataChannel: " << data_channel->label() << endl;
auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc);
data_channel->RegisterObserver(dataChannelObserver);
}
void OnNegotiationNeededEvent(uint32_t event_id) {
cout << "OnNegotiationNeededEvent: " << event_id << endl;
}
void
OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
cout << "OnIceConnectionChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void
OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
cout << "OnConnectionChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void
OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
cout << "OnIceGatheringChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void OnIceCandidate(const IceCandidateInterface *candidate) {
cout << "OnIceCandidate" << endl;
}
/*
CreateSessionDescriptionObserver
*/
void OnSuccess(SessionDescriptionInterface *desc) {
cout << "OnSuccess" << endl;
if (desc->GetType() == SdpType::kAnswer) {
peerConnection->SetLocalDescription(
DummySetSessionDescriptionObserver::Create(), desc);
desc = const_cast<SessionDescriptionInterface *>(
peerConnection->local_description());
string sdpStr;
desc->ToString(&sdpStr);
stringstream answerStream;
ptree pt;
pt.add("type", "answer");
pt.add("sdp", sdpStr);
write_json(answerStream, pt, false);
string answerBase64(base64::encoded_size(answerStream.str().size()), ' ');
auto nwrite = base64::encode(answerBase64.data(),
(void *)answerStream.str().c_str(),
answerStream.str().size());
answerBase64.resize(nwrite);
cout << answerBase64 << endl;
}
}
void OnFailure(RTCError error) {
cout << "OnFailure: " << error.message() << endl;
}
};
/*
clear
g++ -o server -g -ggdb \
server.cpp \
-I/usr/local/include/webrtc \
-I/usr/local/include/webrtc/third_party/abseil-cpp \
-std=c++17 \
-DWEBRTC_POSIX \
/home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
-ldl -lpthread -lX11
g++ -o server -O2 -s \
server.cpp \
-I/usr/local/include/webrtc \
-I/usr/local/include/webrtc/third_party/abseil-cpp \
-std=c++17 \
-DWEBRTC_POSIX \
/home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
-ldl -lpthread -lX11
./server
*/
int main(int argc, char const *argv[]) {
io_context ioc(4);
scoped_refptr<Client> client{new RefCountedObject<Client>(ioc)};
client->start();
boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
ioc.run();
return 0;
}
speed fast
iperf3 client -> pion/webrtc (go) -> pion/webrtc (go) -> iperf3 server
I tested it but the code is missing.
speed fast
iperf3 client -> webrtc native (c++) -> webrtc native (c++) -> iperf3 server
c++ code
#include <iostream>
#include <api/create_peerconnection_factory.h>
#include <boost/asio.hpp>
#include <boost/beast/core/detail/base64.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
using namespace std;
using namespace webrtc;
using namespace rtc;
using namespace boost::beast::detail;
using namespace boost::property_tree;
using namespace boost::asio;
using namespace boost::asio::ip;
class DummySetSessionDescriptionObserver
: public SetSessionDescriptionObserver {
public:
static DummySetSessionDescriptionObserver *Create() {
return new RefCountedObject<DummySetSessionDescriptionObserver>();
}
virtual void OnSuccess() {}
virtual void OnFailure(RTCError error) { assert(false); }
};
class DataChannelObserver : public webrtc::DataChannelObserver {
public:
string label;
DataChannelObserver(scoped_refptr<DataChannelInterface> dataChannel,
io_context &ioc,
shared_ptr<tcp::socket> tcpSocket,
string label)
: dataChannel(dataChannel), ioc(ioc), tcpSocket(tcpSocket), label(label) {}
protected:
void OnStateChange() {
cout << "OnStateChange: " << label << ": " << DataChannelInterface::DataStateString(dataChannel->state()) << endl;
if (dataChannel->state() == DataChannelInterface::kOpen) {
if (tcpSocket == nullptr) {
tcp::socket tcpSocket(ioc);
tcpSocket.async_connect(
tcp::endpoint(ip::make_address("127.0.0.1"), 5201),
[this](boost::system::error_code ec) {
if (ec) {
cout << ec.message() << endl;
return;
}
doRead();
});
this->tcpSocket =
shared_ptr<tcp::socket>(new tcp::socket(move(tcpSocket)));
} else {
doRead();
}
} else if (dataChannel->state() == DataChannelInterface::kClosed) {
delete (this);
}
}
void OnMessage(const DataBuffer &buffer) {
async_write(*tcpSocket,
boost::asio::buffer(buffer.data.data(), buffer.size()),
[this](boost::system::error_code ec, size_t bytesTransferred) {
if (ec) {
cout << ec.message() << endl;
return;
}
});
}
private:
scoped_refptr<DataChannelInterface> dataChannel;
io_context &ioc;
shared_ptr<tcp::socket> tcpSocket;
vector<char> tcpSocketReadBuf = vector<char>(8 * 1024);
void doRead() {
tcpSocket->async_read_some(
buffer(tcpSocketReadBuf.data(),
tcpSocketReadBuf.size()),
[this](boost::system::error_code ec, size_t nread) {
if (ec) {
cout << ec.message() << endl;
return;
}
dataChannel->Send(DataBuffer(CopyOnWriteBuffer(tcpSocketReadBuf.data(), nread), true));
doRead();
});
}
};
class Client : public PeerConnectionObserver,
public CreateSessionDescriptionObserver {
public:
scoped_refptr<PeerConnectionInterface> peerConnection;
io_context &ioc;
unique_ptr<Thread> signalingThread;
Client *otherClient = nullptr;
tcp::acceptor *acceptor;
Client(io_context &ioc) : ioc(ioc) {
// 创建 peerConnection
signalingThread = Thread::Create();
signalingThread->Start();
PeerConnectionFactoryDependencies dependencies;
dependencies.signaling_thread = signalingThread.get();
auto peerConnectionFactory =
CreateModularPeerConnectionFactory(move(dependencies));
PeerConnectionInterface::RTCConfiguration configuration;
PeerConnectionDependencies connectionDependencies(this);
auto peerConnectionOrError =
peerConnectionFactory->CreatePeerConnectionOrError(
configuration, move(connectionDependencies));
if (!peerConnectionOrError.ok()) {
cout << "!peerConnectionOrError.ok()" << endl;
exit(1);
}
peerConnection = peerConnectionOrError.MoveValue();
}
void doListen() {
acceptor = new tcp::acceptor(ioc, tcp::endpoint(tcp::v4(), 5555));
doAccept();
}
void doAccept() {
acceptor->async_accept([this](boost::system::error_code ec, tcp::socket socket) {
cout << "acceptor->async_accept" << endl;
if (ec) {
cout << ec.message() << endl;
return;
}
signalingThread->Invoke<void>(RTC_FROM_HERE, [this, &socket]() {
stringstream srcAddr;
srcAddr << socket.remote_endpoint();
auto dataChannelOrError = peerConnection->CreateDataChannelOrError(srcAddr.str(), nullptr);
if (!dataChannelOrError.ok()) {
cout << "!dataChannelOrError.ok()" << endl;
exit(1);
}
auto dataChannel = dataChannelOrError.MoveValue();
auto dataChannelObserver = new ::DataChannelObserver(dataChannel, ioc, shared_ptr<tcp::socket>(new tcp::socket(move(socket))), "doAccept");
dataChannel->RegisterObserver(dataChannelObserver);
});
doAccept();
});
}
void SetOtherClient(Client *client) {
otherClient = client;
}
protected:
/*
PeerConnectionObserver
*/
void OnSignalingChange(PeerConnectionInterface::SignalingState new_state) {
cout << "OnSignalingChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void OnDataChannel(scoped_refptr<DataChannelInterface> data_channel) {
cout << "OnDataChannel: " << data_channel->label() << endl;
auto dataChannelObserver = new ::DataChannelObserver(data_channel, ioc, nullptr, "OnDataChannel");
data_channel->RegisterObserver(dataChannelObserver);
}
void OnNegotiationNeededEvent(uint32_t event_id) {
cout << "OnNegotiationNeededEvent: " << event_id << endl;
PeerConnectionInterface::RTCOfferAnswerOptions options;
peerConnection->CreateOffer(this, options);
}
void
OnIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state) {
cout << "OnIceConnectionChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void
OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state) {
cout << "OnConnectionChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void
OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state) {
cout << "OnIceGatheringChange: "
<< PeerConnectionInterface::AsString(new_state) << endl;
}
void OnIceCandidate(const IceCandidateInterface *candidate) {
cout << "OnIceCandidate" << endl;
}
/*
CreateSessionDescriptionObserver
*/
void OnSuccess(SessionDescriptionInterface *desc) {
cout << "OnSuccess" << endl;
peerConnection->SetLocalDescription(DummySetSessionDescriptionObserver::Create(), desc);
auto localSDP = const_cast<SessionDescriptionInterface *>(peerConnection->local_description());
otherClient->peerConnection->SetRemoteDescription(DummySetSessionDescriptionObserver::Create(), localSDP);
if (desc->GetType() == SdpType::kOffer) {
PeerConnectionInterface::RTCOfferAnswerOptions options;
otherClient->peerConnection->CreateAnswer(otherClient, options);
}
}
void OnFailure(RTCError error) {
cout << "OnFailure: " << error.message() << endl;
}
};
/*
clear
g++ -o double_side -g -ggdb \
double_side.cpp \
-I/usr/local/include/webrtc \
-I/usr/local/include/webrtc/third_party/abseil-cpp \
-std=c++17 \
-DWEBRTC_POSIX \
/home/test/webrtc-checkout/src/out/Debug-gcc/obj/libwebrtc.a \
-ldl -lpthread -lX11
g++ -o double_side -O2 -s \
double_side.cpp \
-I/usr/local/include/webrtc \
-I/usr/local/include/webrtc/third_party/abseil-cpp \
-std=c++17 \
-DWEBRTC_POSIX \
/home/test/webrtc-checkout/src/out/Release-gcc/obj/libwebrtc.a \
-ldl -lpthread -lX11
./double_side
*/
int main(int argc, char const *argv[]) {
io_context ioc(1);
scoped_refptr<Client> client1{new RefCountedObject<Client>(ioc)};
scoped_refptr<Client> client2{new RefCountedObject<Client>(ioc)};
client1->SetOtherClient(client2.get());
client2->SetOtherClient(client1.get());
client2->doListen();
boost::asio::executor_work_guard<decltype(ioc.get_executor())> work{ioc.get_executor()};
ioc.run();
return 0;
}
I did further tests. You can find the code used here.
Pure pion/webrtc took less time while transfering the same amount of data. And fewer packets are used (11808 vs 29838).