gtk-rs-core

Add convenience futures API

Open sdroege opened this issue 4 years ago • 24 comments

Compared to async-std/tokio, the current gio futures API is rather verbose and inconvenient to use. I am thinking of something like the following, inspired by the aforementioned APIs and by how gio works. We would then extend this step by step to other types and with more functions.

pub struct SocketClient(gio::SocketClient);

impl SocketClient {
    pub fn new() -> Self {
        SocketClient(gio::SocketClient::new())
    }

    pub async fn connect<P: IsA<gio::SocketConnectable> + Clone + 'static>(
        &self,
        connectable: &P,
    ) -> Result<SocketConnection, glib::Error> {
        let connection = self.0.connect_async_future(connectable).await?;

        // Get the input/output streams and convert them to the AsyncRead and AsyncWrite adapters
        let ostream = connection
            .get_output_stream()
            .unwrap()
            .dynamic_cast::<gio::PollableOutputStream>()
            .unwrap();
        let write = ostream.into_async_write().unwrap();

        let istream = connection
            .get_input_stream()
            .unwrap()
            .dynamic_cast::<gio::PollableInputStream>()
            .unwrap();
        let read = istream.into_async_read().unwrap();

        Ok(SocketConnection {
            connection,
            read,
            write,
        })
    }
}

pub struct SocketConnection {
    connection: gio::SocketConnection,
    read: gio::InputStreamAsyncRead<gio::PollableInputStream>,
    write: gio::OutputStreamAsyncWrite<gio::PollableOutputStream>,
}

// Proxy to the internal AsyncRead
impl AsyncRead for SocketConnection {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
    }
}

// Proxy to the internal AsyncWrite
impl AsyncWrite for SocketConnection {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
    }
}

pub struct SocketListener(gio::SocketListener);

impl SocketListener {
    pub fn new() -> Self {
        SocketListener(gio::SocketListener::new())
    }

    pub fn add_inet_port(&self, port: u16) -> Result<(), glib::Error> {
        self.0.add_inet_port(port, None::<&glib::Object>)
    }

    pub async fn accept(&self) -> Result<SocketConnection, glib::Error> {
        let connection = self.0.accept_async_future().await?.0;

        // Get the input/output streams and convert them to the AsyncRead and AsyncWrite adapters
        let ostream = connection
            .get_output_stream()
            .unwrap()
            .dynamic_cast::<gio::PollableOutputStream>()
            .unwrap();
        let write = ostream.into_async_write().unwrap();

        let istream = connection
            .get_input_stream()
            .unwrap()
            .dynamic_cast::<gio::PollableInputStream>()
            .unwrap();
        let read = istream.into_async_read().unwrap();

        Ok(SocketConnection {
            connection,
            read,
            write,
        })
    }

    pub fn incoming(&self) -> Incoming {
        Incoming(self)
    }
}

pub struct Incoming<'a>(&'a SocketListener);

impl<'a> Stream for Incoming<'a> {
    type Item = Result<SocketConnection, glib::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let future = self.0.accept();
        pin_utils::pin_mut!(future);

        let socket = futures::ready!(future.poll(cx))?;
        Poll::Ready(Some(Ok(socket)))
    }
}

Opinions? CC @russel @GuillaumeGomez @EPashkin

sdroege avatar Apr 29 '20 12:04 sdroege

I'll make use of these in my application to see how they go. Let me know if they evolve, so I can update.

russel avatar Apr 29 '20 12:04 russel

@russel You'll get all updates if you subscribe to this issue :) Also the above code was not fully tested, just to give an idea of what I was thinking of.

This would then probably end up in a gio::futures module with new types (we need new types because some more state has to be stored, for example) and From/TryFrom impls from/to the normal gio types, AsRef to the gio types I guess, plus various API passthrough.
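A minimal sketch of the newtype layout described above, using a std-only stand-in rather than the real gio types. `RawClient`, `Client`, `timeout_secs` and `connects_started` are hypothetical names chosen for illustration; none of them are gio API.

```rust
// Hypothetical stand-in for a plain gio type such as gio::SocketClient.
#[derive(Debug, Clone, PartialEq)]
pub struct RawClient {
    pub timeout_secs: u32,
}

// Futures-flavoured newtype: wraps the plain type and can carry extra state.
#[derive(Debug)]
pub struct Client {
    inner: RawClient,
    // Example of "some more state" that the plain type has no place for.
    connects_started: u32,
}

// Conversion from the plain type into the wrapper.
impl From<RawClient> for Client {
    fn from(inner: RawClient) -> Self {
        Client { inner, connects_started: 0 }
    }
}

// Conversion back to the plain type; the extra state is discarded.
impl From<Client> for RawClient {
    fn from(client: Client) -> Self {
        client.inner
    }
}

// Cheap borrowed access to the wrapped type.
impl AsRef<RawClient> for Client {
    fn as_ref(&self) -> &RawClient {
        &self.inner
    }
}

impl Client {
    // API passthrough: forwards to the wrapped type.
    pub fn timeout_secs(&self) -> u32 {
        self.inner.timeout_secs
    }

    // Touches only the wrapper's own state.
    pub fn note_connect(&mut self) -> u32 {
        self.connects_started += 1;
        self.connects_started
    }
}

fn main() {
    let raw = RawClient { timeout_secs: 30 };
    let mut client = Client::from(raw.clone());
    assert_eq!(client.timeout_secs(), 30);

    let raw_ref: &RawClient = client.as_ref();
    assert_eq!(raw_ref, &raw);

    assert_eq!(client.note_connect(), 1);
    let back: RawClient = client.into();
    assert_eq!(back, raw);
}
```

Whether the fallible direction should be `TryFrom` rather than `From` depends on whether a given plain object can always be wrapped, for example whether its streams are pollable.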

sdroege avatar Apr 29 '20 13:04 sdroege

I have been trying to avoid unreleased/Git repository crates recently, but for this I can get back into it so as to test and try things out.

russel avatar Apr 29 '20 14:04 russel

I had to add:

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{AsyncRead, AsyncWrite};
use futures_util::io::AsyncReadExt;
use futures_util::io::AsyncWriteExt;

hopefully this is all as expected.

russel avatar Apr 30 '20 17:04 russel

Please don't clutter this issue with unrelated support requests, let's keep this here for API discussion.

(Your problem is that you use a Rc<RefCell<_>> at all, that hints at having things structured the wrong way in your code)

sdroege avatar May 02 '20 06:05 sdroege

Any specific other APIs people want to see wrapped in something more Rust'y with futures, which parts (other than the obvious socket / file APIs) should get focus?

sdroege avatar May 05 '20 10:05 sdroege

The code deals with SocketClient and SocketConnection, is something needed for SocketListener?

russel avatar May 06 '20 17:05 russel

> is something needed for SocketListener

The code contains something for SocketListener :) Whether it's needed is a question for you, though, but I think so.

sdroege avatar May 07 '20 08:05 sdroege

Your code is, unsurprisingly, an awful lot better than what I was putting together. Thanks.

russel avatar May 07 '20 09:05 russel

Given that the field connection in SocketConnection is private, is there a need for a method such as get_remote_address to get details of the remote end of the connection that results from the accept/incoming?

Also does SocketListener need a close method?

russel avatar May 07 '20 09:05 russel

> Given that the field connection in SocketConnection is private, is there a need for a method such as get_remote_address to get details of the remote end of the connection that results from the accept/incoming?

Yes, such API should be added.

> Also does SocketListener need a close method?

How would that behave differently from dropping it? The async-std/tokio TcpListener types have no such thing either.

sdroege avatar May 07 '20 09:05 sdroege

The documentation for gio::SocketListener states that an explicit close is required after calling add_address or add_inet_port.

russel avatar May 07 '20 09:05 russel

That would happen as part of dropping, or do you see a reason to do it explicitly?

sdroege avatar May 07 '20 09:05 sdroege

The line:

let socket = futures::ready!(future.poll(cx))?;

is giving the error:

error[E0599]: no method named `poll` found for struct `std::pin::Pin<&mut impl core::future::future::Future>` in the current scope

UPDATE: Seemingly solved by adding the statement:

use std::future::Future;
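The import helps because `poll` is a method of the `Future` trait, and trait methods can only be called when the trait is in scope. As far as I know the Rust 2024 prelude includes `Future`, so the error only shows up on earlier editions. A small std-only sketch, where `NoopWake` and `poll_once` are hypothetical helper names:

```rust
use std::future::Future; // brings the `poll` method into scope
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a trivial async block once and returns the result.
fn poll_once() -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `pin!` pins the future on the stack, like pin_utils::pin_mut!.
    let future = pin!(async { 42u32 });

    // This call needs `Future` in scope.
    let result = future.poll(&mut cx);
    result
}

fn main() {
    assert_eq!(poll_once(), Poll::Ready(42));
}
```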

russel avatar May 07 '20 09:05 russel

I think I have been distracted by the comments about calling close: the comments are actually C and not Rust focused. As long as close automatically gets called at end of scope, there is no problem.
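A minimal sketch of the "close on drop" behaviour being discussed, using a hypothetical `Listener` type and a shared flag so that the effect is observable. It illustrates the Rust pattern only and does not show what gio does internally.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical listener; `closed` is shared so the caller can observe it.
struct Listener {
    closed: Rc<Cell<bool>>,
}

impl Listener {
    // Explicit close, as a C caller would have to invoke by hand.
    fn close(&self) {
        self.closed.set(true);
    }
}

// Dropping the listener closes it, so no explicit call is needed.
impl Drop for Listener {
    fn drop(&mut self) {
        self.close();
    }
}

// Returns whether the listener was closed once it went out of scope.
fn closed_after_scope() -> bool {
    let closed = Rc::new(Cell::new(false));
    {
        let _listener = Listener { closed: Rc::clone(&closed) };
        // Still open while the listener is alive.
        assert!(!closed.get());
    } // `_listener` is dropped here, which runs `close`.
    closed.get()
}

fn main() {
    assert!(closed_after_scope());
}
```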

russel avatar May 07 '20 09:05 russel

As at 2020-05-08, the line:

for connection in server.incoming() {

gives the error:

error[E0277]: `arcamclient::socket_support::Incoming<'_>` is not an iterator

whilst the line:

for connection in server.incoming().await {

(I am just experimenting here) gives the error:

error[E0277]: the trait bound `arcamclient::socket_support::Incoming<'_>: std::future::Future` is not satisfied

russel avatar May 08 '20 11:05 russel

Needs to be while let Some(connection) = incoming.next().await { ... }. Same thing as in all the async-std/tokio examples, that's what it is modeled after.
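A std-only sketch of why the `while let` form is needed: a stream is polled like a future and is not an `Iterator`, so each item has to be awaited. The `Stream` trait, `Next`, `Countdown` and `block_on` below are simplified stand-ins written for this example, assumed to have the same shape as the ones in the futures crate; they are not the real implementations.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for futures::Stream.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Stand-in for StreamExt::next(): a future resolving to the next item.
struct Next<'a, S: Stream + Unpin>(&'a mut S);

impl<'a, S: Stream + Unpin> Future for Next<'a, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.0).poll_next(cx)
    }
}

// Toy stream that yields n-1, ..., 0 and then ends.
struct Countdown(u32);

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0 == 0 {
            Poll::Ready(None)
        } else {
            self.0 -= 1;
            Poll::Ready(Some(self.0))
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, good enough for this demonstration only.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// The loop shape from the comment above: await each item until `None`.
async fn collect_all() -> Vec<u32> {
    let mut incoming = Countdown(3);
    let mut seen = Vec::new();
    while let Some(id) = Next(&mut incoming).await {
        seen.push(id);
    }
    seen
}

fn main() {
    assert_eq!(block_on(collect_all()), vec![2, 1, 0]);
}
```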

sdroege avatar May 08 '20 11:05 sdroege

@sdroege Thanks. I was looking at the wrong incoming() documentation for ideas.

I am wondering if it might be worth creating a temporary Git repository for this as a work in progress pending getting stuff into the gio repository.

russel avatar May 08 '20 11:05 russel

I was going to create a fork of the gio repo when I got time for it.

sdroege avatar May 08 '20 11:05 sdroege

It appears that:

for socket_connection in server.incoming().next().await {

blocks permanently even when a connection request arrives. :-(

Update: Actually, it is exactly the opposite, the loop terminates immediately.

Further update: Or rather the await causes the task to terminate: the loop never terminates but the task stops.

And more:

Putting the sequence:

    let mut incoming = server.incoming();
    let next = incoming.next();
    let x = next.await;
    eprintln!("mock_avr850: got an x.");

into an async function which gets started with a spawn_local, the code up to the await executes but the await silently terminates the task without the output being printed.

russel avatar May 08 '20 12:05 russel

Needs some debugging then, please go ahead :) I didn't actually test the above code much; it was only meant as an illustration of how the API could look and be implemented.
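One plausible reading of the stall, not confirmed anywhere in this thread: the sketched `poll_next` builds a fresh `accept()` future on every call and drops it as soon as it returns `Poll::Pending`, so no operation survives between polls. The std-only sketch below shows that effect with hypothetical stand-ins (`TwoStep`, `Restarting`, `Resuming`, `polls_until_ready`); it does not use gio and is not the fix that ended up in the repository.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for an accept operation: pending on the first poll, ready on the second.
struct TwoStep {
    polled_once: bool,
}

impl Future for TwoStep {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled_once {
            Poll::Ready(7)
        } else {
            self.polled_once = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Recreates the operation on every poll, so earlier progress is thrown away.
struct Restarting;

impl Restarting {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let mut future = TwoStep { polled_once: false };
        Pin::new(&mut future).poll(cx).map(Some)
    }
}

// Keeps the in-flight operation between polls and clears it once it finishes.
struct Resuming {
    pending: Option<Pin<Box<TwoStep>>>,
}

impl Resuming {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let future = self
            .pending
            .get_or_insert_with(|| Box::pin(TwoStep { polled_once: false }));
        match future.as_mut().poll(cx) {
            Poll::Ready(value) => {
                self.pending = None;
                Poll::Ready(Some(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Number of polls needed until the first item is ready, or None if `limit` is hit.
fn polls_until_ready(
    mut poll: impl FnMut(&mut Context<'_>) -> Poll<Option<u32>>,
    limit: usize,
) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (1..=limit).find(|_| poll(&mut cx).is_ready())
}

fn main() {
    let mut restarting = Restarting;
    let mut resuming = Resuming { pending: None };

    // Never gets past the first step, however often it is polled.
    assert_eq!(polls_until_ready(|cx| restarting.poll_next(cx), 10), None);
    // Finishes on the second poll because the operation was kept.
    assert_eq!(polls_until_ready(|cx| resuming.poll_next(cx), 10), Some(2));
}
```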

sdroege avatar May 08 '20 12:05 sdroege

The code can be found here for now: https://github.com/sdroege/gio-futures

@russel This also doesn't have the bug anymore that you noticed, see the example in src/main.rs.

sdroege avatar May 27 '20 09:05 sdroege

I am switching to using this Git repository rather than the version I had in my repository. I'll send in issues as (if) I come across them. This requires using the Git repositories for gdk-pixbuf, glib, gio, and gtk, which is fine per se. Though it seems that the get_text method on gtk::Label now returns GString not Option<GString>.

russel avatar May 27 '20 09:05 russel

> Though it seems that the get_text method on gtk::Label now returns GString not Option<GString>.

That's intentional, it could never return None before.

sdroege avatar May 27 '20 12:05 sdroege