Endpoint-Independent Mapping slows down UDP throughput
Which commit of hussainmohd-a/rethink-app can be built with the latest celzero/firestack? Rethink on my phone is outdated.
Interesting. Would such an implementation (essentially half-duplex?) make https://github.com/celzero/firestack/issues/77 work better?
I don't know, but firestack's current Endpoint-Independent Mapping mode needs optimization. I tested rethink and my gvisor-playground with quic-go/perf; rethink's handling of UDP (single flow) seems slower. https://github.com/quic-go/perf
Commands:
quic-go-perf --server-address=<server ip>:<server port> --upload-bytes=1G
quic-go-perf --server-address=<server ip>:<server port> --download-bytes=1G
("upload" means from the emulator to the host.)
rethink 63868e7 + firestack aa87903 + Endpoint-Independent Mapping disabled: upload: 91.1 MiB/s, download: 89.7 MiB/s
rethink 63868e7 + firestack aa87903 + Endpoint-Independent Mapping enabled: upload: 67.3 MiB/s, download: 60.8 MiB/s
Lanius-collaris/gvisor-playground 7dce2ae: upload: 95.6 MiB/s, download: 89.8 MiB/s
Originally posted by @Lanius-collaris in #123
This is most likely due to synchronization overhead (channels/mutexes) in udpmux.go.
@Lanius-collaris, I'm currently cleaning up the code and will share the commit details with you in a day or two.
@Lanius-collaris You can use commit 0a885cf to get the latest.
Update 1: prevent a nil pointer dereference
This patch improves inbound throughput, but may cause new bugs:
diff --git a/intra/udp.go b/intra/udp.go
index 1add4b0..581a550 100644
--- a/intra/udp.go
+++ b/intra/udp.go
@@ -223,7 +223,23 @@ func (h *udpHandler) proxy(gconn *netstack.GUDPConn, src, dst netip.AddrPort, dm
h.conntracker.Track(cid, gconn, remote)
core.Go("udp.forward: "+cid, func() {
defer h.conntracker.Untrack(cid)
- forward(gconn, &rwext{remote}, h.smmch, h.done, smm)
+ if dmx == nil {
+ forward(gconn, &rwext{remote}, h.smmch, h.done, smm)
+ } else {
+ defer gconn.Close()
+ var buf [65536]byte
+ for {
+ gconn.SetReadDeadline(time.Now().Add(udptimeout))
+ n, err := gconn.Read(buf[:])
+ if err != nil {
+ break
+ }
+ _, err = remote.Write(buf[:n])
+ if err != nil {
+ break
+ }
+ }
+ }
})
return true // ok
}
@@ -318,7 +334,7 @@ func (h *udpHandler) Connect(gconn *netstack.GUDPConn, src, target netip.AddrPor
for i, dstipp := range actualTargets {
selectedTarget = dstipp
if mux { // mux is not supported by all proxies (few like Exit, Base, WG support it)
- pc, err = h.mux.associate(cid, pid, uid, src, selectedTarget, px.Dialer().Announce, vendor(dmx))
+ pc, err = h.mux.associate(cid, pid, uid, src, selectedTarget, px.Dialer().Announce, vendor(dmx), gconn)
} else {
pc, err = px.Dialer().Dial("udp", selectedTarget.String())
}
diff --git a/intra/udpmux.go b/intra/udpmux.go
index 0862e6f..06dfb3a 100644
--- a/intra/udpmux.go
+++ b/intra/udpmux.go
@@ -18,6 +18,8 @@ import (
"github.com/celzero/firestack/intra/core"
"github.com/celzero/firestack/intra/log"
+
+ "bytes"
)
// from: github.com/pion/transport/blob/03c807b/udp/conn.go
@@ -99,6 +101,8 @@ type demuxconn struct {
rt *time.Ticker // read deadline
wto time.Duration // write timeout
rto time.Duration // read timeout
+
+ realConn net.Conn
}
// slice is a byte slice v and its recycler free.
@@ -192,17 +196,25 @@ func (x *muxer) readers() {
_ = x.stop() // stop muxer
}()
+ bptr := core.AllocRegion(core.B65536)
+ b := *bptr
+ b = b[:cap(b)]
+ // todo: if panics are recovered above, free() may never be called
+ free := func() {
+ *bptr = b
+ core.Recycle(bptr)
+ }
+ defer free()
+
+ var t1 [4]byte
+ oldWho := &net.UDPAddr{
+ IP: t1[:],
+ Port: 0,
+ }
+ var dst *demuxconn = nil
timeouterrors := 0
for {
- bptr := core.AllocRegion(core.B65536)
- b := *bptr
- b = b[:cap(b)]
- // todo: if panics are recovered above, free() may never be called
- free := func() {
- *bptr = b
- core.Recycle(bptr)
- }
-
+ x.mxconn.SetReadDeadline(time.Now().Add(udptimeout))
n, who, err := x.mxconn.ReadFrom(b)
x.stats.tx.Add(uint32(n)) // upload
@@ -210,7 +222,6 @@ func (x *muxer) readers() {
if timedout(err) {
timeouterrors++
if timeouterrors < maxtimeouterrors {
- x.extend(time.Now().Add(udptimeout))
log.D("udp: mux: %s read timeout(%d): %v", x.cid, timeouterrors, err)
continue
} // else: err out
@@ -224,15 +235,21 @@ func (x *muxer) readers() {
continue
}
- // may be an existing route or a new route
- if dst := x.route(addr2netip(who), ingress); dst != nil {
- select {
- case dst.incomingCh <- &slice{v: b[:n], free: free}: // incomingCh is never closed
- default: // dst probably closed, but not yet unrouted
- log.W("udp: mux: %s read: drop(sz: %d); route to %s", x.cid, n, dst.raddr)
+ who2 := who.(*net.UDPAddr)
+ if !bytes.Equal(oldWho.IP, who2.IP) || oldWho.Port != who2.Port {
+ oldWho = who2
+ // may be an existing route or a new route
+ dst = x.route(addr2netip(who), ingress)
+ }
+ if dst != nil {
+ if dst.realConn == nil {
+ return
+ }
+ _, err = dst.realConn.Write(b[:n])
+ if err != nil {
+ return
}
- log.V("udp: mux: %s read: n(%d) from %v <= %v; err %v", x.cid, n, dst, who, err)
- } // else: ignore (who is invalid or x is closed)
+ }
}
}
@@ -352,23 +369,10 @@ func (c *demuxconn) Read(p []byte) (int, error) {
// Write implements core.UDPConn.Write
func (c *demuxconn) Write(p []byte) (n int, err error) {
- defer c.wt.Reset(c.wto)
- sz := len(p)
- select {
- case <-c.wt.C:
- log.W("udp: mux: %s demux: write: %v => %v; timeout (sz: %d)",
- c.remux.id(), c.laddr, c.raddr, sz)
- return 0, os.ErrDeadlineExceeded
- case <-c.closed:
- log.W("udp: mux: %s demux: write: %v => %v; closed (sz: %d)",
- c.remux.id(), c.laddr, c.raddr, sz)
- return 0, net.ErrClosed
- default:
- n, err = c.remux.sendto(p, c.raddr)
- logev(err)("udp: mux: %s demux: write: %v => %v; done(sz: %d/%d); err? %v",
- c.remux.id(), c.laddr, c.raddr, n, sz, err)
- return n, err
- }
+ n, err = c.remux.sendto(p, c.raddr)
+ logev(err)("udp: mux: %s demux: write: %v => %v; done(sz: %d/%d); err? %v",
+ c.remux.id(), c.laddr, c.raddr, n, len(p), err)
+ return n, err
}
// ReadFrom implements core.UDPConn.ReadFrom (unused)
@@ -491,7 +495,7 @@ func newMuxTable() *muxTable {
return &muxTable{t: make(map[netip.AddrPort]*muxer)}
}
-func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk assocFn, v vendor) (_ net.Conn, err error) {
+func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk assocFn, v vendor, gconn net.Conn) (_ net.Conn, err error) {
e.Lock() // lock
var mxr *muxer
@@ -526,6 +530,23 @@ func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk a
return nil, errUidMismatch // return
}
+ if _, ok := mxr.routes[dst]; !ok {
+ mxr.routes[dst] = &demuxconn{
+ remux: mxr, // muxer
+ laddr: mxr.mxconn.LocalAddr(), // listen addr
+ raddr: net.UDPAddrFromAddrPort(dst), // remote addr
+ key: dst, // key (same as raddr)
+ incomingCh: make(chan *slice), // read from muxer
+ overflowCh: make(chan *slice), // overflow from read
+ closed: make(chan struct{}), // always unbuffered
+ wt: time.NewTicker(udptimeout),
+ rt: time.NewTicker(udptimeout),
+ wto: udptimeout,
+ rto: udptimeout,
+ realConn: gconn,
+ }
+ }
+
e.Unlock() // unlock
// do not hold e.lock on calls into mxr
c := mxr.route(dst, egress)
Sorry, the patch I posted yesterday includes a serious DoS vulnerability.
stack.Stack has a FindTransportEndpoint() method; it may be an alternative to muxer.route(), because the tcpip.Endpoint created by udp.ForwarderRequest.CreateEndpoint() is already registered in stack.Stack.
https://github.com/google/gvisor/blob/738e1d995f64/pkg/tcpip/transport/udp/forwarder.go#L88
stack.Stack has a FindTransportEndpoint() method, it may be an alternative to muxer.route()
As set up, remote addrs are locally bound from netstack's point of view. So when a new route appears (from a remote ip:port that udpmux is seeing for the first time on an unconnected socket), FindTransportEndpoint is unlikely to find any existing route (as none would exist bound to that remote ip:port), no?
As set up, remote addrs are locally bound from netstack's point of view. So when a new route appears (from a remote ip:port that udpmux is seeing for the first time on an unconnected socket), FindTransportEndpoint is unlikely to find any existing route (as none would exist bound to that remote ip:port), no?
Yes. But you can construct UDP packets manually; creating an Endpoint may need more code.
But you can construct UDP packets manually; creating an Endpoint may need more code.
True, but why won't gonet.DialUDP work? 😭 It's so much simpler...
- Channels could be replaced with https://github.com/kelindar/event
- Or: Map/Queues with https://github.com/puzpuzpuz/xsync
https://github.com/Lanius-collaris/firestack/commit/510f04be5632ae15b01311c72c552fbfb8287473 improves both outbound and inbound throughput. Note that when settings.EndpointIndependentMapping == false, https://github.com/celzero/firestack/commit/c439342aec3a4be15bfd1cd4cb3df688b4a6a2dc is even slower than https://github.com/celzero/firestack/commit/aa87903c18f41cb9c5a3b24dcd037621ff3f997f; I can't understand why. 🙃
~gonet.DialUDP() doesn't seem to behave like net.ListenUDP(). If raddr is nil, laddr.IP can't be an unspecified address and laddr.Port can't be 0; otherwise gonet.UDPConn.WriteTo() becomes unusable. This means users currently can't send UDP packets if settings.EndpointIndependentMapping == true and those packets are routed to a WireGuard interface.~
Note that when settings.EndpointIndependentMapping == false, https://github.com/celzero/firestack/commit/c439342aec3a4be15bfd1cd4cb3df688b4a6a2dc is even slower than https://github.com/celzero/firestack/commit/aa87903c18f41cb9c5a3b24dcd037621ff3f997f; I can't understand why.
Never mind; the reason is that some early versions of my CLI frontend couldn't change the log level.
True, but why won't gonet.DialUDP work? 😭 It's so much simpler...
currently users can't send UDP packets if settings.EndpointIndependentMapping == true and these packets are routed to a wireguard interface
Hm, I kind of get what you're saying, but WireGuard doesn't use/rely on udpmux? udpmux via handler.ReverseProxy(tcp / udp) doesn't wire back any packets into WireGuard.
If raddr is nil, laddr.IP can't be an unspecified address, and laddr.Port can't be 0, otherwise gonet.UDPConn.WriteTo() will become unusable
I don't follow. gonet.DialUDP is used for ingress only when udpmux's raddr (to) (gonet's laddr) is already known (while gonet's raddr is fixed)?
currently users can't send UDP packets if settings.EndpointIndependentMapping == true and these packets are routed to a wireguard interface
Hm, I kind of get what you're saying, but WireGuard doesn't use/rely on udpmux? udpmux via handler.ReverseProxy(tcp / udp) doesn't wire back any packets into WireGuard.
~You didn't get what I was saying. I mean that Dialer().Announce("udp","0.0.0.0:0") or Dialer().Announce("udp","[::]:0") can't be converted into gonet.DialUDP() directly; you must choose the local IP and port for it explicitly.~
https://github.com/celzero/firestack/blob/ebf2895126dbd4fc63102d70cbdf5f94912e8a7a/intra/udp.go#L310
https://github.com/celzero/firestack/blob/ebf2895126dbd4fc63102d70cbdf5f94912e8a7a/intra/udpmux.go#L616-L628
Gotcha. The comment in the code says, src will be assigned if left nil (which it is when listening on unspecified ip:port) iff Netstack's NIC addrs are set...
https://github.com/celzero/firestack/blob/5485aa5b32591fb6f0763de47f0e19def44810e7/intra/ipn/wgnet.go#L253-L256
It's possible I was mistaken when I wrote that comment.
If raddr is nil, laddr.IP can't be an unspecified address, and laddr.Port can't be 0, otherwise gonet.UDPConn.WriteTo() will become unusable
Sorry, in fact gVisor can handle them; maybe I did something wrong when comparing firestack's ipn module with wireguard-go's tun/netstack module.
Never mind; the reason is that some early versions of my CLI frontend couldn't change the log level.
The default log level is Info, which shouldn't (ideally) hurt bandwidth measures that much.
Is the CLI you're talking about in this PR? https://github.com/celzero/firestack/pull/189