libzmq-rs icon indicating copy to clipboard operation
libzmq-rs copied to clipboard

Tokio integration

Open jean-airoldie opened this issue 5 years ago • 7 comments

Blocked until async await stabilization.

jean-airoldie avatar Jun 25 '19 20:06 jean-airoldie

The general concept should be pretty simple I think. We can base our design on https://rust-lang.github.io/async-book/02_execution/04_executor.html.

We create an executor thread that will hold a Poller instance that will poll the sockets and call the waker when the desired event occurs.

jean-airoldie avatar Jul 21 '19 10:07 jean-airoldie

The executor would use a channel that receive taks containing a RawFd and the desired state (Read / Write). This task would be added to the Poller. Upon completion the waker would be called, then the RawFd would be removed from the poller.

However, for the channel to be polled in Poller::poll (so that we wait forever until any of the task complete or we receive a new task), it will need its own RawFd. Something like http://man7.org/linux/man-pages/man2/eventfd.2.html would do the trick. I think mio had something similar, so we should look into it.

jean-airoldie avatar Jul 21 '19 11:07 jean-airoldie

We could use a tweaked version of https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel.

jean-airoldie avatar Jul 21 '19 12:07 jean-airoldie

We could implement a channel like crossbeam's version, but also use a Server socket that recv and send empty messages.

jean-airoldie avatar Jul 21 '19 23:07 jean-airoldie

We could event write an abtraction like a Pipe<Server> so that it can work over abitrary socket types. Basically we would only use the underlying zmq socket for I/O, but we would not use it to send messages. This way we should have performance comparable to crossbeam's implementation.

jean-airoldie avatar Jul 21 '19 23:07 jean-airoldie

This should be pretty fully unblocked.

Any idea of where to start in the current landscape? So far I've been trying to write my own impl Future for a ScatterSend struct that wraps a Scatter, msg, and Poller and then spawns a thread that blocks on Poller::poll and tries to wake on WRITABLE. This is my first interaction with lower level async/await stuff though, so I have no idea if I'm approaching it from the right direction or not. Probably not, since the types aren't easily working out :D.

skeet70 avatar Apr 06 '20 21:04 skeet70

I'm not using libzmq anymore, which is why this hasn't been implemented yet, but from what I recall its quite tricky to do well.

Ideally you would want some sort of reactor that polls all the sockets via the Poller interface and then wakes up the appropriate socket. I took a quick look into the async-zmq crate and it seems like a good start, although they implement the old socket types.

If you are looking for channels within the same address space, I would recommand the futures-intrusive crate since they are way easier to use & way faster and support proper async.

Finally I would suggest against using zmq in general unless your porting legacy code (read more here). However I don't have any alternative to offer you since I'm using my own async networking lib that is currently closed sourced, I'm afraid.

jean-airoldie avatar Apr 06 '20 22:04 jean-airoldie