reth
feat: use broadcast channel for event listeners
EventListeners implements a multi-producer, multi-consumer queue in which every sent value is seen by all consumers. To achieve this, EventListeners allocates a std::vec::Vec that gains a tokio::sync::mpsc::UnboundedSender every time EventListeners::new_listener is called. Because every value sent via EventListeners is cloned to each UnboundedReceiver and the channels are unbounded, a listener that falls behind causes unbounded memory growth, which ultimately leaves this open to OOM attacks.
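To make the failure mode concrete, here is a minimal sketch of that fan-out pattern. It is not the actual reth code: it uses std::sync::mpsc::channel (also unbounded) as a stand-in for tokio's unbounded channel so it runs without dependencies, and backlog_of_idle_listener is a hypothetical helper added only for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the pre-PR design: one unbounded channel per
/// listener, with every event cloned into each of them.
struct EventListeners<T> {
    listeners: Vec<Sender<T>>,
}

impl<T: Clone> EventListeners<T> {
    fn new() -> Self {
        Self { listeners: Vec::new() }
    }

    /// Registers a listener and hands back its receiving half.
    fn new_listener(&mut self) -> Receiver<T> {
        // Unbounded: `send` never waits for the receiver to catch up.
        let (tx, rx) = channel();
        self.listeners.push(tx);
        rx
    }

    /// Clones `event` to every listener and drops listeners whose receiver is gone.
    fn notify(&mut self, event: T) {
        self.listeners.retain(|tx| tx.send(event.clone()).is_ok());
    }
}

/// Hypothetical helper: sends `n` events to a listener that never reads,
/// then counts how many events are queued up for it.
fn backlog_of_idle_listener(n: usize) -> usize {
    let mut listeners = EventListeners::new();
    let idle = listeners.new_listener();
    for i in 0..n {
        listeners.notify(i);
    }
    idle.try_iter().count()
}
```

Nothing caps the backlog here: a listener that stays alive but stops reading keeps every event queued, so memory grows with the number of events sent.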
To prevent this, this PR uses tokio's tokio::sync::broadcast channel instead, a bounded multi-producer, multi-consumer queue.
For now, the size of every broadcast channel is set to 1000; it would be good to measure how much each one actually needs. Adding metrics, as suggested in this comment https://github.com/paradigmxyz/reth/pull/8193#issuecomment-2105016272, will be done in a follow-up.
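For readers unfamiliar with how a bounded broadcast queue trades memory for possible message loss, the following is a rough single-threaded model of the idea. It is an assumption-laden sketch, not tokio's implementation: Broadcast, Recv and the cursor-based recv are hypothetical names, and the model only mirrors the documented behaviour that a receiver which falls behind gets a lag notice and then resumes from the oldest retained value.

```rust
use std::collections::VecDeque;

/// Hypothetical model of a bounded broadcast buffer shared by all receivers.
struct Broadcast<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Sequence number of the oldest value still held in `buf`.
    head: u64,
}

/// Outcome of a receive attempt in this model.
#[derive(Debug, PartialEq)]
enum Recv<T> {
    Value(T),
    /// The receiver missed this many values because they were overwritten.
    Lagged(u64),
    Empty,
}

impl<T: Clone> Broadcast<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, head: 0 }
    }

    /// Stores `value`, evicting the oldest one once the buffer is full.
    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head += 1;
        }
        self.buf.push_back(value);
    }

    /// Returns a cursor that only sees values sent after this call.
    fn subscribe(&self) -> u64 {
        self.head + self.buf.len() as u64
    }

    /// Reads the next value for `cursor`, reporting a lag if values were evicted.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        if *cursor < self.head {
            let missed = self.head - *cursor;
            *cursor = self.head; // resume from the oldest retained value
            return Recv::Lagged(missed);
        }
        match self.buf.get((*cursor - self.head) as usize) {
            Some(value) => {
                *cursor += 1;
                Recv::Value(value.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

Memory is capped at capacity values no matter how slow a receiver is; the cost is that the slow receiver loses events, which is why picking a size per channel (1000 for now) is worth measuring.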
check out these types we have https://github.com/paradigmxyz/reth/blob/ef01d502387310462e2c312c39a3548d2f149cf3/crates/metrics/src/common/mpsc.rs#L36-L43 https://github.com/paradigmxyz/reth/blob/ef01d502387310462e2c312c39a3548d2f149cf3/crates/metrics/src/common/mpsc.rs#L75-L82
we use them so far only for the channel NetworkManager -> TransactionsManager for incoming transaction gossip, but I'd like to see more of them in the codebase.
we have a panel for observing this channel https://reth.paradigm.xyz/d/d47d679c-c3b8-40b6-852d-cbfaa2dcdb37/reth---transaction-pool?orgId=1&refresh=30s&viewPanel=95
@emhane awesome thx! will check how to include something similar for the broadcast channels, the metrics can be very useful to assign the proper size to each
I'd also like a new function/stream variant that does not return results but rather skips the lag error.
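A rough sketch of what such a variant could look like, written against a plain iterator instead of an async stream so it stays dependency-free. Lagged and skip_lagged are hypothetical names for illustration, not the API added in this PR.

```rust
/// Hypothetical stand-in for a broadcast lag error carrying the number of missed events.
#[derive(Debug, PartialEq)]
struct Lagged(u64);

/// Yields only the successfully received events and silently drops lag errors.
fn skip_lagged<T>(
    events: impl Iterator<Item = Result<T, Lagged>>,
) -> impl Iterator<Item = T> {
    events.filter_map(Result::ok)
}
```

Callers that cannot do anything useful with a lag notification then get a plain sequence of events and no Result to unwrap, at the cost of not learning that something was missed.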
makes total sense, done ptal
this is great! broadcast is def better for this