Fix concurrency races, ghost children, and goroutine lifecycle issues
Summary
This PR fixes critical concurrency issues, memory leaks, and goroutine lifecycle problems in the core packet-forwarding path and the Tapo client implementation.
Problems Fixed
1. Data Races in Packet Processing
- Symptom: Deadlocks under high load with multiple consumers
- Cause: Receiver.Input iterated over children without a lock while other goroutines modified the slice
- Fix: Lock-and-copy pattern for safe iteration
2. Ghost Children Accumulation
- Symptom: Producers not stopping when consumers disconnect, memory leaks
- Cause: Closed child nodes remained in parent's children list
- Fix: Active ghost cleanup with cleanGhostChildren() and closed-state tracking
3. Orphaned Senders
- Symptom: Tapo streams couldn't be reused after disconnect
- Cause: Backchannel sender kept alive with no parent reference
- Fix: Clear sender reference on Stop()
4. Uncancellable Goroutines
- Symptom: Tapo Handle() and producer workers stuck running after Stop()
- Cause: No cancellation mechanism, blocking I/O couldn't be interrupted
- Fix: Context cancellation, stop channels, monitor goroutines
See the screenshot below of stuck Tapo streams that kept sending data without any consumers:
Changes
Commit 1: Core Concurrency Fixes
- pkg/core/track.go: Lock-and-copy in Receiver.Input, thread-safe HasSenders()
- pkg/core/node.go: Protected parent/child access with locks
- internal/streams/stream.go: Use new thread-safe HasSenders()
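The lock-and-copy change to Receiver.Input can be illustrated with simplified types. The children slice is copied while holding the lock and iterated after releasing it. As a result, AddChild and RemoveChild never race with delivery, and a slow child never holds the receiver's lock. Receiver, Sender, Packet and their methods here are hypothetical stand-ins for the pkg/core types, not their real definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// Packet and Sender are hypothetical, simplified stand-ins for pkg/core types.
type Packet struct{ Seq int }

type Sender struct {
	mu       sync.Mutex
	received []int
}

func (s *Sender) Send(p *Packet) {
	s.mu.Lock()
	s.received = append(s.received, p.Seq)
	s.mu.Unlock()
}

type Receiver struct {
	mu       sync.Mutex
	children []*Sender
}

func (r *Receiver) AddChild(s *Sender) {
	r.mu.Lock()
	r.children = append(r.children, s)
	r.mu.Unlock()
}

// RemoveChild edits the backing array in place, which is exactly why
// Input must iterate over a real copy rather than a shared slice header.
func (r *Receiver) RemoveChild(s *Sender) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i, c := range r.children {
		if c == s {
			r.children = append(r.children[:i], r.children[i+1:]...)
			return
		}
	}
}

// Input copies the children under the lock, then delivers without it.
func (r *Receiver) Input(p *Packet) {
	r.mu.Lock()
	children := make([]*Sender, len(r.children))
	copy(children, r.children)
	r.mu.Unlock()

	for _, c := range children {
		c.Send(p)
	}
}

// HasSenders reads the slice length under the lock.
func (r *Receiver) HasSenders() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.children) > 0
}

func main() {
	r := &Receiver{}
	var wg sync.WaitGroup
	// Attach and detach consumers while packets flow; run with -race.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s := &Sender{}
			r.AddChild(s)
			for j := 0; j < 100; j++ {
				r.Input(&Packet{Seq: j})
			}
			r.RemoveChild(s)
		}()
	}
	wg.Wait()
	fmt.Println("has senders:", r.HasSenders())
}
```

One consequence of this design is that a child removed while Input is running may still receive the in-flight packet, because it was in the copied slice. Child Send implementations therefore need to tolerate being called just after removal.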
Commit 2: Ghost Children Prevention
- pkg/core/node.go: Add closed flag, cleanGhostChildren(), prevent append to closed parent
- pkg/core/track.go: Call ghost cleanup in HasSenders()
- pkg/tapo/producer.go: Clear sender reference
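The ghost-child prevention in Commit 2 combines two checks. First, a parent refuses new children once it is closed. Second, closed children are pruned before the parent counts its consumers. This is a minimal sketch assuming a simplified, hypothetical Node type. Only the names closed, cleanGhostChildren and HasSenders come from this PR, and the method bodies are illustrative. It uses atomic.Bool, which requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Node is a hypothetical, simplified stand-in for a pkg/core node.
type Node struct {
	mu       sync.Mutex
	closed   atomic.Bool // parents read this without taking the child's mu
	children []*Node
}

// AppendChild refuses to attach to a closed parent, so a consumer that
// races with shutdown cannot become a ghost child.
func (n *Node) AppendChild(child *Node) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.closed.Load() {
		return false
	}
	n.children = append(n.children, child)
	return true
}

// Close takes mu so it cannot interleave between AppendChild's check and append.
func (n *Node) Close() {
	n.mu.Lock()
	n.closed.Store(true)
	n.mu.Unlock()
}

func (n *Node) IsClosed() bool { return n.closed.Load() }

// cleanGhostChildren drops children that closed without detaching. It builds a
// fresh slice so copies handed out to lock-and-copy readers stay untouched.
func (n *Node) cleanGhostChildren() {
	n.mu.Lock()
	defer n.mu.Unlock()
	var alive []*Node
	for _, c := range n.children {
		if !c.IsClosed() { // atomic read: no nested lock, no lock-ordering risk
			alive = append(alive, c)
		}
	}
	n.children = alive
}

// HasSenders prunes ghosts first, so a producer sees zero consumers
// (and can stop) once every child has closed.
func (n *Node) HasSenders() bool {
	n.cleanGhostChildren()
	n.mu.Lock()
	defer n.mu.Unlock()
	return len(n.children) > 0
}

func main() {
	parent := &Node{}
	child := &Node{}
	parent.AppendChild(child)
	child.Close() // consumer went away without detaching itself
	fmt.Println("has senders:", parent.HasSenders())
	parent.Close()
	fmt.Println("append after close:", parent.AppendChild(&Node{}))
}
```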
Commit 3: Goroutine Lifecycle Management
- pkg/tapo/client.go: Add context cancellation, monitor goroutine, safe Close()
- pkg/tapo/backchannel.go: Context checks before operations
- internal/streams/producer.go: Stop channel for worker termination
Just to add a bit more context: I've been working on this over the past two weeks and have been running a build with the final commits for several days with no issues. These bugs were not easy to troubleshoot (my test builds had tons of extra trace logs), but I could fairly easily reproduce stuck children by rapidly enabling and disabling two-way audio in Frigate for my Tapo cameras, e.g. switching back and forth between the tapo protocol and RTSP.
This PR would likely fix strange issues such as #1461, #1933, and #1611.
Data races can also be detected with go run -race. It reports quite a few; I've been focusing on the core and streams packages, as well as Tapo (since that's the hardware I have). A few cherry-picked examples:
WARNING: DATA RACE
Read at 0x00c0001f5c20 by goroutine 15:
runtime.mapaccess2_faststr()
/usr/local/go/src/internal/runtime/maps/runtime_faststr_swiss.go:162 +0x0
github.com/AlexxIT/go2rtc/internal/streams.GetProducer()
/root/go2rtc/internal/streams/handlers.go:64 +0x114
github.com/AlexxIT/go2rtc/internal/streams.(*Producer).Dial()
/root/go2rtc/internal/streams/producer.go:62 +0xee
github.com/AlexxIT/go2rtc/internal/streams.(*Stream).AddConsumer()
/root/go2rtc/internal/streams/add_consumer.go:36 +0x7f1
github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler.func1()
/root/go2rtc/internal/rtsp/rtsp.go:219 +0x158e
github.com/AlexxIT/go2rtc/pkg/core.(*Listener).Fire()
/root/go2rtc/pkg/core/listener.go:16 +0x1dab
github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
/root/go2rtc/pkg/rtsp/server.go:110 +0xb0b
github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
/root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
/root/go2rtc/internal/rtsp/rtsp.go:76 +0x33
Previous write at 0x00c0001f5c20 by main goroutine:
runtime.mapassign_faststr()
/usr/local/go/src/internal/runtime/maps/runtime_faststr_swiss.go:263 +0x0
github.com/AlexxIT/go2rtc/internal/streams.HandleFunc()
/root/go2rtc/internal/streams/handlers.go:16 +0x47
github.com/AlexxIT/go2rtc/internal/tapo.Init()
/root/go2rtc/internal/tapo/tapo.go:11 +0x1c
main.main()
/root/go2rtc/main.go:107 +0x5e2
WARNING: DATA RACE
Read at 0x00c0000e0771 by goroutine 24:
github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).packetWriter.func2()
/root/go2rtc/pkg/rtsp/consumer.go:141 +0x595
github.com/AlexxIT/go2rtc/pkg/core.NewSender.func2()
/root/go2rtc/pkg/core/track.go:112 +0x48
github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.func1()
/root/go2rtc/pkg/core/track.go:145 +0x6a
github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.gowrap2()
/root/go2rtc/pkg/core/track.go:148 +0x41
Previous write at 0x00c0000e0771 by goroutine 23:
github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
/root/go2rtc/pkg/rtsp/server.go:207 +0x16d5
github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
/root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
/root/go2rtc/internal/rtsp/rtsp.go:76 +0x33
WARNING: DATA RACE
Write at 0x00c00021c1b8 by goroutine 15:
github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
/root/go2rtc/pkg/rtsp/server.go:170 +0x45e
github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
/root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
/root/go2rtc/internal/rtsp/rtsp.go:76 +0x33
Previous read at 0x00c00021c1b8 by goroutine 17:
github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).packetWriter.func2()
/root/go2rtc/pkg/rtsp/consumer.go:96 +0x8b
github.com/AlexxIT/go2rtc/pkg/core.NewSender.func2()
/root/go2rtc/pkg/core/track.go:112 +0x48
github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.func1()
/root/go2rtc/pkg/core/track.go:145 +0x6a
github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.gowrap2()
/root/go2rtc/pkg/core/track.go:148 +0x41
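The first trace shows a different class of race from the other two. HandleFunc writes to a package-level map on the main goroutine during tapo.Init. Meanwhile, the RTSP listener goroutine started earlier in rtsp.Init reads the same map through GetProducer. A common way to make such a registry safe is to guard it with a sync.RWMutex, as shown here. The names handlers, HandleFunc, GetHandler and Handler are hypothetical and only loosely modelled on the trace; this is not the actual internal/streams/handlers.go code, nor a change this PR claims to make.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Handler is a hypothetical producer factory keyed by URL scheme.
type Handler func(url string) (string, error)

var (
	handlersMu sync.RWMutex
	handlers   = map[string]Handler{}
)

// HandleFunc registers a handler; writers take the exclusive lock.
func HandleFunc(scheme string, h Handler) {
	handlersMu.Lock()
	handlers[scheme] = h
	handlersMu.Unlock()
}

// GetHandler looks up a handler by the URL's scheme; readers share the lock.
func GetHandler(url string) (Handler, bool) {
	scheme, _, ok := strings.Cut(url, "://")
	if !ok {
		return nil, false
	}
	handlersMu.RLock()
	defer handlersMu.RUnlock()
	h, ok := handlers[scheme]
	return h, ok
}

func main() {
	var wg sync.WaitGroup
	// Lookups from "connection" goroutines while registration is still running.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				GetHandler("tapo://camera")
			}
		}()
	}
	HandleFunc("tapo", func(url string) (string, error) { return "producer for " + url, nil })
	wg.Wait()

	if h, ok := GetHandler("tapo://camera"); ok {
		out, _ := h("tapo://camera")
		fmt.Println(out)
	}
}
```

An alternative that avoids locking on every lookup is to finish all registration before any listener starts, but that depends on initialization order staying correct as packages change.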
This is a very complex and confusing part of go2rtc. I myself am confused about it and plan to rewrite it in the future.
From an outsider's perspective, these look like fairly safe changes to merge, and they could solve some odd corner-case issues (like the ones mentioned by the author). I have a mix of RTSP cameras where two-way audio sometimes kills streams, and I suspect these changes may fix that.
Speaking from my own dev experience, rewriting a portion of code that has issues is always a good thing, but it's also good to fix issues in the existing code that affect users in the meantime. At the very least, it could reduce the urgency of the improvements a rewrite would bring.
Thanks for the feedback. I've updated the PR to resolve conflicts.