Separate add_rx from Bus
This might be related to or a duplicate of #19 , but it's not clear based on the wording of the original issue.
The problem with `add_rx` being tied to `Bus` occurs in my use case, where I want to:
- Send the `Bus` to a separate "dispatch" thread.
- Dynamically add/remove receivers over time on the main thread.
Once the `Bus` has been moved to the dispatch thread, it can't be used on the main thread to create more receivers, which fixes the number of receivers I can ever create. It is technically possible to send messages requesting new receivers and pass them back via channels, but I have another idea I'd like to pursue to see if it's any better.
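For illustration, here is roughly what the limitation looks like with the crate's current API (the capacity, message type, and values are just placeholders):

```rust
use bus::Bus;
use std::thread;

fn main() {
    let mut bus: Bus<u32> = Bus::new(16);

    // Any receivers have to be created *before* the Bus moves away.
    let mut rx = bus.add_rx();

    let dispatch = thread::spawn(move || {
        // `bus` now lives on the dispatch thread...
        bus.broadcast(1);
    });

    // ...so the main thread can no longer call `bus.add_rx()` here;
    // the number of receivers is fixed at whatever existed before the move.
    assert_eq!(rx.recv().unwrap(), 1);
    dispatch.join().unwrap();
}
```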
What I propose is creating a secondary interface (let's call it `ReadHandle` for now), which could be created by a method on `Bus` and would also implement `add_rx`.
Internally, it would only involve taking clones of `Bus.state` and the senders `Bus.leaving.0` and `Bus.waiting.0` when constructed, so it would be mostly zero-cost. The one major change would be making `Bus.readers` atomic and moving it to `BusInner`, but we can at least try it and see how it affects benchmarks.
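A rough sketch of the shape this could take; the field types here are simplified stand-ins, not the crate's actual internals:

```rust
use std::sync::{mpsc, Arc};

// Simplified stand-ins for the real internals (the actual `BusInner`,
// `leaving`, and `waiting` types in the crate are more involved).
struct BusInner<T> {
    ring: Vec<T>,
}

struct Bus<T> {
    state: Arc<BusInner<T>>,
    leaving: (mpsc::Sender<usize>, mpsc::Receiver<usize>),
    waiting: (mpsc::Sender<usize>, mpsc::Receiver<usize>),
}

// The proposed handle: just clones of the shared state and the sender
// halves, so constructing one is cheap.
struct ReadHandle<T> {
    state: Arc<BusInner<T>>,
    leaving: mpsc::Sender<usize>,
    waiting: mpsc::Sender<usize>,
}

impl<T> Bus<T> {
    fn read_handle(&self) -> ReadHandle<T> {
        ReadHandle {
            state: Arc::clone(&self.state),
            leaving: self.leaving.0.clone(),
            waiting: self.waiting.0.clone(),
        }
    }
}

impl<T> ReadHandle<T> {
    // `add_rx` would live here as well, mirroring `Bus::add_rx`; this is
    // where the `readers` bookkeeping has to become safe to touch from
    // multiple threads.
}
```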
I've been doing some research today; I don't mess around with lock-free code very often, so it's an interesting learning experience!
I quickly found out that making `readers` atomic is more complicated than replacing all accesses with `load` and `store`. In the current implementation, it's guaranteed that you can't simultaneously broadcast and add new readers. Making `add_rx` concurrent with `send` could create problems in areas where it might otherwise be assumed that the number of readers remains constant.
Hehe, yes, I ran into this a while back too, and had the same realization you had that making that change isn't as straightforward as I first thought. It might be that the solution here is to add a lock that is only taken to add readers and on every call to `broadcast`. In the common case, the lock will be uncontended (since it'll only be held by the broadcaster), so the cost shouldn't be too great.
That's one idea to try. Another would be to send "add" messages via a channel, similar to the way that it currently handles dropped readers. That way the broadcaster can be notified where each new reader starts, resolving the issue of figuring out whether the new reader was started on the tail of the queue before or after the broadcast takes place.
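For concreteness, a minimal sketch of the locking approach (the first idea); the types are hypothetical, and in the real crate the guarded state would be the per-reader bookkeeping rather than a bare count:

```rust
use std::sync::Mutex;

// Hypothetical registry guarded by a Mutex. `add_rx` would register a new
// reader while holding the lock; `broadcast` would take the same lock just
// long enough to see a consistent view of the readers, so the set can't
// change out from under it mid-broadcast.
struct ReaderRegistry {
    readers: Mutex<usize>,
}

impl ReaderRegistry {
    fn add_reader(&self) -> usize {
        let mut readers = self.readers.lock().unwrap();
        *readers += 1;
        *readers
    }

    fn with_readers<R>(&self, f: impl FnOnce(usize) -> R) -> R {
        // Called at the start of each broadcast; uncontended in the common
        // case, since only add_reader ever competes for it.
        let readers = self.readers.lock().unwrap();
        f(*readers)
    }
}
```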
Okay, the lock implementation is in a working state; here are the benchmarks on my machine:
master
$ cargo run --bin bench --release
1 1000 0.26 μs/op
2 1000 0.54 μs/op
3 1000 0.74 μs/op
4 1000 1.59 μs/op
5 1000 1.15 μs/op
6 1000 1.44 μs/op
7 1000 1.34 μs/op
8 1000 1.20 μs/op
9 1000 0.92 μs/op
10 1000 0.98 μs/op
11 1000 1.24 μs/op
12 1000 1.99 μs/op
13 1000 2.11 μs/op
14 1000 2.46 μs/op
15 1000 2.51 μs/op
$ cargo bench --features bench
test bench_bus_one_to_one ... bench: 333 ns/iter (+/- 84)
test bench_crossbeam_bounded_one_to_one ... bench: 113 ns/iter (+/- 52)
test bench_crossbeam_one_to_one ... bench: 30 ns/iter (+/- 3)
test bench_syncch_one_to_one ... bench: 217 ns/iter (+/- 23)
read_handle/lock
$ cargo run --bin bench --release
1 1000 0.33 μs/op
2 1000 1.04 μs/op
3 1000 0.83 μs/op
4 1000 1.04 μs/op
5 1000 1.22 μs/op
6 1000 1.42 μs/op
7 1000 1.34 μs/op
8 1000 1.12 μs/op
9 1000 0.95 μs/op
10 1000 1.16 μs/op
11 1000 1.47 μs/op
12 1000 2.04 μs/op
13 1000 2.31 μs/op
14 1000 2.50 μs/op
15 1000 2.83 μs/op
$ cargo bench --features bench
test bench_bus_one_to_one ... bench: 374 ns/iter (+/- 65)
test bench_crossbeam_bounded_one_to_one ... bench: 120 ns/iter (+/- 43)
test bench_crossbeam_one_to_one ... bench: 31 ns/iter (+/- 11)
test bench_syncch_one_to_one ... bench: 214 ns/iter (+/- 30)
There seems to be a small but measurable increase in benchmark times, but there are a lot of other things to consider, such as choosing a different lock (this uses `std::sync::Mutex`) and also comparing this to other synchronization strategies.
Also, I had to add `#[cfg(feature = "bench")] extern crate test;` at the top of `lib.rs` to get the benchmarks to compile; not sure why.
A slight increase is to be expected, I think. I wonder if, with this lock, we can now also get rid of the additional thread that does dropping? That might bring back some savings.
You could also try `parking_lot::Mutex` and see if that makes much of a difference?
With `parking_lot::Mutex`, there are improvements of around 10-30 ns across the board.
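The swap itself is mostly mechanical; something along these lines, assuming the same hypothetical registry as above (`parking_lot::Mutex::lock` returns the guard directly, so the poisoning `.unwrap()` calls disappear):

```rust
use parking_lot::Mutex; // instead of std::sync::Mutex

struct ReaderRegistry {
    readers: Mutex<usize>,
}

impl ReaderRegistry {
    fn add_reader(&self) -> usize {
        // No lock poisoning in parking_lot, so lock() returns the guard directly.
        let mut readers = self.readers.lock();
        *readers += 1;
        *readers
    }
}
```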
> I wonder if with this lock, we can now also get rid of the additional thread that does dropping? That might bring back some savings.
The only thread I found was for unparking, which should be unrelated. Do you mean removing the `leaving` channel and making dropped readers update `rleft` directly?
Ah, you're right, there isn't a drop thread; I was just getting things mixed up in my head. But yes, I wonder whether we can get rid of the `leaving` channel now that we can take advantage of the lock?
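A hedged sketch of what that might look like: with the same lock protecting reader membership, a reader's `Drop` could update the shared bookkeeping directly instead of sending its index over the `leaving` channel (again, hypothetical types, not the crate's actual fields):

```rust
use std::sync::{Arc, Mutex};

struct Shared {
    // Stand-in for whatever the broadcaster consults (e.g. the reader
    // count / rleft bookkeeping).
    readers: Mutex<usize>,
}

struct Reader {
    shared: Arc<Shared>,
}

impl Drop for Reader {
    fn drop(&mut self) {
        // Deregister directly under the lock; no `leaving` channel and no
        // draining step on the broadcaster side.
        *self.shared.readers.lock().unwrap() -= 1;
    }
}
```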
I have encountered the same issue. Thank you @agausmann for your fork; I have used it and it works for me. Do you still plan to open a PR against this repo? It would be useful for others, I think.
After also coming across the same issue, I settled on using this fork, which seems to work perfectly. I completely agree with @romainreignier that having this merged into the main crate would be very useful, as it is a fairly vital feature for a lot of use cases. I know I'm a bit late to the party, but again, any chance of this being merged, @agausmann, perhaps as an optional feature?
@jonhoo @agausmann Running into a similar issue and was wondering if merging this into the main crate would be possible?
Yep, I think it does make sense to merge this! Per my last comment, though, I'd still like to see it also get rid of the `leaving` channel, since that'll no longer be necessary with this. If someone is willing to open a PR and push this over the finish line, I'll happily review!
Been a while since I used this, but I may be able to finish that this week.
Hey, any updates?