libzmq-rs
libzmq-rs copied to clipboard
Tokio integration
Blocked until async await stabilization.
The general concept should be pretty simple I think. We can base our design on https://rust-lang.github.io/async-book/02_execution/04_executor.html.
We create an executor thread that will hold a Poller
instance that will poll the sockets and call the waker
when the desired event occurs.
The executor would use a channel that receive taks containing a RawFd
and the desired state (Read / Write). This task would be added to the Poller
. Upon completion the waker would be called, then the RawFd
would be removed from the poller.
However, for the channel to be polled in Poller::poll
(so that we wait forever until any of the task complete or we receive a new task), it will need its own RawFd
. Something like http://man7.org/linux/man-pages/man2/eventfd.2.html would do the trick. I think mio
had something similar, so we should look into it.
We could use a tweaked version of https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel.
We could implement a channel like crossbeam's version, but also use a Server
socket that recv and send empty messages.
We could event write an abtraction like a Pipe<Server>
so that it can work over abitrary socket types. Basically we would only use the underlying zmq socket for I/O, but we would not use it to send messages. This way we should have performance comparable to crossbeam's implementation.
This should be pretty fully unblocked.
Any idea of where to start in the current landscape? So far I've been trying to write my own impl Future
for a ScatterSend
struct that wraps a Scatter
, msg
, and Poller
and then spawns a thread that blocks on Poller::poll
and tries to wake on WRITABLE
. This is my first interaction with lower level async/await stuff though, so I have no idea if I'm approaching it from the right direction or not. Probably not, since the types aren't easily working out :D.
I'm not using libzmq anymore, which is why this hasn't been implemented yet, but from what I recall its quite tricky to do well.
Ideally you would want some sort of reactor that polls all the sockets via the Poller
interface and then wakes up the appropriate socket. I took a quick look into the async-zmq
crate and it seems like a good start, although they implement the old socket types.
If you are looking for channels within the same address space, I would recommand the futures-intrusive crate since they are way easier to use & way faster and support proper async
.
Finally I would suggest against using zmq
in general unless your porting legacy code (read more here). However I don't have any alternative to offer you since I'm using my own async networking lib that is currently closed sourced, I'm afraid.