caps mux

Open emhane opened this issue 1 year ago • 2 comments

tracking https://github.com/paradigmxyz/reth/issues/791

emhane avatar Jun 04 '23 21:06 emhane

If I understood this correctly, then this refactors how the core eth-wire types EthStream and P2PStream work.

most importantly the P2PStream will do the multiplexing over channels designated to the capabilities.

The EthStream is then wired to P2PStream via channels.

This would allow us to register multiple capability streams in P2PStream.

This comes with some drawbacks: the P2PStream now operates on its own and, for example, needs to be spawned on a new task. This results in additional overhead, especially if you don't need multiplexing and only want eth.

not necessarily in its own thread, I thought of it like this: ethstream is responsible for en-/decoding eth messages. p2p stream handles bytes. p2pstream is connected in 3 directions: active session, tcp stream, ethstream. active session would poll p2pstream, which would in turn poll_flush the tcp stream and the ethstream. active session would also have a bi-directional channel to the en-/decoding ethstream. active session would also poll the receiver req_handler_rx for new messages from the wire, which are sent there by polling the driving p2pstream.

this allows for multiple threads but doesn't require them. the same is true of an Arc<RwLock<P2PStream>> contained in each capability stream as EthStream<Arc<RwLock<P2PStream>>>, and it is possibly more intuitive to poll the capability streams instead of only receivers from them + the p2pstream p2p_conn. that way, polling the capability streams in active session polls the underlying shared p2pstream, which then polls the tcp stream. however, a nice detail of the channel approach is that polling the p2pstream sends any messages received over the wire on a channel to each capability instead of storing them in the p2pstream, as messages for any capability would be received over the wire. at the end of the day, a channel is just a queue.

the lock on p2pstream is a nice way to keep the EthStream<RwLock<S>> as you describe below and also be able to have LesStream<RwLock<S>>, etc., collecting these capstreams in a map in active session and polling all of them when polling active session. otherwise, if that generic is just a channel to the p2pstream and not a reference to it, we have the problem of "who polls the p2pstream?", which is why I was polling p2pstream as p2p_conn in active session. mostly i just did exactly what the issue states: contain the capabilities in the p2pstream.
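
To make the lock-based variant above concrete, here is a minimal std-only sketch. It is not reth code: `SharedWire`, `CapHandle` and `new_shared` are hypothetical names, a `VecDeque` stands in for the TCP stream, and a synchronous `poll_next` replaces the real `Stream::poll_next`. It only shows the idea that polling any capability handle drives the shared stream and buffers frames that belong to other capabilities.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared p2p stream: `wire` plays the role of
/// the TCP stream, `pending` buffers payloads per capability id.
struct SharedWire {
    wire: VecDeque<(u8, Vec<u8>)>, // (capability id, raw payload)
    pending: HashMap<u8, VecDeque<Vec<u8>>>,
}

/// Builds the shared state from a list of frames "received over the wire".
fn new_shared(frames: Vec<(u8, Vec<u8>)>) -> Arc<RwLock<SharedWire>> {
    Arc::new(RwLock::new(SharedWire {
        wire: frames.into(),
        pending: HashMap::new(),
    }))
}

/// Hypothetical capability stream that holds a reference to the shared wire,
/// in the spirit of `EthStream<Arc<RwLock<P2PStream>>>`.
struct CapHandle {
    cap_id: u8,
    shared: Arc<RwLock<SharedWire>>,
}

impl CapHandle {
    /// Returns the next payload for this capability, or `None` once the wire
    /// is drained and nothing is buffered for it.
    fn poll_next(&self) -> Option<Vec<u8>> {
        let mut guard = self.shared.write().unwrap();
        loop {
            // Serve from our own buffer first.
            if let Some(msg) = guard
                .pending
                .get_mut(&self.cap_id)
                .and_then(|queue| queue.pop_front())
            {
                return Some(msg);
            }
            // Nothing buffered for us: pull one frame off the wire and file it
            // under whichever capability it belongs to.
            let (id, payload) = guard.wire.pop_front()?;
            guard.pending.entry(id).or_default().push_back(payload);
        }
    }
}
```

Note that in this sketch the messages end up stored inside the shared stream, which is exactly the trade-off against the channel-based approach mentioned above.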

I want to explore a hybrid design where we have multiplexing support but can still use the ethstream like we're doing now.

previously the EthStream<St> was generic over a Stream+Sink type; this should be flexible enough to be either a bundled Sender+Receiver pair to a multiplexed connection or the P2PStream itself (like it is on main).

I think we can also make a distinction between:

  • P2PStream: a Stream+Sink type that writes messages to the wire and emits messages via Stream::next
  • A new MuxConnection type (Future) that wraps the P2PStream and does the multiplexing (basically what the modified P2PStream currently does)
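
A rough sketch of that split, under several assumptions: the `MuxConnection` below is a hypothetical std-only stand-in, not the type from this PR. Any `Iterator` of `(message id, payload)` frames plays the role of the P2PStream, `std::sync::mpsc` channels replace async channels, and `run` replaces the `Future` that would drive the connection. The message-id ranges are illustrative values.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical route: one shared capability and the message-id range it owns.
struct Route {
    offset: u8,                // first message id assigned to this capability
    len: u8,                   // number of message ids the capability uses
    tx: Sender<(u8, Vec<u8>)>, // carries (relative message id, raw payload)
}

/// Hypothetical multiplexer wrapping a frame source (standing in for P2PStream).
struct MuxConnection<I> {
    inner: I,
    routes: Vec<Route>,
}

impl<I: Iterator<Item = (u8, Vec<u8>)>> MuxConnection<I> {
    fn new(inner: I) -> Self {
        Self { inner, routes: Vec::new() }
    }

    /// Registers a capability and returns the receiving end for its handler.
    fn register(&mut self, offset: u8, len: u8) -> Receiver<(u8, Vec<u8>)> {
        let (tx, rx) = channel();
        self.routes.push(Route { offset, len, tx });
        rx
    }

    /// Drives the connection until the frame source ends, forwarding each frame
    /// to the capability whose id range contains it. Returns the number of
    /// frames that could not be delivered.
    fn run(self) -> usize {
        let MuxConnection { inner, routes } = self;
        let mut unrouted = 0;
        for (id, payload) in inner {
            let route = routes
                .iter()
                .find(|r| id >= r.offset && id - r.offset < r.len);
            match route {
                Some(r) => {
                    // A send error means the handler dropped its receiver.
                    if r.tx.send((id - r.offset, payload)).is_err() {
                        unrouted += 1;
                    }
                }
                None => unrouted += 1,
            }
        }
        unrouted
    }
}
```

With this shape the raw stream stays usable on its own when only eth is needed, and the extra task is only paid for when a mux is actually constructed.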

wdyt @Rjected

emhane avatar Jun 05 '23 17:06 emhane

not necessarily in its own thread, I thought of it like this: ethstream is responsible for en-/decoding eth messages. p2p stream handles bytes.

This sounds right, this is also what we're currently doing: p2pstream --streams bytes--> ethstream --streams decoded EthMessage-->

but the p2pstream is currently pinned to a specific capability (eth), which prevents cap mux atm; that's what this PR tries to address.

p2pstream is connected in 3 directions: active session, tcp stream, ethstream. active session would poll p2pstream, which would in turn poll_flush the tcp stream and the ethstream.

okay, making the p2p emit raw bytes only makes sense. what we do with them afterwards depends on how we envision cap multiplexing being used. If we want to handle the caps on the same task, this requires a stream type that decodes the raw bytes and emits cap variants (basically how EthStream behaves, but able to decode additional caps). If we want them on different tasks, we need a way to send the raw bytes based on cap message id; that's what the MuxConnection type would be for.

ActiveSession should only use the cap handler that it requires, for now it only needs Eth.

And I don't want to introduce locking here if we can compose capability handlers either by:

  • wrapping the P2PStream into a Stream type that emits decoded variants (enum CapMessage {Eth(...),...}), if all handlers are on the same task
  • adding a mux type that sends messages over a channel, if we need handlers that live on different tasks.
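
The first option (all handlers on the same task) could look roughly like the sketch below. Everything here is an assumption for illustration: `DecodedCaps`, the `Snap` variant, `UnknownMessageId` and the id ranges are hypothetical, the payloads stay as raw bytes instead of being decoded into real message types, and an `Iterator` stands in for the `Stream` that would wrap the P2PStream.

```rust
/// Hypothetical decoded variants; the bodies are left as raw bytes here.
#[derive(Debug, PartialEq)]
enum CapMessage {
    Eth { id: u8, body: Vec<u8> },
    Snap { id: u8, body: Vec<u8> },
}

/// Error for a frame whose message id belongs to no known capability.
#[derive(Debug, PartialEq)]
struct UnknownMessageId(u8);

/// Hypothetical wrapper around a raw frame source (standing in for P2PStream)
/// that emits one enum covering every capability handled on this task.
struct DecodedCaps<I> {
    inner: I,
    eth_len: u8,  // number of message ids reserved for eth (illustrative)
    snap_len: u8, // number of message ids reserved for the second cap
}

impl<I: Iterator<Item = (u8, Vec<u8>)>> Iterator for DecodedCaps<I> {
    type Item = Result<CapMessage, UnknownMessageId>;

    fn next(&mut self) -> Option<Self::Item> {
        let (id, body) = self.inner.next()?;
        Some(if id < self.eth_len {
            Ok(CapMessage::Eth { id, body })
        } else if id - self.eth_len < self.snap_len {
            // Ids are made relative to the start of the capability's range.
            Ok(CapMessage::Snap { id: id - self.eth_len, body })
        } else {
            Err(UnknownMessageId(id))
        })
    }
}
```

No lock or channel is involved; the cost is that the set of capabilities is fixed by the enum.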

I hope this kinda makes sense

mattsse avatar Jun 08 '23 13:06 mattsse

Codecov Report

Merging #2981 (05f334a) into main (1e7d028) will decrease coverage by 0.09%. Report is 15 commits behind head on main. The diff coverage is 50.00%.

Current head 05f334a differs from pull request most recent head 77eeaa1. Consider uploading reports for the commit 77eeaa1 to get more accurate results.

| Files | Coverage Δ |
|---|---|
| crates/net/eth-wire/src/capability.rs | 76.76% <100.00%> (+0.50%) |
| crates/net/eth-wire/src/errors/p2p.rs | 70.00% <ø> (ø) |
| crates/net/eth-wire/src/ethstream.rs | 83.97% <ø> (ø) |
| crates/net/eth-wire/src/types/message.rs | 55.17% <ø> (ø) |
| crates/net/network/src/error.rs | 76.04% <ø> (ø) |
| crates/net/network/src/session/mod.rs | 69.88% <100.00%> (+0.05%) |
| crates/net/eth-wire/src/p2pstream.rs | 78.28% <87.87%> (-0.02%) |
| crates/net/eth-wire/src/sharedstream.rs | 42.98% <42.98%> (ø) |

... and 12 files with indirect coverage changes

| Flag | Coverage Δ |
|---|---|
| integration-tests | 15.44% <11.19%> (-0.04%) |
| unit-tests | 62.57% <50.00%> (-0.07%) |

| Components | Coverage Δ |
|---|---|
| reth binary | 32.95% <ø> (ø) |
| blockchain tree | 80.64% <ø> (+0.09%) |
| pipeline | 88.45% <ø> (ø) |
| storage (db) | 73.30% <ø> (+<0.01%) |
| trie | 94.52% <ø> (+0.03%) |
| txpool | 48.90% <ø> (-0.49%) |
| networking | 75.66% <50.00%> (-0.47%) |
| rpc | 57.70% <ø> (-0.01%) |
| consensus | 61.05% <ø> (+0.19%) |
| revm | 28.47% <ø> (ø) |
| payload builder | 8.14% <ø> (ø) |
| primitives | 85.39% <ø> (+0.01%) |

codecov[bot] avatar Oct 03 '23 13:10 codecov[bot]

@mattsse can this be closed given #5363?

gakonst avatar Nov 09 '23 22:11 gakonst

sorry this took so long 🙈

I think I kinda understand how this works.

One concern, if I understood this correctly, is that this limits us to pre-defined caps in the capabilities enum?

predefined caps messages, like how the EthMessage type is used for all combos of eth66, 67 and 68 caps atm.

I wonder if we can simplify this a bit so that we have a single stream, something like modified CapStream

struct CapStream {
  inner: S,
  mux_rx,
  mux_tx,
  cap_streams: Vec<(SharedCap, Sender)>,
  shared_caps
}

which is then a Stream and Sink of (Cap, Bytes) which can only carry the raw messages

however, this assumes that sub-protocol streams will be spawned, right? so maybe we can find a way to stack them together and keep the stream-specific types

struct StackedCaps<S,L,R> {
   inner: S,
   from_streams_rx: UnboundedRec<MaskedBytes>,
   left: L,
   right: R,
   shared_caps
}

where L and R implement a new trait that is both Stream+Sink and knows if the message id is a match. This way we could even layer this and stack multiple handlers in R. The benefit would be that this could be a Stream of Either(L::Out, R::Out).
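
A minimal sketch of the stacking idea, with heavy simplifications that are assumptions rather than the proposed design: the `CapHandler` trait, `Pair` and `RangeHandler` are hypothetical names, only the receiving (Stream) side is modelled as an `Iterator`, and the `from_streams_rx` and `shared_caps` fields of the struct above are left out. It only illustrates how nesting handlers in `R` yields nested `Either` outputs.

```rust
/// Output of one of two stacked handlers.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

/// Hypothetical trait: a handler that knows which message ids it owns.
trait CapHandler {
    type Out;
    fn matches(&self, id: u8) -> bool;
    fn decode(&mut self, id: u8, body: Vec<u8>) -> Self::Out;
}

/// Two handlers combined into one, so stacks can be layered inside `R`.
struct Pair<L, R>(L, R);

impl<L: CapHandler, R: CapHandler> CapHandler for Pair<L, R> {
    type Out = Either<L::Out, R::Out>;
    fn matches(&self, id: u8) -> bool {
        self.0.matches(id) || self.1.matches(id)
    }
    fn decode(&mut self, id: u8, body: Vec<u8>) -> Self::Out {
        if self.0.matches(id) {
            Either::Left(self.0.decode(id, body))
        } else {
            Either::Right(self.1.decode(id, body))
        }
    }
}

/// The stacked stream: raw frames in, `Either(L::Out, R::Out)` out.
struct StackedCaps<S, L, R> {
    inner: S,
    left: L,
    right: R,
}

impl<S, L, R> Iterator for StackedCaps<S, L, R>
where
    S: Iterator<Item = (u8, Vec<u8>)>,
    L: CapHandler,
    R: CapHandler,
{
    type Item = Either<L::Out, R::Out>;
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let (id, body) = self.inner.next()?;
            if self.left.matches(id) {
                return Some(Either::Left(self.left.decode(id, body)));
            }
            if self.right.matches(id) {
                return Some(Either::Right(self.right.decode(id, body)));
            }
            // Frames that no handler claims are skipped in this sketch.
        }
    }
}

/// Toy handler owning the ids in `start..end`; it "decodes" to a label.
struct RangeHandler {
    name: &'static str,
    start: u8,
    end: u8,
}

impl CapHandler for RangeHandler {
    type Out = String;
    fn matches(&self, id: u8) -> bool {
        (self.start..self.end).contains(&id)
    }
    fn decode(&mut self, id: u8, _body: Vec<u8>) -> String {
        format!("{}:{}", self.name, id - self.start)
    }
}
```

Each handler keeps its own output type, so the session code can match on the `Either` instead of a single predefined message enum.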

sure. my goal was to change as little of the session code as possible, but we can have ActiveSession listen for CapsMessage instead with EthMessage as a variant, I believe @Rjected suggested this. all caps message types that are then later added must pass through ActiveSession, are we ok with that?

or maybe we abandon the Stream+Sink completely and just do message conversions.

keeping stream and sink is good for not changing much of the session code.

just some ideas at this point...

emhane avatar Nov 14 '23 13:11 emhane

closed in favour of simplification https://github.com/paradigmxyz/reth/pull/5577

emhane avatar Nov 26 '23 12:11 emhane