reth
caps mux
tracking https://github.com/paradigmxyz/reth/issues/791
If I understood this correctly, this refactors how the core eth-wire types EthStream and P2PStream work. Most importantly, the P2PStream will do the multiplexing over channels designated to the capabilities. The EthStream is then wired to the P2PStream via channels. This would allow us to register multiple capability streams in the P2PStream.
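A minimal std-only sketch of that routing step follows. It assumes RLPx-style message IDs: 0x00-0x0f are reserved for the base p2p protocol, and each shared capability owns a consecutive ID range after that. SharedCap, Demux, register and route are hypothetical names, not reth's API, and std::sync::mpsc stands in for the async channels a real session would use.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// A negotiated capability and the slice of the message-ID space it owns.
/// Hypothetical type for illustration, not reth's own shared-capability type.
struct SharedCap {
    name: &'static str,
    offset: u8, // first message ID assigned to this capability
    len: u8,    // number of message IDs the capability uses
}

/// Routes raw wire messages to one channel per capability.
struct Demux {
    caps: Vec<(SharedCap, Sender<(u8, Vec<u8>)>)>,
}

impl Demux {
    fn new() -> Self {
        Demux { caps: Vec::new() }
    }

    /// Registers a capability and returns the receiver its stream would read from.
    fn register(&mut self, cap: SharedCap) -> Receiver<(u8, Vec<u8>)> {
        let (tx, rx) = channel();
        self.caps.push((cap, tx));
        rx
    }

    /// Forwards a message to the capability whose ID range contains `id`,
    /// rebasing the ID so each capability sees its own IDs starting at 0.
    /// Returns the capability name, or None if no capability owns the ID
    /// (or its receiver has been dropped).
    fn route(&self, id: u8, payload: Vec<u8>) -> Option<&'static str> {
        let (cap, tx) = self.caps.iter().find(|(cap, _)| {
            id.checked_sub(cap.offset).map_or(false, |rel| rel < cap.len)
        })?;
        tx.send((id - cap.offset, payload)).ok()?;
        Some(cap.name)
    }
}
```

With eth at offset 0x10 (17 IDs) and snap right after it at 0x21, a message with ID 0x12 lands on the eth channel as relative ID 0x02.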
This comes with some drawbacks: the P2PStream now operates on its own and, for example, needs to be spawned on a new task. This results in additional overhead, especially if you don't need multiplexing and only want eth.
Not necessarily in its own thread. I thought of it like this: ethstream is responsible for en-/decoding eth messages, and p2pstream handles bytes. p2pstream is connected in 3 directions: active session, tcp stream, ethstream. Active session would poll p2pstream, which would in turn poll_flush the tcp stream and the ethstream. Active session would also have a bi-directional channel to the en-/decoding ethstream, and it would poll the receiver req_handler_rx for new messages from the wire, which are sent there when the driving p2pstream is polled. This allows for multiple threads but doesn't require them.
So does putting an Arc<RwLock<P2PStream>> on the stream and containing it in each capability stream as EthStream<Arc<RwLock<P2PStream>>>, which is possibly more intuitive: we poll the capability streams instead of only their receivers plus the p2pstream p2p_conn. That way, polling the capability streams in active session polls the underlying shared p2pstream, which then polls the tcp stream. However, a nice detail remains: polling the p2pstream sends any message received over the wire on the respective capability's channel instead of storing it in the p2pstream, since messages for any capability can arrive over the wire. At the end of the day, a channel is just a queue.
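A rough single-threaded sketch of the shared-handle variant follows, assuming a VecDeque in place of the TCP stream. Each capability stream holds an Arc<RwLock<...>> to the same p2p state, checks its own channel first, and otherwise drives the shared stream, which fans each message out to the owning capability's channel. SharedP2p, CapStream, poll_wire and next_msg are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

/// Stand-in for the shared p2p stream; `wire` fakes the TCP socket.
struct SharedP2p {
    wire: VecDeque<(String, Vec<u8>)>,       // (capability name, payload) as read off the wire
    routes: Vec<(String, Sender<Vec<u8>>)>, // one channel (queue) per capability
}

impl SharedP2p {
    /// Reads one message off the "wire" and pushes it to the owning capability's
    /// channel instead of storing it here. Returns false when the wire is empty.
    fn poll_wire(&mut self) -> bool {
        let Some((cap, payload)) = self.wire.pop_front() else {
            return false;
        };
        if let Some((_, tx)) = self.routes.iter().find(|(name, _)| *name == cap) {
            let _ = tx.send(payload); // receiver gone: the message is dropped
        }
        true
    }
}

/// A capability stream holding a handle to the shared p2p stream,
/// in the spirit of EthStream<Arc<RwLock<P2PStream>>>.
struct CapStream {
    p2p: Arc<RwLock<SharedP2p>>,
    rx: Receiver<Vec<u8>>,
}

impl CapStream {
    fn new(name: &str, p2p: &Arc<RwLock<SharedP2p>>) -> Self {
        let (tx, rx) = channel();
        p2p.write().unwrap().routes.push((name.to_string(), tx));
        CapStream { p2p: Arc::clone(p2p), rx }
    }

    /// Returns the next message for this capability. Polling it drives the
    /// shared p2p stream, which may queue messages for other capabilities.
    fn next_msg(&self) -> Option<Vec<u8>> {
        loop {
            if let Ok(msg) = self.rx.try_recv() {
                return Some(msg);
            }
            if !self.p2p.write().unwrap().poll_wire() {
                return None;
            }
        }
    }
}
```

Polling eth here can pull a snap message off the wire first; it isn't lost, it just waits in snap's channel until snap is polled.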
The lock on p2pstream is a nice way to keep the EthStream<RwLock<S>> as you describe below while also being able to have LesStream<RwLock<S>>, etc., collecting these capstreams in a map in active session and polling all of them when polling active session. Otherwise, if that generic is just a channel to the p2pstream and not a reference to it, we have the problem of "who polls the p2pstream?", which is why I'm polling p2pstream as p2p_conn in active session now. Mostly I just did exactly what the issue states: contain the capabilities in the p2pstream.
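A sketch of collecting capability streams in a map inside the session follows, assuming a simplified std-only polling trait in place of futures::Stream. CapStream, QueueStream and this ActiveSession are hypothetical and much smaller than reth's types.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Minimal polling interface shared by all capability streams
/// (hypothetical; stands in for futures::Stream so the sketch needs only std).
trait CapStream {
    fn poll_next(&mut self) -> Option<Vec<u8>>;
}

/// Toy capability stream backed by a queue.
struct QueueStream(VecDeque<Vec<u8>>);

impl CapStream for QueueStream {
    fn poll_next(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }
}

/// Session that owns one stream per negotiated capability, keyed by name.
struct ActiveSession {
    caps: BTreeMap<&'static str, Box<dyn CapStream>>,
}

impl ActiveSession {
    /// Polls every capability stream once and collects whatever they yielded,
    /// tagged with the capability name (BTreeMap keeps the order deterministic).
    fn poll(&mut self) -> Vec<(&'static str, Vec<u8>)> {
        let mut out = Vec::new();
        for (name, stream) in self.caps.iter_mut() {
            if let Some(msg) = stream.poll_next() {
                out.push((*name, msg));
            }
        }
        out
    }
}
```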
I want to explore a hybrid design where we have multiplexing support but can still use the ethstream like we're doing now.
Previously, EthStream<St> was generic over a Stream+Sink type. This should be flexible enough for St to be either a bundled Sender+Receiver pair to a multiplexed connection or the P2PStream itself (like it is on main). I think we can also make a distinction between:
- P2PStream: a Stream+Sink type that writes messages to the wire and emits messages via Stream::next
- A new MuxConnection type (Future) that wraps the P2PStream and does the multiplexing (basically what the modified P2PStream currently does)
wdyt @Rjected
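A sketch of the "generic over the transport" idea follows. It assumes a simplified Transport trait in place of Stream+Sink and a one-byte message-ID prefix instead of RLP encoding, so it is not how eth messages are actually framed. The same EthStream<St> works over the p2p stream directly or over a channel pair to a mux task; Transport, DirectP2p and MuxHandle are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Raw-bytes transport that EthStream is generic over
/// (hypothetical std-only stand-in for the Stream + Sink bound).
trait Transport {
    fn send_raw(&mut self, bytes: Vec<u8>);
    fn recv_raw(&mut self) -> Option<Vec<u8>>;
}

/// Case 1: EthStream owns the p2p stream directly (like on main).
/// The VecDeque fakes a loopback socket.
struct DirectP2p {
    wire: VecDeque<Vec<u8>>,
}

impl Transport for DirectP2p {
    fn send_raw(&mut self, bytes: Vec<u8>) {
        self.wire.push_back(bytes);
    }
    fn recv_raw(&mut self) -> Option<Vec<u8>> {
        self.wire.pop_front()
    }
}

/// Case 2: EthStream holds a Sender/Receiver pair to a mux task owning the p2p stream.
struct MuxHandle {
    to_mux: Sender<Vec<u8>>,
    from_mux: Receiver<Vec<u8>>,
}

impl Transport for MuxHandle {
    fn send_raw(&mut self, bytes: Vec<u8>) {
        let _ = self.to_mux.send(bytes); // mux task gone: message dropped
    }
    fn recv_raw(&mut self) -> Option<Vec<u8>> {
        self.from_mux.try_recv().ok()
    }
}

/// Encodes/decodes eth messages as (message id, payload), agnostic of the transport.
struct EthStream<St> {
    inner: St,
}

impl<St: Transport> EthStream<St> {
    fn send(&mut self, id: u8, payload: &[u8]) {
        let mut bytes = vec![id];
        bytes.extend_from_slice(payload);
        self.inner.send_raw(bytes);
    }

    fn next(&mut self) -> Option<(u8, Vec<u8>)> {
        let bytes = self.inner.recv_raw()?;
        let (id, payload) = bytes.split_first()?;
        Some((*id, payload.to_vec()))
    }
}
```

The design choice is that EthStream never learns whether it is multiplexed; only the St it is instantiated with changes.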
not necessarily in its own thread, I thought of it like this: ethstream is responsible for en-/decoding eth messages. p2p stream handles bytes.
This sounds right; this is also what we're currently doing: p2pstream --streams bytes--> ethstream --streams decoded EthMessage-->
But the p2p stream is currently pinned to a specific capability (eth), which prevents cap mux atm. That's what this PR tries to address.
p2pstream is connected in 3 directions: active session, tcp stream, ethstream. active session would poll p2pstream, which would in turn poll_flush the tcp stream and the ethstream.
Okay, making the p2p stream emit only raw bytes makes sense. What we do with them afterwards depends on how we envision cap multiplexing to be used. If we want to handle them on the same task, this would require a stream type that decodes the raw bytes and emits cap variants (basically how EthStream behaves, but able to decode additional caps).
If we want them on different tasks, then we need a way to send the raw bytes based on the cap message id; that's what the MuxConnection type would be for.
ActiveSession should only use the cap handler that it requires; for now it only needs Eth.
And I don't want to introduce locking here if we can compose capability handlers either by:
- wrapping the P2PStream in a Stream type that emits decoded variants (enum CapMessage {Eth(...),...}), if all handlers are on the same task
- adding a mux type that sends messages over a channel, if we need handlers that live on different tasks.
I hope this kinda makes sense.
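For the same-task option, a sketch of a decoder that turns raw messages into one enum with a variant per capability follows. It assumes eth and snap are the negotiated caps with RLPx-style ID offsets and a one-byte ID prefix instead of RLP. CapMessage mirrors the enum named above, but its fields, CapDecoder and DecodeError are hypothetical.

```rust
/// Decoded message from any capability handled on this task.
/// Hypothetical shape: payloads stay raw bytes instead of typed eth/snap messages.
#[derive(Debug, PartialEq)]
enum CapMessage {
    Eth { id: u8, payload: Vec<u8> },
    Snap { id: u8, payload: Vec<u8> },
}

#[derive(Debug, PartialEq)]
enum DecodeError {
    UnknownId(u8),
    Empty,
}

/// ID ranges of the negotiated caps (eth first, then snap, in alphabetical order).
struct CapDecoder {
    eth_offset: u8,
    eth_len: u8,
    snap_offset: u8,
    snap_len: u8,
}

impl CapDecoder {
    /// Splits off the message ID, finds the capability that owns it and
    /// returns the matching variant with the ID rebased to that capability.
    fn decode(&self, raw: &[u8]) -> Result<CapMessage, DecodeError> {
        let (&id, payload) = raw.split_first().ok_or(DecodeError::Empty)?;
        let payload = payload.to_vec();
        if let Some(rel) = id.checked_sub(self.eth_offset).filter(|r| *r < self.eth_len) {
            return Ok(CapMessage::Eth { id: rel, payload });
        }
        if let Some(rel) = id.checked_sub(self.snap_offset).filter(|r| *r < self.snap_len) {
            return Ok(CapMessage::Snap { id: rel, payload });
        }
        Err(DecodeError::UnknownId(id))
    }
}
```

The trade-off raised in the thread shows up directly here: adding a capability means adding a variant and a branch to this one type.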
Codecov Report
Merging #2981 (05f334a) into main (1e7d028) will decrease coverage by 0.09%. Report is 15 commits behind head on main. The diff coverage is 50.00%.
:exclamation: Current head 05f334a differs from pull request most recent head 77eeaa1. Consider uploading reports for the commit 77eeaa1 to get more accurate results.
Files | Coverage Δ | |
---|---|---|
crates/net/eth-wire/src/capability.rs | 76.76% <100.00%> (+0.50%) | :arrow_up: |
crates/net/eth-wire/src/errors/p2p.rs | 70.00% <ø> (ø) | |
crates/net/eth-wire/src/ethstream.rs | 83.97% <ø> (ø) | |
crates/net/eth-wire/src/types/message.rs | 55.17% <ø> (ø) | |
crates/net/network/src/error.rs | 76.04% <ø> (ø) | |
crates/net/network/src/session/mod.rs | 69.88% <100.00%> (+0.05%) | :arrow_up: |
crates/net/eth-wire/src/p2pstream.rs | 78.28% <87.87%> (-0.02%) | :arrow_down: |
crates/net/eth-wire/src/sharedstream.rs | 42.98% <42.98%> (ø) | |
... and 12 files with indirect coverage changes
Flag | Coverage Δ | |
---|---|---|
integration-tests | 15.44% <11.19%> (-0.04%) | :arrow_down: |
unit-tests | 62.57% <50.00%> (-0.07%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
Components | Coverage Δ | |
---|---|---|
reth binary | 32.95% <ø> (ø) | |
blockchain tree | 80.64% <ø> (+0.09%) | :arrow_up: |
pipeline | 88.45% <ø> (ø) | |
storage (db) | 73.30% <ø> (+<0.01%) | :arrow_up: |
trie | 94.52% <ø> (+0.03%) | :arrow_up: |
txpool | 48.90% <ø> (-0.49%) | :arrow_down: |
networking | 75.66% <50.00%> (-0.47%) | :arrow_down: |
rpc | 57.70% <ø> (-0.01%) | :arrow_down: |
consensus | 61.05% <ø> (+0.19%) | :arrow_up: |
revm | 28.47% <ø> (ø) | |
payload builder | 8.14% <ø> (ø) | |
primitives | 85.39% <ø> (+0.01%) | :arrow_up: |
@mattsse can this be closed given #5363?
sorry this took so long 🙈
I think I kinda understand how this works.
One concern, if I understood this correctly, is that this limits us to the pre-defined caps in the capabilities enum?
Predefined cap messages, like the EthMessage type, which is used for all combos of eth66, 67 and 68 caps atm.
I wonder if we can simplify this a bit so that we have a single stream, something like a modified CapStream:
struct CapStream { inner: S, mux_rx, mux_tx, cap_streams: Vec<(SharedCap, Sender)>, shared_caps }
which is then a Stream and Sink of (Cap, Bytes) that only carries raw messages.
However, this assumes that sub-protocol streams will be spawned, right? So maybe we can find a way to stack them together and keep the stream-specific types:
struct StackedCaps<S,L,R> { inner: S, from_streams_rx: UnboundedRec<MaskedBytes>, left: L, right: R, shared_caps }
where L and R implement a new trait that is both Stream+Sink and knows whether a message id is a match. This way we could even layer this and stack multiple handlers in R; the benefit would be that this could be a Stream of Either(L::Out, R::Out).
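A sketch of the stacking idea follows. It keeps only the "knows whether a message id is a match" part of the trait and leaves out Stream+Sink, the inner stream and the channel so that it needs only std. CapHandler, RangeHandler and the local Either enum are hypothetical (the futures crate has its own Either, not used here).

```rust
/// Hypothetical trait: a handler that can tell whether a message id belongs to it.
trait CapHandler {
    type Out;
    fn matches(&self, id: u8) -> bool;
    fn handle(&mut self, id: u8, payload: &[u8]) -> Self::Out;
}

#[derive(Debug, PartialEq)]
enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Two handlers stacked together; R may itself be a StackedCaps, so this nests.
struct StackedCaps<L, R> {
    left: L,
    right: R,
}

impl<L: CapHandler, R: CapHandler> CapHandler for StackedCaps<L, R> {
    type Out = Either<L::Out, R::Out>;

    fn matches(&self, id: u8) -> bool {
        self.left.matches(id) || self.right.matches(id)
    }

    fn handle(&mut self, id: u8, payload: &[u8]) -> Self::Out {
        // Callers are expected to check `matches` first; anything the left
        // side does not claim is handed to the right side.
        if self.left.matches(id) {
            Either::Left(self.left.handle(id, payload))
        } else {
            Either::Right(self.right.handle(id, payload))
        }
    }
}

/// Toy handler owning the id range [offset, offset + len); returns the relative id.
struct RangeHandler {
    offset: u8,
    len: u8,
}

impl CapHandler for RangeHandler {
    type Out = (u8, Vec<u8>);

    fn matches(&self, id: u8) -> bool {
        id.checked_sub(self.offset).map_or(false, |rel| rel < self.len)
    }

    fn handle(&mut self, id: u8, payload: &[u8]) -> Self::Out {
        (id - self.offset, payload.to_vec())
    }
}
```

Because StackedCaps implements the same trait, R can itself be a StackedCaps, which yields nested outputs such as Right(Left(..)) for the second handler while every handler keeps its own output type.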
Sure. My goal was to change as little of the session code as possible, but we can have ActiveSession listen for CapsMessage instead, with EthMessage as a variant; I believe @Rjected suggested this. All cap message types added later must then pass through ActiveSession. Are we ok with that?
Or maybe we abandon Stream+Sink completely and just do message conversions.
Keeping Stream and Sink is good for not changing much of the session code.
Just some ideas at this point...
closed in favour of simplification https://github.com/paradigmxyz/reth/pull/5577