fix(kad): enforce a timeout for inbound substreams
Description
In the kad substream handler, outbound substreams have a 10s timeout, but inbound substreams don't have any timeout.
This results in large numbers of warnings under specific heavy load conditions, which we have encountered in subspace:
2025-04-08T06:24:27.293722Z WARN Consensus: libp2p_kad::handler: New inbound substream to peer exceeds inbound substream limit. No older substream waiting to be reused. Dropping new substream. peer=PeerId("12D3KooWN6kFp2Ev181UGq3BUDfk1jfjaNu6sDTqxCZUBpmp8kRQ")
After this fix, if a substream times out, it is dropped from the set. The existing reusable substream behaviour is preserved: idle substreams are replaced when the substream limit is reached.
Fixes #5981
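Below is a minimal std-only sketch of the timeout half of this fix. It assumes each inbound substream slot carries an optional deadline. `InboundEntry` and `drop_expired` are hypothetical names, not the kad handler's API. The 10s value only mirrors the outbound timeout mentioned above. As in the PR, timed-out entries are dropped from the set without being closed.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one inbound substream slot in the handler.
#[derive(Debug)]
struct InboundEntry {
    id: u64,
    /// `None` means this entry never times out (e.g. an idle, reusable substream).
    deadline: Option<Instant>,
}

/// Drops every entry whose deadline is at or before `now` and returns the
/// ids of the dropped entries. Entries are dropped, not closed.
fn drop_expired(entries: &mut Vec<InboundEntry>, now: Instant) -> Vec<u64> {
    let mut dropped = Vec::new();
    entries.retain(|e| match e.deadline {
        Some(d) if d <= now => {
            dropped.push(e.id);
            false
        }
        _ => true,
    });
    dropped
}
```

In this sketch, entries with a `None` deadline are never removed by the timeout check. Idle substreams therefore stay available for the existing reuse path.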
Cleanups
The substreams are pinned during iteration, so we can't remove them from the set. But we can replace them with a new substream. Since reusable streams are replaced directly, the Cancelled state is no longer needed.
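Replacing a pinned value does not require removing it. `std::pin::Pin::set` drops the old value and writes the new one into the same pinned location. In the sketch below, a `Vec` of `Box::pin`ned slots stands in for the handler's pinned substream collection. `State` and `replace_first_idle` are hypothetical simplifications, not the kad types.

```rust
use std::pin::Pin;

/// Hypothetical, simplified inbound substream states.
#[derive(Debug, PartialEq)]
enum State {
    /// Idle, waiting for the next request: can be reused.
    WaitingMessage { id: u64 },
    /// Handling a request: must not be replaced.
    Busy { id: u64 },
}

/// Replaces the first idle slot with `new`, in place, while iterating.
/// Returns the id of the replaced substream, or `None` (dropping `new`)
/// when every slot is busy.
fn replace_first_idle(slots: &mut [Pin<Box<State>>], new: State) -> Option<u64> {
    for slot in slots.iter_mut() {
        let idle_id = match **slot {
            State::WaitingMessage { id } => Some(id),
            State::Busy { .. } => None,
        };
        if let Some(id) = idle_id {
            // `Pin::set` drops the old state and writes the new one into the
            // same pinned allocation, so nothing is removed from the collection.
            slot.set(new);
            return Some(id);
        }
    }
    None
}
```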
This PR also includes an upgrade to futures-bounded 0.3.0, in a separate commit.
Notes & open questions
Should the substream be closed on a timeout? The existing code doesn't close them on (most) substream errors, so this PR handles timeouts the same way, by dropping the substream without closing it.
I have been unable to replicate the specific conditions leading to this bug in a test or my local subspace node, but we've confirmed the fix works on multiple nodes in the subspace network.
Change checklist
- [x] I have performed a self-review of my own code
- [x] I have made corresponding changes to the documentation
- [ ] I have added tests that prove my fix is effective or that my feature works
- [x] A changelog entry has been made in the appropriate crates
I'm getting an incorrect CI error:
Patch version has been bumped even though minor isn't released yet. https://github.com/libp2p/rust-libp2p/actions/runs/14656299889/job/41308047380?pr=6009#step:13:54
We need a new patch version, because https://crates.io/crates/libp2p-kad is at 0.47.0.
It looks like there might be a missing libp2p-kad-v0.47.0 tag in this repository, which is producing the incorrect CI result.
I've updated Cargo.lock with the new patch version.
Thank you for the PR @teor2345!
For outbound substreams, the timeout is implemented by using the futures_bounded::FuturesTupleSet (or just futures_bounded::FuturesSet) that limits the number of futures in the set and implements a timeout for each individual future.
Can we not just use that for inbound substreams as well?
That would also match the implementation in other protocols, as you already stated in #5981:
Here is how other protocols implement matching inbound and outbound timeouts: https://github.com/libp2p/rust-libp2p/blob/1206fef09885d024323479d0383c37c4fe281c7c/protocols/relay/src/behaviour/handler.rs#L384
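To make the `FuturesTupleSet` semantics described above concrete, here is a synchronous model with a capacity bound, a per-entry timeout, and per-entry associated data. It is a hypothetical std-only sketch and not the `futures_bounded` API. It polls nothing and only tracks deadlines against an explicit `now`.

```rust
use std::time::{Duration, Instant};

/// Std-only model of a bounded set with a per-entry timeout and per-entry
/// associated data. Hypothetical; not the `futures_bounded` API.
struct BoundedTimeoutSet<T, D> {
    capacity: usize,
    timeout: Duration,
    entries: Vec<(T, D, Instant)>,
}

impl<T, D> BoundedTimeoutSet<T, D> {
    fn new(capacity: usize, timeout: Duration) -> Self {
        BoundedTimeoutSet { capacity, timeout, entries: Vec::new() }
    }

    /// Adds an entry with its own deadline, or hands it back if the set is full.
    fn try_push(&mut self, item: T, data: D, now: Instant) -> Result<(), (T, D)> {
        if self.entries.len() >= self.capacity {
            return Err((item, data));
        }
        self.entries.push((item, data, now + self.timeout));
        Ok(())
    }

    /// Removes every entry whose deadline has passed and returns its data.
    fn take_timed_out(&mut self, now: Instant) -> Vec<D> {
        let mut timed_out = Vec::new();
        let mut i = 0;
        while i < self.entries.len() {
            if self.entries[i].2 <= now {
                let (_item, data, _deadline) = self.entries.swap_remove(i);
                timed_out.push(data);
            } else {
                i += 1;
            }
        }
        timed_out
    }

    fn len(&self) -> usize {
        self.entries.len()
    }
}
```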
Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:
https://github.com/libp2p/rust-libp2p/blob/537755826bad541b5a21acfc0def67bb9f99ff48/protocols/kad/src/handler.rs#L906-L907
It would be possible to hold the streams in a FuturesTupleSet by calling StreamExt::into_future() on them.
But we can't iterate through a FuturesTupleSet to find reusable inbound substreams, which makes this inbound substream reuse code difficult to implement:
https://github.com/libp2p/rust-libp2p/blob/537755826bad541b5a21acfc0def67bb9f99ff48/protocols/kad/src/handler.rs#L557-L582
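`StreamExt::into_future()` turns a stream into a future that resolves to the stream's next item together with the rest of the stream. That is why a stream can be stored in a future set. The std-only sketch below reimplements the idea so it runs without the `futures` crate. The minimal `Stream` trait, the toy `Requests` stream, and the no-op waker are all hypothetical helpers written for this sketch.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Minimal stand-in for `futures::Stream` (hypothetical, for this sketch only).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Resolves to `(next item, rest of the stream)`, like `StreamExt::into_future()`.
struct NextAndRest<S> {
    stream: Option<S>,
}

impl<S: Stream + Unpin> Future for NextAndRest<S> {
    type Output = (Option<S::Item>, S);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let stream = self.stream.as_mut().expect("polled after completion");
        let item = match Pin::new(stream).poll_next(cx) {
            Poll::Ready(item) => item,
            Poll::Pending => return Poll::Pending,
        };
        // Hand the stream back so the caller can wait for its next item.
        let stream = self.stream.take().expect("stream is still present");
        Poll::Ready((item, stream))
    }
}

/// Toy stream of queued request ids that is always ready.
struct Requests(VecDeque<u32>);

impl Stream for Requests {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(self.0.pop_front())
    }
}

/// Waker that does nothing; enough for a stream that is always ready.
fn noop_waker() -> Waker {
    unsafe fn clone_waker(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, noop, noop, noop);
    // SAFETY: no vtable function dereferences the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Once a wrapped future is inside a set, the set owns it. The handler can't look inside to check whether that substream is reusable, which is the iteration problem described above.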
So the only solution I could find is to add a timeout field to some of the inbound substream states. (Cancelled and Poisoned never need timeouts, and WaitingMessage only needs a timeout for the first request. So we save a bit on timers there.)
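Here is a hypothetical sketch of the state-level timeouts described above. Only some states carry a deadline, and `WaitingMessage` has one only while it waits for the first request. The variant and field names are simplified assumptions, not the real `InboundSubstreamState` definition.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified inbound substream states. Only some states carry
/// a deadline.
#[derive(Debug)]
enum InboundState {
    /// Waiting for a request. Only the first wait has a deadline.
    WaitingMessage { first: bool, deadline: Option<Instant> },
    /// Handling a request (stands in for the handler's other busy states).
    Busy { deadline: Instant },
    /// Terminal state: never needs a timer.
    Poisoned,
}

impl InboundState {
    /// A freshly accepted substream must deliver its first request in time.
    fn accepted(now: Instant, timeout: Duration) -> Self {
        InboundState::WaitingMessage { first: true, deadline: Some(now + timeout) }
    }

    /// A request arrived, so the rest of the exchange is bounded too.
    fn request_received(now: Instant, timeout: Duration) -> Self {
        InboundState::Busy { deadline: now + timeout }
    }

    /// After a reply, the substream idles without a timer and is reusable.
    fn reply_sent() -> Self {
        InboundState::WaitingMessage { first: false, deadline: None }
    }

    fn is_timed_out(&self, now: Instant) -> bool {
        match self {
            InboundState::WaitingMessage { deadline, .. } => matches!(deadline, Some(d) if *d <= now),
            InboundState::Busy { deadline } => *deadline <= now,
            InboundState::Poisoned => false,
        }
    }

    fn is_reusable(&self) -> bool {
        matches!(self, InboundState::WaitingMessage { first: false, .. })
    }
}
```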
The underlying issue is that the code couples the state of the substream with the stream of items from it. A refactor could put KadInStreamSink::into_future() into a FuturesTupleSet, and store the rest of the state as the associated data. We'd also need a separate list of substreams which can/can't be reused for the substream reuse check. And a way of dropping a substream from the FuturesTupleSet when it gets reused, probably via a oneshot.
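One possible shape for the "dropping a substream when it gets reused" part of that refactor is sketched below, with `std::sync::mpsc` channels standing in for a oneshot. The handler keeps each `Sender` in its separate reuse bookkeeping. The entry inside the polled set checks its `Receiver` whenever it is polled and ends itself once signalled. `PolledSubstream`, `PollOutcome` and `poll_set` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};

/// Hypothetical entry inside the polled set. The handler can't reach it by
/// iteration, so the entry watches a cancel channel and ends itself.
struct PolledSubstream {
    id: u64,
    cancel: Receiver<()>,
}

#[derive(Debug, PartialEq)]
enum PollOutcome {
    /// Keep polling this substream.
    Pending,
    /// The handler reused this slot: end the stream so the set drops it.
    Ended(u64),
}

impl PolledSubstream {
    /// Returns the entry plus the sender the handler keeps for reuse.
    fn new(id: u64) -> (Self, Sender<()>) {
        let (tx, rx) = channel();
        (PolledSubstream { id, cancel: rx }, tx)
    }

    fn poll(&self) -> PollOutcome {
        match self.cancel.try_recv() {
            Ok(()) => PollOutcome::Ended(self.id),
            // No signal yet. In this sketch a dropped sender is not a cancellation.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => PollOutcome::Pending,
        }
    }
}

/// Polls every entry once and drops the ones that ended, like the set would.
fn poll_set(set: &mut Vec<PolledSubstream>) -> Vec<u64> {
    let mut ended = Vec::new();
    set.retain(|s| match s.poll() {
        PollOutcome::Ended(id) => {
            ended.push(id);
            false
        }
        PollOutcome::Pending => true,
    });
    ended
}
```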
Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?
Could we not use StreamSet from futures-bounded?
EDIT: Or maybe poll the stream in the future for the set?
Sure, but that doesn't deal with substream reuse, because the futures_bounded types do not allow iteration to find reusable substreams (they're a wrapper for SelectAll).
There are two pieces of functionality that this code needs:
- work out when a substream becomes available for reuse
- when a new substream is created, and the substream limit has been reached, replace a reusable substream with the new substream (or drop the new substream if no substreams are reusable)
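The two pieces of functionality above are sketched here over a plain `Vec` of slots. This is a hypothetical model, and the real handler's types and states differ. `mark_reusable` covers the first bullet and `accept` covers the second.

```rust
/// Hypothetical substream slot state.
#[derive(Debug, PartialEq)]
enum Slot {
    /// New or handling a request: must not be replaced.
    Busy(u64),
    /// Idle after a reply: may be replaced by a new substream.
    Reusable(u64),
}

#[derive(Debug, PartialEq)]
enum Accepted {
    /// There was room under the limit.
    Added,
    /// The limit was reached and this idle substream was replaced.
    Replaced(u64),
    /// The limit was reached and nothing was reusable: the new one is dropped.
    Dropped,
}

struct InboundSlots {
    limit: usize,
    slots: Vec<Slot>,
}

impl InboundSlots {
    /// First piece: a substream becomes available for reuse.
    fn mark_reusable(&mut self, id: u64) {
        for slot in &mut self.slots {
            if *slot == Slot::Busy(id) {
                *slot = Slot::Reusable(id);
            }
        }
    }

    /// Second piece: accept a new substream, replacing an idle one at the limit.
    fn accept(&mut self, id: u64) -> Accepted {
        if self.slots.len() < self.limit {
            self.slots.push(Slot::Busy(id));
            return Accepted::Added;
        }
        match self.slots.iter_mut().find(|s| matches!(s, Slot::Reusable(_))) {
            Some(slot) => {
                let old = match *slot {
                    Slot::Reusable(old) | Slot::Busy(old) => old,
                };
                *slot = Slot::Busy(id);
                Accepted::Replaced(old)
            }
            None => Accepted::Dropped,
        }
    }
}
```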
Here's one possible way to implement that:
- use a futures_bounded::StreamSet to poll the next item from each substream
- on a timeout in the WaitingMessage state, mark the substream as available for reuse using a oneshot (in any other state, the substream can't be used further, so just end the stream and it will get dropped)
- when a new substream is created, find a substream that's marked for reuse, and replace the original substream with the new substream
This will involve some quite weird types, like Oneshot<Oneshot<InboundSubstreamState>>. The outer Oneshot is for substream reuse availability, and the inner is for sending back the replacement substream. But it should work, and the changes might be less complicated than the existing PR.
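Here is a std-only sketch of the nested-oneshot handshake, with `std::sync::mpsc` channels as one-shot stand-ins. The outer channel carries a `Sender<NewSubstream>`, meaning "I'm reusable, send my replacement here". The inner channel carries the replacement back. `NewSubstream`, `Entry` and `hand_off` are hypothetical names. In the proposal the payload would be an `InboundSubstreamState`.

```rust
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Hypothetical replacement payload (the new substream).
#[derive(Debug, PartialEq)]
struct NewSubstream(u64);

/// Outer message: "I'm reusable; send my replacement on this sender".
type ReuseOffer = Sender<NewSubstream>;

/// A substream entry inside the polled set.
struct Entry {
    id: u64,
    /// Inner "oneshot": where a replacement will arrive, once offered.
    replacement: Option<Receiver<NewSubstream>>,
}

impl Entry {
    /// Called when the entry becomes available for reuse.
    fn offer_for_reuse(&mut self, offers: &Sender<ReuseOffer>) {
        let (tx, rx) = channel();
        self.replacement = Some(rx);
        // If the handler is gone there is nobody to reuse us; ignore the error.
        let _ = offers.send(tx);
    }

    /// Called when the entry is polled: adopt a replacement if one arrived.
    fn poll_replacement(&mut self) -> bool {
        let received = match &self.replacement {
            Some(rx) => rx.try_recv().ok(),
            None => None,
        };
        match received {
            Some(NewSubstream(new_id)) => {
                self.id = new_id;
                self.replacement = None;
                true
            }
            None => false,
        }
    }
}

/// Handler side: give a new substream to the first entry that offered itself,
/// or hand it back (to be dropped) if no offer is usable.
fn hand_off(offers: &Receiver<ReuseOffer>, mut new: NewSubstream) -> Result<(), NewSubstream> {
    while let Ok(reply_to) = offers.try_recv() {
        match reply_to.send(new) {
            Ok(()) => return Ok(()),
            // That entry was dropped in the meantime: try the next offer.
            Err(SendError(returned)) => new = returned,
        }
    }
    Err(new)
}
```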
Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?
I also can't guarantee I'll have time to work on this any time soon, because the current PR code works, and fixes our downstream bug.
Thank you for the follow-ups and detailed explanation @teor2345.
Opened thomaseizinger/rust-futures-bounded#8 to see if we can implement an iterator for StreamSet. If that PR isn't merged, I'd go with your current solution.
friendly ping @teor2345, I saw https://github.com/thomaseizinger/rust-futures-bounded/pull/10#issuecomment-2871447946 but just to confirm, do you plan on continuing this? Cheers!
I think that https://github.com/thomaseizinger/rust-futures-bounded/pull/10 will be a better alternative once it's released. But we're also waiting on https://github.com/thomaseizinger/rust-futures-bounded/pull/12 to merge.
This pull request has merge conflicts. Could you please resolve them @teor2345? 🙏