go-libp2p

BUG: Using transport reuse results in an extra listener on top of the original listener instead of replacing it

Status: Open. derrandz opened this issue 5 months ago (12 comments).

When using QUIC reuse to supply your own UDP listener and transport (to intercept raw QUIC connections), libp2p still creates its own QUIC transport instead of using the one provided through reuse. This results either in errors reporting that the QUIC port is already in use by the original transport, or in two separate QUIC addresses on different ports. I assume that's not what reuse is meant to achieve. At least for our use case, which inspired the QUIC reuse feature, we should end up with a single QUIC address that intercepts raw connections and hands non-raw ones back to libp2p.

You might think that, to end up with a single QUIC transport, you should omit the QUIC transport option from the new libp2p instance. But then the transport from reuse is not used either, and we end up with no QUIC listen addresses at all.

derrandz, Jul 21 '25 12:07

Now, when I pass a listen address with the same QUIC port and use the reuse feature, libp2p can't even start; I run into this panic:

2025/07/22 16:00:59 failed to sufficiently increase receive buffer size (was: 208 kiB, wanted: 7168 kiB, got: 416 kiB). See https://github.com/quic-go/quic-go/wiki/UDP-Buffer-Sizes for details.
panic: unexpected chan already found in accept muxer

goroutine 148 [running]:
github.com/libp2p/go-libp2p/p2p/transport/quic.(*acceptLoopRunner).AcceptForVersion(0xc000437580, 0x1)
        /home/gustavo/go/pkg/mod/github.com/libp2p/[email protected]/p2p/transport/quic/virtuallistener.go:61 +0x125
github.com/libp2p/go-libp2p/p2p/transport/quic.(*transport).Listen(0xc000172480, {0xc000416580, 0x3, 0x4})
        /home/gustavo/go/pkg/mod/github.com/libp2p/[email protected]/p2p/transport/quic/transport.go:338 +0x595
github.com/libp2p/go-libp2p/p2p/net/swarm.(*Swarm).AddListenAddr(0xc000356a00, {0xc000416580, 0x3, 0x4})
        /home/gustavo/go/pkg/mod/github.com/libp2p/[email protected]/p2p/net/swarm/swarm_listen.go:116 +0x70
github.com/libp2p/go-libp2p/p2p/net/swarm.(*Swarm).Listen(0xc000356a00, {0xc00046c360, 0x3, 0x1399e5d?})
        /home/gustavo/go/pkg/mod/github.com/libp2p/[email protected]/p2p/net/swarm/swarm_listen.go:53 +0x2de
github.com/libp2p/go-libp2p/config.(*Config).NewNode.func3.1({0x1ab7d60?, 0x2fc1020?})
        /home/gustavo/go/pkg/mod/github.com/libp2p/[email protected]/config/config.go:524 +0x30
go.uber.org/fx/internal/lifecycle.(*Lifecycle).runStartHook(0xc000000d20, {0x2fce6b0, 0x4ae4880}, {0xc00013a210, 0xc000049170, {0x0, 0x0}, {0x0, 0x0}, {{0xc000256200, ...}, ...}})
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/internal/lifecycle/lifecycle.go:256 +0x1f2
go.uber.org/fx/internal/lifecycle.(*Lifecycle).Start(0xc000000d20, {0x2fce6b0, 0x4ae4880})
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/internal/lifecycle/lifecycle.go:216 +0x468
go.uber.org/fx.(*App).start-fm.(*App).start.func1({0x2fce6b0?, 0x4ae4880?})
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/app.go:702 +0x31
go.uber.org/fx.(*App).withRollback(0xc000797550, {0x2fce6b0, 0x4ae4880}, 0x2ba00000000?)
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/app.go:684 +0x32
go.uber.org/fx.(*App).start(...)
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/app.go:701
go.uber.org/fx.withTimeout.func1()
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/app.go:801 +0x6b
created by go.uber.org/fx.withTimeout in goroutine 1
        /home/gustavo/go/pkg/mod/go.uber.org/[email protected]/app.go:789 +0xc5

derrandz, Jul 22 '25 16:07

I'm having a hard time reproducing this with a standalone code snippet. It happens in a virtualized environment with a full build of our software. In unit tests I can use a single QUIC address and the problem described above doesn't appear, but in our e2e tests the "port already in use" error shows up again because of conflicting transports. In the virtualized e2e environment, we see the panic mentioned above.

derrandz, Jul 23 '25 17:07

After debugging this, I found that if you supply duplicate QUIC addresses in the listen addresses, swarm.Listen in the fx start hook panics: it calls AcceptForVersion for a QUIC version that already has a registered listener, which triggers "unexpected chan already found in accept muxer".

derrandz, Jul 28 '25 16:07

While trying to reproduce this issue, I ran into another bug, which I filed as https://github.com/libp2p/go-libp2p/issues/3345

derrandz, Jul 28 '25 16:07

I can finally reproduce this bug using the following script:

package main

import (
	"context"
	"crypto/rand"
	"fmt"
	"log"
	"net"
	"time"

	"crypto/tls"

	"crypto/ed25519"
	"crypto/x509"
	"math/big"
	"slices"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
	"github.com/libp2p/go-libp2p/core/routing"
	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
	"github.com/libp2p/go-libp2p/p2p/security/noise"
	libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
	libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
	"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
	"github.com/libp2p/go-libp2p/p2p/transport/tcp"
	ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
	webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/quic-go/quic-go"
)



func main() {
	// Generate a private key for testing
	privKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		log.Fatalf("Failed to generate private key: %v", err)
	}

	// Create the libp2p config with duplicate QUIC listen addresses
	listenAddresses := []string{
		"/ip4/0.0.0.0/tcp/0",
		"/ip4/0.0.0.0/udp/1234/quic-v1", // First QUIC address
		"/ip4/0.0.0.0/udp/1234/quic-v1", // Duplicate QUIC address - this triggers the bug
	}

	// Create connection manager
	connmgr, err := connmgr.NewConnManager(
		100,
		400,
		connmgr.WithGracePeriod(10*time.Second),
	)
	if err != nil {
		log.Fatalf("Failed to create connection manager: %v", err)
	}

	// Create peerstore
	ps, err := pstoremem.NewPeerstore()
	if err != nil {
		log.Fatalf("Failed to create peerstore: %v", err)
	}

	// Set up resource manager
	mem := int64(1024 * 1024 * 1024) // 1GB
	fds := 512

	limits := rcmgr.DefaultLimits
	limits.SystemBaseLimit.ConnsInbound = 512
	limits.SystemBaseLimit.ConnsOutbound = 512
	limits.SystemBaseLimit.Conns = 1024
	limits.SystemBaseLimit.StreamsInbound = 8192
	limits.SystemBaseLimit.StreamsOutbound = 8192
	limits.SystemBaseLimit.Streams = 16384
	scaled := limits.Scale(mem, fds)

	mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(scaled))
	if err != nil {
		log.Fatalf("Failed to create resource manager: %v", err)
	}

	// Create UDP connection for RawQuicTransport
	udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4zero, Port: 1234})
	if err != nil {
		log.Fatalf("Failed to create UDP listener: %v", err)
	}

	// Create RawQuicTransport
	rawTransport := NewRawQUICTransport(udpConn)

	// Create QUIC reuse function that uses RawQuicTransport
	newReuse := func(statelessResetKey quic.StatelessResetKey, tokenGeneratorKey quic.TokenGeneratorKey) (*quicreuse.ConnManager, error) {
		reuse, err := quicreuse.NewConnManager(statelessResetKey, tokenGeneratorKey)
		if err != nil {
			return nil, fmt.Errorf("failed to create reuse: %w", err)
		}

		// Lend our pre-instantiated transport (and its UDP socket) to the reuse manager.
		trDone, err := reuse.LendTransport("udp4", rawTransport, udpConn)
		if err != nil {
			return nil, fmt.Errorf("failed to add transport to reuse: %w", err)
		}

		go func() {
			<-trDone
			fmt.Println("closing UDP connection")
			udpConn.Close()
		}()

		// Use standard libp2p QUIC listener
		_, err = reuse.ListenQUIC(
			ma.StringCast("/ip4/0.0.0.0/udp/1234/quic-v1"),
			&tls.Config{NextProtos: []string{"raw"}},
			func(*quic.Conn, uint64) bool { return false },
		)
		if err != nil {
			return nil, fmt.Errorf("failed to listen quic: %w", err)
		}

		return reuse, nil
	}

	fmt.Printf("Final listen addresses: %v\n", listenAddresses)

	// Create libp2p options
	var libp2pOpts []libp2p.Option
	dhtOpts := []dht.Option{
		dht.ProtocolPrefix(protocol.ID("/nunet")),
		dht.Mode(dht.ModeAutoServer),
	}

	libp2pOpts = append(libp2pOpts,
		libp2p.ListenAddrStrings(listenAddresses...),
		libp2p.ResourceManager(mgr),
		libp2p.Identity(privKey),
		libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
			idht, err := dht.New(context.Background(), h, dhtOpts...)
			return idht, err
		}),
		libp2p.Peerstore(ps),
		libp2p.Security(libp2ptls.ID, libp2ptls.New),
		libp2p.Security(noise.ID, noise.New),
		libp2p.ChainOptions(
			libp2p.Transport(tcp.NewTCPTransport),
			libp2p.Transport(libp2pquic.NewTransport),
			libp2p.Transport(webtransport.New),
			libp2p.Transport(ws.New),
		),
		libp2p.ConnectionManager(connmgr),
		libp2p.EnableRelay(),
		libp2p.EnableHolePunching(),
		libp2p.EnableRelayService(
			relay.WithLimit(&relay.RelayLimit{
				Duration: 5 * time.Minute,
				Data:     1 << 21, // 2 MiB
			}),
		),
		libp2p.EnableAutoRelayWithPeerSource(
			func(ctx context.Context, num int) <-chan peer.AddrInfo {
				r := make(chan peer.AddrInfo)
				go func() {
					defer close(r)
					for i := 0; i < num; i++ {
						select {
						case <-ctx.Done():
							return
						default:
							// No peers to provide
						}
					}
				}()
				return r
			},
			autorelay.WithBootDelay(time.Minute),
			autorelay.WithBackoff(30*time.Second),
			autorelay.WithMinCandidates(2),
			autorelay.WithMaxCandidates(3),
			autorelay.WithNumRelays(2),
		),
		libp2p.QUICReuse(newReuse),
	)

	// This is where the bug should occur
	host, err := libp2p.New(libp2pOpts...)
	if err != nil {
		fmt.Printf("Error creating libp2p host: %v\n", err)
		fmt.Println("This confirms the bug is reproducible with duplicate QUIC addresses!")
		return
	}

	// Clean up
	if host != nil {
		host.Close()
	}
}

// RawQUICTransport is a custom QUIC transport that can intercept raw QUIC connections
type RawQUICTransport struct {
	*quic.Transport
	listener *interceptingListener
	network  *Network // Simplified network reference

	listenerReady chan struct{}
}

type interceptingListener struct {
	intercept []string

	acceptQueue chan *quic.Conn
	quicreuse.QUICListener
}

// Network is a simplified network structure for the bug reproduction
type Network struct {
	config *Config
}

// Config holds the configuration for the network
type Config struct {
	PrivateKey PrivateKeyInterface
}

// PrivateKeyInterface defines the interface for private keys
type PrivateKeyInterface interface {
	Raw() ([]byte, error)
}

// testPrivateKey is a simplified private key implementation for testing
type testPrivateKey struct{}

func (c *testPrivateKey) Raw() ([]byte, error) {
	// Generate a random private key for testing
	priv, _, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	return priv, nil
}

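func NewRawQUICTransport(udpConn *net.UDPConn) *RawQUICTransport {
	return &RawQUICTransport{
		Transport: &quic.Transport{Conn: udpConn},
		// The network config must be set, otherwise GetConfigForClient
		// dereferences a nil pointer when a "raw" client connects.
		network:       &Network{config: &Config{PrivateKey: &testPrivateKey{}}},
		listenerReady: make(chan struct{}),
	}
}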

func (t *RawQUICTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (quicreuse.QUICListener, error) {
	wrappedConf := &tls.Config{
		GetConfigForClient: func(info *tls.ClientHelloInfo) (*tls.Config, error) {
			// In this example, we assume that we want to intercept QUIC connection that use the "raw" ALPN token.
			// Any field on the tls.ClientHelloInfo can be used to select the tls.Config to use,
			// for example the server name.
			if slices.Contains(info.SupportedProtos, "raw") {
				priv, err := t.network.config.PrivateKey.Raw()
				if err != nil {
					return nil, fmt.Errorf("failed to get priv key to generate tls cert: %w", err)
				}

				var cert *tls.Certificate
				cert, err = generateSelfSignedCert(ed25519.PrivateKey(priv), []string{info.ServerName})
				if err != nil {
					return nil, fmt.Errorf("failed to generate self signed cert: %w", err)
				}
				return &tls.Config{
					ClientAuth:            tls.RequireAnyClientCert,
					Certificates:          []tls.Certificate{*cert},
					NextProtos:            []string{"raw"},
					InsecureSkipVerify:    false,
					VerifyPeerCertificate: makeVerifySubnetPeerCertificateFn(t.network),
				}, nil
			}
			// use libp2p's tls.Config
			if tlsConf.GetConfigForClient != nil {
				return tlsConf.GetConfigForClient(info)
			}
			return tlsConf, nil
		},
	}
	ln, err := t.Transport.Listen(wrappedConf, conf)
	if err != nil {
		return nil, err
	}
	t.listener = newInterceptingListener(ln, []string{"raw"})

	close(t.listenerReady)

	return t.listener, nil
}

func newInterceptingListener(ln quicreuse.QUICListener, intercept []string) *interceptingListener {
	return &interceptingListener{
		intercept:    intercept,
		acceptQueue:  make(chan *quic.Conn, 32),
		QUICListener: ln,
	}
}

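func (l *interceptingListener) Accept(ctx context.Context) (*quic.Conn, error) {
	for {
		conn, err := l.QUICListener.Accept(ctx)
		if err != nil {
			return nil, err
		}
		if conn.ConnectionState().TLS.NegotiatedProtocol == "raw" {
			fmt.Printf("intercepting a raw connection from: %s\n", conn.RemoteAddr())
			// Divert raw connections to our own queue; don't block forever
			// if the queue is full and the context is cancelled.
			select {
			case l.acceptQueue <- conn:
			case <-ctx.Done():
				conn.CloseWithError(0, "accept cancelled")
				return nil, ctx.Err()
			}
			continue
		}
		// Everything else goes back to libp2p.
		fmt.Printf("accepting a non-raw connection from: %s\n", conn.RemoteAddr())
		return conn, nil
	}
}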

// Simplified certificate verification function for the bug reproduction
func makeVerifySubnetPeerCertificateFn(n *Network) func([][]byte, [][]*x509.Certificate) error {
	return func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
		// Simplified verification - just check if certificate exists
		if len(rawCerts) == 0 {
			return fmt.Errorf("no certificate provided")
		}

		cert, err := x509.ParseCertificate(rawCerts[0])
		if err != nil {
			return fmt.Errorf("failed to parse certificate: %v", err)
		}

		// Check expiration
		if time.Now().Before(cert.NotBefore) || time.Now().After(cert.NotAfter) {
			return fmt.Errorf("certificate is expired or not yet valid")
		}

		// For the bug reproduction, we'll accept any valid certificate
		return nil
	}
}

func generateSelfSignedCert(priv ed25519.PrivateKey, dnsNames []string) (*tls.Certificate, error) {
	// Derive the public key from the supplied private key
	pub := priv.Public().(ed25519.PublicKey)

	// Create a certificate template
	template := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		NotBefore:    time.Now().Add(-time.Hour),           // Valid from 1 hour ago
		NotAfter:     time.Now().Add(24 * time.Hour * 365), // Valid for 1 year
		KeyUsage:     x509.KeyUsageDigitalSignature,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		DNSNames:     dnsNames,
	}

	// Create the certificate
	certDER, err := x509.CreateCertificate(rand.Reader, template, template, pub, priv)
	if err != nil {
		return nil, err
	}

	// Create the tls.Certificate
	cert := &tls.Certificate{
		Certificate: [][]byte{certDER},
		PrivateKey:  priv,
	}

	return cert, nil
}

derrandz, Jul 28 '25 20:07

A couple of issues in your code snippet:

  1. It's an error to pass duplicate addresses to ListenAddrs:

     "/ip4/0.0.0.0/udp/1234/quic-v1", // First QUIC address
     "/ip4/0.0.0.0/udp/1234/quic-v1", // Duplicate QUIC address - this triggers the bug

  2. Why are you calling reuse.ListenQUIC inside your newReuse function?

MarcoPolo, Jul 28 '25 22:07

  1. Whether it's an error or not, it results in an unclear panic, which seems to me like an unhandled case. Passing two identical addresses should maybe be flagged? Or simply ignored? I'm not sure.

  2. Isn't this how it's supposed to be? I remember taking this from the reuse example from Marten, if I am not mistaken. Please tell me what the correct usage is.

derrandz, Jul 28 '25 22:07

Passing two identical addresses should maybe be flagged?

Sure! Feel free to open a PR for this.

I remember taking this from the reuse example from Marten, if I am not mistaken. Please tell me what the correct usage is.

LendTransport is a sharp API. What are you actually trying to do? Would https://github.com/libp2p/go-libp2p/pull/3338 help instead?

MarcoPolo, Jul 28 '25 22:07

I'm not too sure about #3338, but what we're trying to do is what inspired the whole reuse feature: we're implementing RFC 9484 on top of raw QUIC connections. Are you saying that reuse.ListenQUIC is unnecessary? I need to control how we listen so that I can pass "raw" as a TLS NextProtos value alongside libp2p's.

In our implementation, when we don't manually call reuse.ListenQUIC, the raw transport that's supposed to replace libp2p's default QUIC transport never becomes ready and never listens, which is why I do this.

derrandz, Jul 29 '25 14:07

I don't think there's an actual bug here (besides the need for a friendlier error message). This has become a support issue on a pretty sharp API meant for advanced use cases. That said, I'll still try to help you out with some pointers.

  • RFC 9484 is about running IP over HTTP. You probably want to use h3 as your NextProtos value rather than raw.
  • You shouldn't be manually calling reuse.ListenQUIC. You aren't replacing a default QUIC transport; you are providing an alternative (already instantiated) QUIC transport.
  • If you're trying to modify the TLS config, you should wrap your QUIC transport in a new type that intercepts the call to Listen, and make your changes there.

MarcoPolo, Jul 29 '25 19:07

Thank you for your response and clarification. I'll close this issue with the PR for the friendlier error message.

derrandz, Jul 29 '25 21:07

I noticed that using 127.0.0.1 in the listen addresses results in the transport never starting or listening until I change it to 0.0.0.0. Can you explain why?

derrandz, Jul 29 '25 21:07