crossbeam
Lossy Channels
Is there any current work for adding lossy bounded channels? Or any strong reason to not add them? If not, I might open a PR in the next few weeks with an implementation.
What do you mean by lossy? If „drop stuff when it's full", shouldn't `try_send` serve your needs?
Apologies for not being clear. By "lossy channel", I mean one that discards old data when the buffer is full. This is useful for things like robotics where new sensor information is more important than old information and likely makes the old information completely worthless.
The `try_send` method sort of works, but it requires either an unnecessary lock or a racy loop.
I see, `try_send` is more suited to dropping the new data than the old.
I'm not sure if you really need a full channel for such a thing ‒ do you need a „buffer" for several updates, then? If I was doing something like sensor data, I'd probably have some kind of atomic storage for a single (newest) snapshot of each sensor, then a mechanism to wake up the thread that consumes the data. The wake-up could be a single-element bounded channel with element = `()`. That one could use `try_send`, since it doesn't matter if I throw out the old or the new `()`.
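The snapshot-plus-wakeup pattern described above can be sketched roughly as follows. This is a std-only sketch using `sync_channel(1)` as a stand-in for a crossbeam bounded channel of capacity 1; the names and the `f64` reading type are illustrative:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Latest-snapshot storage: one slot per sensor (here just one).
    let snapshot = Arc::new(Mutex::new(None::<f64>));
    // Capacity-1 wakeup channel carrying `()`; it doesn't matter
    // whether the old or the new `()` is dropped when it's full.
    let (wake_tx, wake_rx) = sync_channel::<()>(1);

    let producer = {
        let snapshot = Arc::clone(&snapshot);
        thread::spawn(move || {
            for reading in [1.0, 2.0, 3.0] {
                // Overwrite the old snapshot with the newest reading.
                *snapshot.lock().unwrap() = Some(reading);
                match wake_tx.try_send(()) {
                    // A full wakeup channel is fine: a wakeup is already pending.
                    Ok(()) | Err(TrySendError::Full(())) => {}
                    Err(e) => panic!("consumer gone: {e}"),
                }
            }
        })
    };
    producer.join().unwrap();

    // Consumer: one wakeup may cover several updates, but it always
    // sees the newest snapshot.
    wake_rx.recv().unwrap();
    let latest = snapshot.lock().unwrap().take();
    assert_eq!(latest, Some(3.0));
}
```

The key property is that the channel never buffers data, only the fact that *something* changed, so losing a `()` is harmless.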
> I'm not sure if you really need a full channel for such a thing ‒ do you need a „buffer" for several updates, then?
You may want the last N updates. You might also want all the updates you can get, but the sensor is vastly outpacing the processing, so you have to put a hard cap on it. Once you start doing that, you either have to start locking your buffer or make it atomic, which is essentially the lossy channel.
A sizeable chunk of the robotics community, and probably most of robotics in academia, heavily utilize lossy channels like this (see ROS). The `select!` macro has been a nice change of pace from callbacks, which don't work very well in Rust, so I would like to see something like lossy channels usable with the macro.
@neachdainn In case the channel is full and you want to "overwrite" the oldest element in it, what if you called `.recv()` to drop the oldest element and then followed up with `.send()` to send the new element?
I wonder if that would work for you?
That is what I'm doing now, but it requires a racy loop.
I can see @neachdainn's point here: a common use case of channels usually requires separating the sender and the receiver, where the sender is owned by the data producer and the receiver by the consumer. In this pattern, the producer usually doesn't have access to the receiver to pop one or more messages when the channel is full (short of making a receiver clone and using it solely for full-inbox popping, which seems a waste), and it is also hard for the consumer to decide whether it should discard some "old" messages to make room for "newer" ones, as it can't foresee whether more messages are coming.
I think a naive implementation of a lossy sender would just be a wrapper that pops a message from the full channel before sending a new one, but that could save some headache for channel users.
Just to be clear: I am not asking for someone to implement this for me. I am willing to spend time implementing this, I just want to know if a PR for something like this would be (potentially) accepted or if someone is already working on a similar channel flavor.
I don't think adding a whole new channel flavor is worth it. At best, we might introduce a new method, perhaps named `force_send`. Unlike `try_send`, it would overwrite the last element in the channel instead of failing when the channel is full.
But even so, I find this a bit of a niche use case and the problem is not difficult to get around manually. I'm wary of adding small helper methods like these -- the channel interface is already complex enough, and there's always a bunch of new methods we could add.
> I don't think adding a whole new channel flavor is worth it. At best, we might introduce a new method, perhaps named `force_send`. Unlike `try_send`, it would overwrite the last element in the channel instead of failing when the channel is full.
That would definitely cover my use case (assuming "last element" means the oldest element), and a new flavor is probably overkill.
> But even so, I find this a bit of a niche use case and the problem is not difficult to get around manually.
My example might be niche, but I think the concept of a channel that drops the oldest item is not uncommon. And while it isn't difficult to get around manually, it is very inelegant. At the very least, the problem of figuring out when to break out of the `try_send`/`recv` loop is not trivial. Currently, my code either tries to send twice before giving up, or I have had to add a timestamp to the structure, both of which are far less elegant than having a `force_send`-style method.
> I'm wary of adding small helper methods like these -- the channel interface is already complex enough, and there's always a bunch of new methods we could add.
That is fair. I really like how Crossbeam and `select!` have been able to clean up my code and would like to keep using it, but I can definitely refactor and write my own queue if I need to.
FWIW, I added an asynchronous variant of such a channel type to https://github.com/Matthias247/futures-intrusive, which I called StateBroadcastChannel.
The motivation is more or less what @neachdainn was asking for: a certain component generates state updates which must be distributed to potentially more than one consumer. I had use cases for that before, when working on embedded (soft) realtime systems.
In my case I did not add any additional buffering to the channel. I guess if that were required, it would be more of a buffer per consumer instead of one inside the channel.
Perhaps a `Mutex<VecDeque>` could be more appropriate for implementing the shared data buffer.
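A minimal std-only sketch of such a shared buffer, with a hard cap that evicts the oldest entry when full (the `LossyBuffer` name is illustrative):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Bounded lossy buffer: pushing into a full buffer evicts the oldest entry.
struct LossyBuffer<T> {
    inner: Mutex<VecDeque<T>>,
    cap: usize,
}

impl<T> LossyBuffer<T> {
    fn new(cap: usize) -> Self {
        Self { inner: Mutex::new(VecDeque::with_capacity(cap)), cap }
    }

    fn push(&self, item: T) {
        let mut q = self.inner.lock().unwrap();
        if q.len() == self.cap {
            q.pop_front(); // drop the oldest update to make room
        }
        q.push_back(item);
    }

    fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }
}

fn main() {
    let buf = Arc::new(LossyBuffer::new(3));
    for i in 0..10 {
        buf.push(i);
    }
    assert_eq!(buf.pop(), Some(7)); // 0..=6 were evicted
    assert_eq!(buf.pop(), Some(8));
    assert_eq!(buf.pop(), Some(9));
    assert_eq!(buf.pop(), None);
}
```

This trades the channel's blocking/`select!` integration for a very simple, always-lossy push.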
You could try to use a technique similar to graphics programming where you would swap buffers, see https://computergraphics.stackexchange.com/questions/4550/how-double-buffers-works-in-opengl
For example, let's say the consumer always wants a batch of N updates. Then you could have the producer keep `push_back`-ing updates into a shared queue, and when the consumer is ready to take the next batch, it could:
1. Let `shared_queue` be an `Arc<Mutex<VecDeque<Update>>>`, shared between consumer and producer.
2. Let `consumer_queue` be a `VecDeque<Update>`, owned by the consumer thread, initially empty.
3. Let `consumer_queue` be the result of `mem::replace(&mut *shared_queue.lock(), consumer_queue)`.
4. Resize `consumer_queue` down to the N updates you need, for example doing `rotate_left(consumer_queue.len() - N)` followed by `truncate(N)` to keep the newest N.
5. Process `consumer_queue`.
6. When `consumer_queue` is empty, go back to 3.
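The steps above can be sketched like this (std only, single-threaded for determinism; `Update` is a stand-in type):

```rust
use std::collections::VecDeque;
use std::mem;
use std::sync::{Arc, Mutex};

type Update = u32; // stand-in for the real update type

fn main() {
    // Step 1: queue shared between producer and consumer.
    let shared_queue: Arc<Mutex<VecDeque<Update>>> =
        Arc::new(Mutex::new(VecDeque::new()));

    // Producer side: push updates as they arrive.
    for u in 0..10 {
        shared_queue.lock().unwrap().push_back(u);
    }

    // Step 2: consumer-local queue, initially empty.
    let mut consumer_queue: VecDeque<Update> = VecDeque::new();

    // Step 3: swap the buffers. The consumer takes everything
    // accumulated so far and hands the producer an empty queue,
    // holding the lock only for the duration of the swap.
    consumer_queue = mem::replace(&mut *shared_queue.lock().unwrap(), consumer_queue);

    // Step 4: keep only the newest N updates.
    let n = 3;
    if consumer_queue.len() > n {
        let excess = consumer_queue.len() - n;
        consumer_queue.rotate_left(excess); // move the newest N to the front
        consumer_queue.truncate(n);
    }

    // Step 5: process consumer_queue.
    assert_eq!(consumer_queue, VecDeque::from([7, 8, 9]));
}
```

The swap in step 3 is the double-buffering trick: the lock is held only for a pointer-sized exchange, never while processing.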
Alternatively, you could maybe do something from the producer thread that combines buffering updates locally for the "next batch" with doing a `send_timeout` of the current batch to the consumer. Then when the timeout hits, that means the consumer has missed a batch, so you discard it and take a new one from the buffer that has accumulated updates in the meantime. That might require a third thread, depending on what the source of updates is.
> I really like how Crossbeam and select have been able to clean up my code and would like to keep using it, but I can definitely refactor and write my own queue if I need to.
You could still use a channel and `select!` to signal various control operations between consumer and producer; it's using the channel as the underlying buffer itself that might be a poorer fit.
What I have now works well enough for me - I opened the issue originally because I thought it would be a feature that people (including myself) would appreciate having. I've since started digging through the code and I've come to two conclusions:
- Out of the three flavors, this really only makes sense for one of them and as a result I am less convinced this functionality belongs here.
- Implementing this myself will probably take more time than I will be able to muster up for a long time.
Based on those conclusions, I would be fine if this issue is closed.
We are still interested in this feature.
It should be possible to implement this just by porting https://github.com/crossbeam-rs/crossbeam/pull/789 to the channel implementation.