lnd icon indicating copy to clipboard operation
lnd copied to clipboard

lnp2p: add new simplified package for interacting with the p2p network

Open Roasbeef opened this issue 3 months ago • 1 comments

I was working on creating some ad hoc tests to examine the p2p performance behavior of some new PRs I was working on, and reached for the peer.Brontide package. However, it fell short, as the package is very tightly coupled to all the sub-systems it interacts with, and also carries several assumptions re the way that lnd is set up.

This PR is an attempt to introduce a new pacakge lnp2p, that implements the bare essentials of p2p connectivity (brontide handshake, init sending, ping handling, etc). This can be used to write ad hoc tests, and even expand our integration tests to test things like protocol violations, as we can use this package to basically MiTM between sub systems.

I've also built on top of my actor package to show what a version of the peer as an actor would look like. The final version is pretty barebores, but it's easy to see how goroutines like the ping checker can actually be implemented as an actor that uses a MessageSink to only filter for ping messages, then issues a disconnect message if violated, etc, etc.

Keeping it in draft for now, as it'll likely evolve as I continue to write these p2p tests.

Diff looks large, but it's mostly tests as I was shooting for 90% + test coverage. Likely some duplication there as well.

Ideally we can eventually use this directly in peer.Brontide, but I think we shouldn't rush quite into that, as we've seen the recent impact of p2p bugs and how that can destabalize nodes. So for now, we can have it just be a package for experiments and tests.

Roasbeef avatar Sep 17 '25 01:09 Roasbeef

An example of the base API:

target, err := lnp2p.ParseNodeAddress("[email protected]:9735")
if err != nil {
    return err
}

cfg := lnp2p.SimplePeerConfig{
    KeyGenerator: &lnp2p.EphemeralKeyGenerator{},
    Target:       *target,
    Features:     lnp2p.DefaultFeatures(),
    Timeouts:     lnp2p.DefaultTimeouts(),
}

peer, err := lnp2p.NewSimplePeer(cfg)
if err != nil {
    return err
}
defer peer.Close()

if err := peer.Connect(context.Background()); err != nil {
    return err
}

This'll connect out, do the init dance, etc, etc. There's also a mode that lets you control exactly when messages are read off the wire, which can be useful for types of hybrid integration tests where we want to run assertions betwen messages, or simulate invalid protocol behavior.


There's an iterator that can be used to recv messages, and also connection events:

    for msg := range peer.ReceiveMessages() {
        switch m := msg.(type) {
        case *lnwire.ChannelUpdate:
            processUpdate(m)
        case *lnwire.Error:
            log.Printf("Peer error: %s", m.Data)
        case *lnwire.Ping:
        }
for event := range peer.ConnectionEvents() {
    log.Printf("[%s] State: %s", event.Timestamp, event.State)

    if event.State == lnp2p.StateConnected {
        // Connection established.
    } else if event.State == lnp2p.StateDisconnected && event.Error != nil {
        // Handle disconnection.
    }
}

There's also an API that's actor based. You can use it to allow any sub-system to register that messages shoud be sent to it, without obtaining a direct pointer/reference. A MessageSink can also use a filter pedicate to filter out which messages get sent to it.

target, err := lnp2p.ParseNodeAddress("[email protected]:9735")
if err != nil {
    return err
}

cfg := lnp2p.ActorWithConnConfig{
    SimplePeerCfg: lnp2p.SimplePeerConfig{
        KeyGenerator: &lnp2p.EphemeralKeyGenerator{},
        Target:       *target,
        Features:     lnp2p.DefaultFeatures(),
        Timeouts:     lnp2p.DefaultTimeouts(),
    },
    ActorSystem: system,
    ServiceKey:  channelKey,
    ActorName:   "channel-peer",
    MessageSinks: []*lnp2p.MessageSink{
        {
            ServiceKey: channelKey,
            Filter:     channelFilter,
        },
    },
}

peerActor, actorRef, err := lnp2p.NewActorWithConn(cfg)
if err != nil {
    return err
}

startResult := peerActor.Start(ctx)
if resp, err := startResult.Unpack(); err != nil || !resp.Success {
    return err
}

Here's a glimpse re how the actor system can be used to re-work the way sub-systems discover, and iteract with each other:

system := actor.NewActorSystem()
defer system.Shutdown()

// Make a new service key to handle channel update messges. 
channelKey := actor.NewServiceKey[lnp2p.PeerMessage, lnp2p.PeerResponse](
    "channel-handler",
)

// We'll use a simple handler that just processes the updates directly, and register it with the main system. 
behavior := actor.NewFunctionBehavior(func(ctx context.Context,  msg lnp2p.PeerMessage) fn.Result[lnp2p.PeerResponse] {
    if m, ok := msg.(*lnp2p.MessageReceived); ok {
        if update, ok := m.Message.(*lnwire.ChannelUpdate); ok {
            // Process channel update.
            processChannelUpdate(update)
            return fn.Ok[lnp2p.PeerResponse](
                &lnp2p.MessageReceivedAck{Processed: true},
            )
        }
    }
    return fn.Ok[lnp2p.PeerResponse](nil)
})
actor.RegisterWithSystem(system, "channel-handler", channelKey, behavior)


// Make a MessageSink with a filter, then register it w/ the peer actor. 
channelFilter := func(msg lnwire.Message) bool {
    switch msg.(type) {
    case *lnwire.ChannelUpdate, *lnwire.ChannelAnnouncement:
        return true
    default:
        return false
    }
}
updateSink := &lnp2p.MessageSink{
    ServiceKey: channelKey,
    Filter: channelFilter, 
}
success := peerActor.AddMessageSink(gossipSink)

Roasbeef avatar Sep 18 '25 20:09 Roasbeef