Fix usrsctp usage in Rust
WIP
Fixes https://github.com/versatica/mediasoup/issues/1352
Details
- Basically as described in the ticket, but not everything is done yet.
- Also, I'm testing this in Node by using UV async stuff (which doesn't make sense in mediasoup for Node, but anyway).
TODO
- None of these changes should take effect when in Node, so we need to pass (or NOT pass) some define only from Rust to enable this in the C++ code. We don't want to deal with UV async stuff when in Node because it's not needed at all, so let's see how to do it.
- Missing: thread X to initialize usrsctp and run the Checker singleton. And many other things.
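A minimal sketch of such compile-time gating, assuming a hypothetical macro MS_USRSCTP_ASYNC_SEND that only the Rust build would define (for example with -DMS_USRSCTP_ASYNC_SEND). The macro and DispatchSctpData() are illustrative names, not existing mediasoup code. The branch is selected at compile time, so the Node build always takes the direct-send path and never touches the UV async machinery.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical macro: only the Rust build would define it.
// The Node build leaves it undefined and keeps the current single-threaded path.
#ifdef MS_USRSCTP_ASYNC_SEND
constexpr bool kUseAsyncSend = true;
#else
constexpr bool kUseAsyncSend = false;
#endif

// Hypothetical dispatch point called from the usrsctp send callback.
// Returns which path was taken so the behavior is easy to observe.
inline std::string DispatchSctpData(const uint8_t* /*data*/, size_t /*len*/)
{
	if constexpr (kUseAsyncSend)
	{
		// Rust: store the data and wake the owning uv loop (uv_async_send() in real code).
		return "queued";
	}
	else
	{
		// Node: usrsctp runs on the Worker's own thread, so send right away.
		return "sent";
	}
}
```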
Issue 1: test-node-sctp.ts fails
I've added some console logs: https://github.com/versatica/mediasoup/pull/1353/commits/069f78e62c618afa37622e119b4ea0d522fc9da1
The test fails because the first sent message is never received by the data consumer:
npx jest --testPathPattern node/src/test/test-node-sctp.ts
console.log
TODO: Revert numMessages to 200
at node/src/test/test-node-sctp.ts:116:10
console.log
---- sending id 1
at node/src/test/test-node-sctp.ts:143:12
console.log
---- sending id 2
at node/src/test/test-node-sctp.ts:143:12
console.log
---- sending id 3
at node/src/test/test-node-sctp.ts:143:12
console.log
---- received id 2
at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:175:13)
FAIL test/test-node-sctp.ts (7.075 s)
✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (4267 ms)
● ordered DataProducer delivers all SCTP messages to the DataConsumer
id 2 in message should match numReceivedMessages 1
177 | if (id !== numReceivedMessages) {
178 | reject(
> 179 | new Error(
| ^
180 | `id ${id} in message should match numReceivedMessages ${numReceivedMessages}`
181 | )
182 | );
at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:179:7)
UPDATE: Issue found. The problem is that when the usrsctp send callback is called, we store the data to be sent in a map and then invoke uv async (whose callback runs some time later), and that uv async callback reads from the storage to send the data. If two messages must be sent to a peer in a row, the second one overwrites the first one in the storage, so when the uv async callback runs it only reads the second one. Here is the issue (see how onSendSctpData() is called twice in a row):
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'1']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x600001a88040 [len:32]
DepUsrSCTP::onAsync() | ****
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'2']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onAsync() | ****
---- test | received id 2
RUNS test/test-node-sctp.ts
FAIL test/test-node-sctp.ts
✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (1645 ms)
● ordered DataProducer delivers all SCTP messages to the DataConsumer
id 2 in message should match numReceivedMessages 1
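The overwrite can be reproduced without usrsctp or libuv. The sketch below is a hypothetical single-slot store (SingleSlotStore, Put() and Take() are illustrative names, not the PR's code) that keeps one pending payload per key, which is the failure mode described above rather than a proposed design.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical single-slot storage: one pending payload per key,
// overwritten by every new send before the async callback runs.
class SingleSlotStore
{
public:
	// Called from the usrsctp send callback.
	void Put(uint32_t key, std::string payload)
	{
		this->slots[key] = std::move(payload);
	}

	// Called later from the uv async callback: returns whatever is in the
	// slot at that moment (empty string if nothing is pending).
	std::string Take(uint32_t key)
	{
		auto it = this->slots.find(key);
		if (it == this->slots.end())
			return {};
		std::string payload = std::move(it->second);
		this->slots.erase(it);
		return payload;
	}

private:
	std::unordered_map<uint32_t, std::string> slots;
};
```

Calling Put() twice with the same key before a single Take() loses the first payload, matching the "received id 2" result in the log.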
The solution is to store all pending messages to be sent in a container.
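A minimal sketch of that idea, assuming one store per uv loop and a std::mutex guarding it. PendingSctpData, Push() and TakeAll() are hypothetical names and the actual fix in this PR may differ. Swapping the vector out under the lock keeps the critical section short, so the thread running usrsctp is not blocked while the loop thread sends the packets.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical per-uv-loop store of outgoing SCTP packets. Every call to the
// usrsctp send callback appends a copy instead of overwriting a single slot.
class PendingSctpData
{
public:
	// Called from the thread running usrsctp (may differ from the loop thread).
	void Push(const uint8_t* data, size_t len)
	{
		std::lock_guard<std::mutex> lock(this->mutex);
		this->items.emplace_back(data, data + len);
	}

	// Called from the uv async callback on the loop thread. Takes every
	// pending packet at once, in the order they were pushed.
	std::vector<std::vector<uint8_t>> TakeAll()
	{
		std::vector<std::vector<uint8_t>> taken;
		{
			std::lock_guard<std::mutex> lock(this->mutex);
			taken.swap(this->items);
		}
		return taken;
	}

private:
	std::mutex mutex;
	std::vector<std::vector<uint8_t>> items;
};
```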
Maybe related to this?
https://docs.libuv.org/en/v1.x/async.html
libuv will coalesce calls to uv_async_send(), that is, not every call to it will yield an execution of the callback. For example: if uv_async_send() is called 5 times in a row before the callback is called, the callback will only be called once. If uv_async_send() is called again after the callback was called, it will be called again.
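The quoted coalescing rule can be modeled in a few lines. This is a toy model written for illustration (CoalescingAsync is hypothetical and is not how libuv is implemented), but it shows the consequence for DepUsrSCTP: one callback may stand for several uv_async_send() calls, so the callback has to process everything stored since its previous run instead of assuming one stored message per callback.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Toy model of the coalescing behavior quoted above (not libuv itself):
// Send() only raises a "pending" flag, and each loop iteration runs the
// callback at most once, no matter how many Send() calls happened before it.
class CoalescingAsync
{
public:
	explicit CoalescingAsync(std::function<void()> cb) : callback(std::move(cb))
	{
	}

	// Safe to call from any thread, like uv_async_send().
	void Send()
	{
		this->pending.store(true, std::memory_order_release);
	}

	// One loop iteration on the owning thread.
	void RunLoopIteration()
	{
		if (this->pending.exchange(false, std::memory_order_acq_rel))
			this->callback();
	}

private:
	std::function<void()> callback;
	std::atomic<bool> pending{ false };
};
```

Five Send() calls followed by one loop iteration yield a single callback; a Send() after that triggers the callback again, as the libuv documentation describes.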
Ok, so I've solved the problem as follows: https://github.com/versatica/mediasoup/pull/1353/commits/1a25bedd503f2a96f3d745f7f9af203e02688ab9
The test now works (for simplicity, it's temporarily reduced to just 2 messages) and all logs are good:
DepUsrSCTP::SendSctpDataStore() | ---------- store constructor [fooId:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:1, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:2, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:2, data:0x7f78ff705690]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:3, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:3, data:0x600003df8090]
TODO: Revert numMessages to 200
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:4, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:5, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:5, data:0x600003fe0c00]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:4, data:0x600003fe08e0]
---- test | received id 1
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:6, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:6, data:0x600003ffeec0]
---- test | received id 2
PASS test/test-node-sctp.ts (5.395 s)
✓ ordered DataProducer delivers all SCTP messages to the DataConsumer (1608 ms)
And, OF COURSE, a double free bug that only happens in Rust in CI on Ubuntu and not on macOS, to make me spend yet another 2 days on this: https://github.com/versatica/mediasoup/actions/runs/8172985292/job/22344414084?pr=1353
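One detail in the earlier log of the fixed version may be relevant, although this is an inference from the log and not a confirmed diagnosis: the item destructor for fooId:1 runs twice with the same data pointer (0x6000014fc620), once during the second emplace_back() and again in ClearItems(). That is the pattern std::vector reallocation produces when an element with a user-declared destructor owns a raw buffer and relies on the implicitly generated copy constructor: the user-declared destructor suppresses the implicit move constructor, so the vector copies the pointer, destroys the old element (freeing the buffer) and later destroys the copy (freeing it again). A sketch of an item that cannot be freed twice, assuming it owns a heap copy of the packet (the layout and the GetData()/GetLen() accessors are hypothetical):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical owning item: copies the packet into its own heap buffer.
class SendSctpDataItem
{
public:
	SendSctpDataItem(const uint8_t* src, size_t len) : data(new uint8_t[len]), len(len)
	{
		if (len > 0)
			std::memcpy(this->data, src, len);
	}

	~SendSctpDataItem()
	{
		delete[] this->data;
	}

	// Copying would duplicate the raw pointer and free it twice, so forbid it.
	SendSctpDataItem(const SendSctpDataItem&)            = delete;
	SendSctpDataItem& operator=(const SendSctpDataItem&) = delete;

	// A noexcept move lets std::vector move (not copy) on reallocation. The
	// source gives up ownership, so its destructor deletes nullptr (a no-op).
	SendSctpDataItem(SendSctpDataItem&& other) noexcept
	  : data(std::exchange(other.data, nullptr)), len(std::exchange(other.len, 0))
	{
	}

	SendSctpDataItem& operator=(SendSctpDataItem&& other) noexcept
	{
		if (this != &other)
		{
			delete[] this->data;
			this->data = std::exchange(other.data, nullptr);
			this->len  = std::exchange(other.len, 0);
		}
		return *this;
	}

	const uint8_t* GetData() const
	{
		return this->data;
	}

	size_t GetLen() const
	{
		return this->len;
	}

private:
	uint8_t* data{ nullptr };
	size_t len{ 0 };
};

static_assert(!std::is_copy_constructible<SendSctpDataItem>::value, "must not be copyable");
static_assert(std::is_nothrow_move_constructible<SendSctpDataItem>::value, "vector must move it");
```

With this item, a destructor log during reallocation would show a null pointer for the moved-from element instead of the live buffer. Holding the bytes in a std::vector<uint8_t> member would give the same guarantees with less code.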
Are you making sure that threadA is not writing to a SendSctpDataStore at the same time the uv main thread is reading it?
The ClassDestroy() method should also lock the mutex, since it clears the storage map, which deallocates every store and therefore deletes all the items in each store's items array. It may be that.
Also take into account that this PR doesn't yet implement running the Checker in a separate thread.
No, false alarm, it already calls const std::lock_guard<std::mutex> lock(GlobalSyncMutex);.
I've done many changes in https://github.com/versatica/mediasoup/pull/1353/commits/ca5f222e914cde25622de368ca927a9b3dbaf1a8:
- Move creation of DepUsrSCTP::Checker from the Worker constructor/destructor to DepUsrSCTP::ClassInit() and DepUsrSCTP::ClassDestroy(). This comes with an issue (see Issue 2 in the next comment).
- Make the DepUsrSCTP::Checker singleton static rather than thread_local static. Why? Because there must be only one, not one per thread (in Rust).
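The difference between the two storage durations can be seen with a toy Checker type (hypothetical, not mediasoup's class): a thread_local static gives every thread, and therefore every Worker thread spawned from Rust, its own instance, while a plain function-local static is a single instance shared by the whole process. Note that C++11 only makes the initialization of the shared static thread-safe; later access to it from several threads still needs its own synchronization.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for the Checker singleton.
struct Checker
{
	bool timerRunning{ false };
};

// One instance per thread: each Worker thread would get its own Checker.
Checker* GetThreadLocalChecker()
{
	thread_local static Checker checker;
	return &checker;
}

// One instance per process, shared by all threads.
Checker* GetProcessChecker()
{
	static Checker checker;
	return &checker;
}
```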
Issue 2
When running any test in Node:
npx jest --testPathPattern node/src/test/test-node-sctp.ts
mediasoup:ERROR:Worker (stderr) DepLibUV::onWalk() | alive UV handle found (this shouldn't happen) [type:timer, active:0, closing:1, has_ref:1] +0ms
492 | for (const line of buffer.toString('utf8').split('\n')) {
493 | if (line) {
> 494 | workerLogger.error(`(stderr) ${line}`);
| ^
495 | }
496 | }
497 | });
This is because DepUsrSCTP::CloseChecker() is now called from DepUsrSCTP::ClassDestroy() rather than from the Worker::Close() method. Honestly, I'm not sure why it happens or what the difference is. I've added logs to the file /tmp/ms_log.txt and the problem is clear:
---- Timer created
---- Timer created
---- onCloseTimer
---- WORKER CLOSED
---- DepUsrSCTP::ClassDestroy()
---- DepUsrSCTP::ClassDestroy() | calling mapAsyncHandlerSendSctpData.clear()
---- onWalk
---- onCloseTimer
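As you can see, when onWalk() runs (it's defined in DepLibUV.cpp and called by lib.cpp when the Worker instance ends), the timer of the Checker singleton hasn't been freed yet; that only happens later. This is consistent with uv_close() being asynchronous: its close callback (onCloseTimer here) runs on a later iteration of the loop, so at the time of the walk the handle is still reported with closing:1.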
After a meeting with Jose, we have decided that, as originally planned, the timer of the usrsctp Checker singleton must run in a separate thread X. When the first SctpAssociation is created or the last one is destroyed (in any Worker thread), DepUsrSCTP must use uv_async_send() to tell the Checker singleton to start, stop or restart the timer. The same applies when DepUsrSCTP::HandleUsrSctpTimers() is called, if PR https://github.com/versatica/mediasoup/pull/1351 is merged.
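A sketch of the agreed design, under some assumptions: a counter of live SctpAssociations shared by all Worker threads, and a std::mutex plus std::condition_variable standing in for uv_async_send() on thread X's own uv loop (the real code would post to that loop instead). CheckerThread, CheckerCommand, OnAssociationCreated() and OnAssociationClosed() are hypothetical names. Only the first association posts Start and only the last one posts Stop; the restart case is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

enum class CheckerCommand
{
	Start,
	Stop,
	Exit
};

// Stand-in for "thread X": it owns the Checker timer and only reacts to
// commands posted by Worker threads.
class CheckerThread
{
public:
	explicit CheckerThread(std::vector<CheckerCommand>& handled) : handled(handled)
	{
		// Started last, after every member is constructed.
		this->thread = std::thread([this] { Run(); });
	}

	~CheckerThread()
	{
		Post(CheckerCommand::Exit);
		this->thread.join();
	}

	// Called from any Worker thread when an SctpAssociation is created.
	void OnAssociationCreated()
	{
		std::lock_guard<std::mutex> lock(this->countMutex);
		if (++this->numAssociations == 1)
			Post(CheckerCommand::Start);
	}

	// Called from any Worker thread when an SctpAssociation is destroyed.
	void OnAssociationClosed()
	{
		std::lock_guard<std::mutex> lock(this->countMutex);
		if (--this->numAssociations == 0)
			Post(CheckerCommand::Stop);
	}

private:
	void Post(CheckerCommand command)
	{
		{
			std::lock_guard<std::mutex> lock(this->queueMutex);
			this->queue.push_back(command);
		}
		this->cv.notify_one();
	}

	void Run()
	{
		for (;;)
		{
			std::unique_lock<std::mutex> lock(this->queueMutex);
			this->cv.wait(lock, [this] { return !this->queue.empty(); });
			const CheckerCommand command = this->queue.front();
			this->queue.pop_front();

			if (command == CheckerCommand::Exit)
				return;

			// Real code would start or stop the uv timer here.
			this->handled.push_back(command);
		}
	}

	std::vector<CheckerCommand>& handled;
	std::mutex countMutex;
	size_t numAssociations{ 0 };
	std::mutex queueMutex;
	std::condition_variable cv;
	std::deque<CheckerCommand> queue;
	std::thread thread;
};
```

One difference from the real mechanism: this queue keeps every command, whereas uv_async_send() coalesces, so with libuv the thread X callback would need to read the latest desired state (for example, the current association count) rather than rely on one wakeup per command.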
Regarding whether threadA could be writing to a SendSctpDataStore at the same time the uv main thread is reading it: running a test with ThreadSanitizer enabled may help confirm that.
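A small harness in that spirit, assuming a store guarded by a std::mutex (Store and RunProducerConsumer() are hypothetical names standing in for SendSctpDataStore and the real code paths): one thread plays threadA and pushes packets while the calling thread plays the uv loop and drains them. Compiled with -fsanitize=thread (supported by Clang and GCC), ThreadSanitizer should stay quiet; removing the lock_guard lines would be expected to make it report a data race.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for SendSctpDataStore.
struct Store
{
	std::mutex mutex;
	std::vector<uint8_t> items;
};

// threadA pushes `count` packets while the calling thread (the "uv loop")
// drains concurrently. Returns how many packets the loop side received.
size_t RunProducerConsumer(size_t count)
{
	Store store;
	std::atomic<bool> producerDone{ false };
	size_t received = 0;

	std::thread threadA([&] {
		for (size_t i = 0; i < count; ++i)
		{
			std::lock_guard<std::mutex> lock(store.mutex);
			store.items.push_back(static_cast<uint8_t>(i));
		}
		producerDone.store(true, std::memory_order_release);
	});

	for (;;)
	{
		// Read the flag before draining: if it was already set, every push
		// happened before this drain, so nothing can be left behind.
		const bool done = producerDone.load(std::memory_order_acquire);
		std::vector<uint8_t> batch;
		{
			std::lock_guard<std::mutex> lock(store.mutex);
			batch.swap(store.items);
		}
		received += batch.size();
		if (done)
			break;
		std::this_thread::yield();
	}

	threadA.join();
	return received;
}
```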