firestack icon indicating copy to clipboard operation
firestack copied to clipboard

Endpoint independent mapping slows down bandwidth

Open ignoramous opened this issue 11 months ago • 16 comments

Which commit of hussainmohd-a/rethink-app can be built with the latest celzero/firestack? rethink on my phone is outdated.

Interesting. Would such an impl (essentially, half-duplex?) make https://github.com/celzero/firestack/issues/77 work better?

I don't know, but firestack's current Endpoint-Independent Mapping mode needs optimization. I tested rethink and my gvisor-playground with quic-go/perf; rethink's handling of UDP (single flow) seems slower. https://github.com/quic-go/perf

Commands:

quic-go-perf --server-address=<server ip>:<server port> --upload-bytes=1G
quic-go-perf --server-address=<server ip>:<server port> --download-bytes=1G

( "upload" means from emulator to host )

rethink 63868e7 + firestack aa87903 + Endpoint-Independent Mapping disabled upload: 91.1 MiB/s download: 89.7 MiB/s

rethink 63868e7 + firestack aa87903 + Endpoint-Independent Mapping enabled upload: 67.3 MiB/s download: 60.8 MiB/s

Lanius-collaris/gvisor-playground 7dce2ae upload: 95.6 MiB/s download: 89.8 MiB/s

Originally posted by @Lanius-collaris in #123

Most likely due to synchronization mechanics (chans/mutexes) in udpmux.go

ignoramous avatar Feb 08 '25 14:02 ignoramous

@Lanius-collaris, I'm currently cleaning up the code and will share the commit details with you in a day or two.

hussainmohd-a avatar Feb 09 '25 15:02 hussainmohd-a

@Lanius-collaris You can use this commit (0a885cf) to get the latest.

hussainmohd-a avatar Feb 15 '25 14:02 hussainmohd-a

Update 1: prevent a nil pointer dereference

This patch improves inbound throughput, but may cause new bugs:

diff --git a/intra/udp.go b/intra/udp.go
index 1add4b0..581a550 100644
--- a/intra/udp.go
+++ b/intra/udp.go
@@ -223,7 +223,23 @@ func (h *udpHandler) proxy(gconn *netstack.GUDPConn, src, dst netip.AddrPort, dm
 	h.conntracker.Track(cid, gconn, remote)
 	core.Go("udp.forward: "+cid, func() {
 		defer h.conntracker.Untrack(cid)
-		forward(gconn, &rwext{remote}, h.smmch, h.done, smm)
+		if dmx == nil {
+			forward(gconn, &rwext{remote}, h.smmch, h.done, smm)
+		} else {
+			defer gconn.Close()
+			var buf [65536]byte
+			for {
+				gconn.SetReadDeadline(time.Now().Add(udptimeout))
+				n, err := gconn.Read(buf[:])
+				if err != nil {
+					break
+				}
+				_, err = remote.Write(buf[:n])
+				if err != nil {
+					break
+				}
+			}
+		}
 	})
 	return true // ok
 }
@@ -318,7 +334,7 @@ func (h *udpHandler) Connect(gconn *netstack.GUDPConn, src, target netip.AddrPor
 	for i, dstipp := range actualTargets {
 		selectedTarget = dstipp
 		if mux { // mux is not supported by all proxies (few like Exit, Base, WG support it)
-			pc, err = h.mux.associate(cid, pid, uid, src, selectedTarget, px.Dialer().Announce, vendor(dmx))
+			pc, err = h.mux.associate(cid, pid, uid, src, selectedTarget, px.Dialer().Announce, vendor(dmx), gconn)
 		} else {
 			pc, err = px.Dialer().Dial("udp", selectedTarget.String())
 		}
diff --git a/intra/udpmux.go b/intra/udpmux.go
index 0862e6f..06dfb3a 100644
--- a/intra/udpmux.go
+++ b/intra/udpmux.go
@@ -18,6 +18,8 @@ import (
 
 	"github.com/celzero/firestack/intra/core"
 	"github.com/celzero/firestack/intra/log"
+
+	"bytes"
 )
 
 // from: github.com/pion/transport/blob/03c807b/udp/conn.go
@@ -99,6 +101,8 @@ type demuxconn struct {
 	rt  *time.Ticker  // read deadline
 	wto time.Duration // write timeout
 	rto time.Duration // read timeout
+
+	realConn net.Conn
 }
 
 // slice is a byte slice v and its recycler free.
@@ -192,17 +196,25 @@ func (x *muxer) readers() {
 		_ = x.stop() // stop muxer
 	}()
 
+	bptr := core.AllocRegion(core.B65536)
+	b := *bptr
+	b = b[:cap(b)]
+	// todo: if panics are recovered above, free() may never be called
+	free := func() {
+		*bptr = b
+		core.Recycle(bptr)
+	}
+	defer free()
+
+	var t1 [4]byte
+	oldWho := &net.UDPAddr{
+		IP:   t1[:],
+		Port: 0,
+	}
+	var dst *demuxconn = nil
 	timeouterrors := 0
 	for {
-		bptr := core.AllocRegion(core.B65536)
-		b := *bptr
-		b = b[:cap(b)]
-		// todo: if panics are recovered above, free() may never be called
-		free := func() {
-			*bptr = b
-			core.Recycle(bptr)
-		}
-
+		x.mxconn.SetReadDeadline(time.Now().Add(udptimeout))
 		n, who, err := x.mxconn.ReadFrom(b)
 
 		x.stats.tx.Add(uint32(n)) // upload
@@ -210,7 +222,6 @@ func (x *muxer) readers() {
 		if timedout(err) {
 			timeouterrors++
 			if timeouterrors < maxtimeouterrors {
-				x.extend(time.Now().Add(udptimeout))
 				log.D("udp: mux: %s read timeout(%d): %v", x.cid, timeouterrors, err)
 				continue
 			} // else: err out
@@ -224,15 +235,21 @@ func (x *muxer) readers() {
 			continue
 		}
 
-		// may be an existing route or a new route
-		if dst := x.route(addr2netip(who), ingress); dst != nil {
-			select {
-			case dst.incomingCh <- &slice{v: b[:n], free: free}: // incomingCh is never closed
-			default: // dst probably closed, but not yet unrouted
-				log.W("udp: mux: %s read: drop(sz: %d); route to %s", x.cid, n, dst.raddr)
+		who2 := who.(*net.UDPAddr)
+		if !bytes.Equal(oldWho.IP, who2.IP) || oldWho.Port != who2.Port {
+			oldWho = who2
+			// may be an existing route or a new route
+			dst = x.route(addr2netip(who), ingress)
+		}
+		if dst != nil {
+			if dst.realConn == nil {
+				return
+			}
+			_, err = dst.realConn.Write(b[:n])
+			if err != nil {
+				return
 			}
-			log.V("udp: mux: %s read: n(%d) from %v <= %v; err %v", x.cid, n, dst, who, err)
-		} // else: ignore (who is invalid or x is closed)
+		}
 	}
 }
 
@@ -352,23 +369,10 @@ func (c *demuxconn) Read(p []byte) (int, error) {
 
 // Write implements core.UDPConn.Write
 func (c *demuxconn) Write(p []byte) (n int, err error) {
-	defer c.wt.Reset(c.wto)
-	sz := len(p)
-	select {
-	case <-c.wt.C:
-		log.W("udp: mux: %s demux: write: %v => %v; timeout (sz: %d)",
-			c.remux.id(), c.laddr, c.raddr, sz)
-		return 0, os.ErrDeadlineExceeded
-	case <-c.closed:
-		log.W("udp: mux: %s demux: write: %v => %v; closed (sz: %d)",
-			c.remux.id(), c.laddr, c.raddr, sz)
-		return 0, net.ErrClosed
-	default:
-		n, err = c.remux.sendto(p, c.raddr)
-		logev(err)("udp: mux: %s demux: write: %v => %v; done(sz: %d/%d); err? %v",
-			c.remux.id(), c.laddr, c.raddr, n, sz, err)
-		return n, err
-	}
+	n, err = c.remux.sendto(p, c.raddr)
+	logev(err)("udp: mux: %s demux: write: %v => %v; done(sz: %d/%d); err? %v",
+		c.remux.id(), c.laddr, c.raddr, n, len(p), err)
+	return n, err
 }
 
 // ReadFrom implements core.UDPConn.ReadFrom (unused)
@@ -491,7 +495,7 @@ func newMuxTable() *muxTable {
 	return &muxTable{t: make(map[netip.AddrPort]*muxer)}
 }
 
-func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk assocFn, v vendor) (_ net.Conn, err error) {
+func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk assocFn, v vendor, gconn net.Conn) (_ net.Conn, err error) {
 	e.Lock() // lock
 
 	var mxr *muxer
@@ -526,6 +530,23 @@ func (e *muxTable) associate(cid, pid, uid string, src, dst netip.AddrPort, mk a
 		return nil, errUidMismatch // return
 	}
 
+	if _, ok := mxr.routes[dst]; !ok {
+		mxr.routes[dst] = &demuxconn{
+			remux:      mxr,                          // muxer
+			laddr:      mxr.mxconn.LocalAddr(),       // listen addr
+			raddr:      net.UDPAddrFromAddrPort(dst), // remote addr
+			key:        dst,                          // key (same as raddr)
+			incomingCh: make(chan *slice),            // read from muxer
+			overflowCh: make(chan *slice),            // overflow from read
+			closed:     make(chan struct{}),          // always unbuffered
+			wt:         time.NewTicker(udptimeout),
+			rt:         time.NewTicker(udptimeout),
+			wto:        udptimeout,
+			rto:        udptimeout,
+			realConn:   gconn,
+		}
+	}
+
 	e.Unlock() // unlock
 	// do not hold e.lock on calls into mxr
 	c := mxr.route(dst, egress)

Lanius-collaris avatar Feb 19 '25 00:02 Lanius-collaris

Sorry, the patch I posted yesterday includes a serious DoS vulnerability.

Lanius-collaris avatar Feb 20 '25 03:02 Lanius-collaris

stack.Stack has a FindTransportEndpoint() method; it may be an alternative to muxer.route(), because the tcpip.Endpoint created by udp.ForwarderRequest.CreateEndpoint() has been registered in stack.Stack: https://github.com/google/gvisor/blob/738e1d995f64/pkg/tcpip/transport/udp/forwarder.go#L88

Lanius-collaris avatar Mar 21 '25 18:03 Lanius-collaris

stack.Stack has a FindTransportEndpoint() method, it may be an alternative to muxer.route()

As set up, to netstack, remote addrs are locally bound. And so, when a new route appears (from a remote ip:port udpmux is seeing for the first time on an unconnected socket), FindTransportEndpoint is unlikely to find any existing route (as none would exist bound to that remote ip:port), no?

ignoramous avatar Mar 25 '25 06:03 ignoramous

As setup, to netstack, remote addrs are locally bound. And so, when a new route appears (from a remote ip:port udpmux is seeing for the first time on a unconnected socket), FindTransportEndpoint is unlikely to find any existing route (as none would exist bound to that remote ip:port), no?

Yes. But you can construct UDP packets manually; creating an Endpoint may need longer code.

Lanius-collaris avatar Mar 27 '25 05:03 Lanius-collaris

But you can construct UDP packets manually, creating an Endpoint may need longer code.

True, but why won't gonet.DialUDP work 😭 So much simpler...

ignoramous avatar Mar 27 '25 07:03 ignoramous

  • Channels could be replaced with https://github.com/kelindar/event
  • Or: Map/Queues with https://github.com/puzpuzpuz/xsync

ignoramous avatar Jul 03 '25 20:07 ignoramous

https://github.com/Lanius-collaris/firestack/commit/510f04be5632ae15b01311c72c552fbfb8287473 improves both outbound and inbound throughput. Note that when settings.EndpointIndependentMapping == false, https://github.com/celzero/firestack/commit/c439342aec3a4be15bfd1cd4cb3df688b4a6a2dc is even slower than https://github.com/celzero/firestack/commit/aa87903c18f41cb9c5a3b24dcd037621ff3f997f — I can't understand why. 🙃

Lanius-collaris avatar Jul 08 '25 04:07 Lanius-collaris

~gonet.DialUDP() doesn't seem to behave like net.ListenUDP()…… If raddr is nil, laddr.IP can't be an unspecified address, and laddr.Port can't be 0, otherwise gonet.UDPConn.WriteTo() will become unusable, this means, currently users can't send UDP packets if settings.EndpointIndependentMapping == true and these packets are routed to a wireguard interface.~

Note that when settings.EndpointIndependentMapping == false, https://github.com/celzero/firestack/commit/c439342aec3a4be15bfd1cd4cb3df688b4a6a2dc is even slower than https://github.com/celzero/firestack/commit/aa87903c18f41cb9c5a3b24dcd037621ff3f997f , I can't understand.

Never mind; the reason is that some early versions of my CLI frontend couldn't change the log level.

Lanius-collaris avatar Oct 17 '25 08:10 Lanius-collaris

True, but why won't gonet.DialUDP work 😭 So much simpler...

currently users can't send UDP packets if settings.EndpointIndependentMapping == true and these packets are routed to a wireguard interface

Hm, I kind of get what you're saying, but WireGuard doesn't use/rely on udpmux? udpmux via handler.ReverseProxy(tcp / udp) doesn't wire back any packets into WireGuard.

If raddr is nil, laddr.IP can't be an unspecified address, and laddr.Port can't be 0, otherwise gonet.UDPConn.WriteTo() will become unusable

I don't follow. gonet.DialUDP is used for ingress only when udpmux's raddr (to) (gonet's laddr) is already known (while gonet's raddr is fixed)?

ignoramous avatar Oct 17 '25 12:10 ignoramous

currently users can't send UDP packets if settings.EndpointIndependentMapping == true and these packets are routed to a wireguard interface

Hm, I kind of get what you're saying, but WireGuard doesn't use/rely on udpmux? udpmux via handler.ReverseProxy(tcp / udp) doesn't wire back any packets into WireGuard.

~You didn't get what I was saying. I mean Dialer().Announce("udp","0.0.0.0:0") or Dialer().Announce("udp","[::]:0") can't be converted into gonet.DialUDP() directly; you must choose a local IP and port for it explicitly.~ https://github.com/celzero/firestack/blob/ebf2895126dbd4fc63102d70cbdf5f94912e8a7a/intra/udp.go#L310 https://github.com/celzero/firestack/blob/ebf2895126dbd4fc63102d70cbdf5f94912e8a7a/intra/udpmux.go#L616-L628

Lanius-collaris avatar Oct 17 '25 16:10 Lanius-collaris

Gotcha. The comment in the code says, src will be assigned if left nil (which it is when listening on unspecified ip:port) iff Netstack's NIC addrs are set...

https://github.com/celzero/firestack/blob/5485aa5b32591fb6f0763de47f0e19def44810e7/intra/ipn/wgnet.go#L253-L256

Possible I was mistaken when I wrote that comment.

ignoramous avatar Oct 18 '25 12:10 ignoramous

If raddr is nil, laddr.IP can't be an unspecified address, and laddr.Port can't be 0, otherwise gonet.UDPConn.WriteTo() will become unusable

Sorry, in fact gvisor can handle them, maybe I did something wrong when comparing the ipn module of firestack with the tun/netstack module of wireguard-go.

Lanius-collaris avatar Oct 19 '25 16:10 Lanius-collaris

Don't mind, the reason is some early versions of my CLI frontend can't change the log level.

The default log level is Info, which shouldn't (ideally) hurt bandwidth measures that much.

Is the CLI you're talking about in this PR? https://github.com/celzero/firestack/pull/189

ignoramous avatar Oct 22 '25 10:10 ignoramous