
async trait for runtime

Open • NobodyXu opened this issue 3 years ago • 92 comments

I propose adding a new trait for the runtime that provides portable APIs for creating sockets, pipes, AsyncFd, timers, etc.

Motivation

As mentioned in this comment:

> I don't think having async IO traits like Read/Write/BufRead is enough to unify the ecosystem.

Many crates, such as reqwest, HTTP servers, and openssh-mux-client, need to create a network socket (TCP/UDP/Unix socket). Without a portable way to create one, they fall back to using a specific runtime.

There are also crates like tokio-pipe, which wraps pipes for tokio users, so I think we need some way to create an AsyncFd in a portable manner.

portable runtime trait 0.2.0

Executor 0.3: https://github.com/nrc/portable-interoperable/issues/13#issuecomment-1186439456
Reactor 0.2: https://github.com/nrc/portable-interoperable/issues/13#issuecomment-1185521802

portable runtime trait 0.1.0

Here is a rough sketch of what I have in mind. Many APIs are elided because there are too many of them:

(NOTE: these APIs are heavily influenced by tokio, because it is the async runtime API I am most familiar with.)

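use std::any::Any;
use std::future::Future;
use std::io::{IoSlice, IoSliceMut, Result};
use std::time::{Duration, Instant};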

trait Runtime {
    type Handle: Handle;

    async fn yield_now(&self);

    fn block_in_place<F: FnOnce() -> R, R>(&self, f: F) -> R;

    fn handle(&self) -> &Self::Handle;
}

trait Handle: Clone {
    type JoinHandle<T>: JoinHandle<T>;

    fn block_on<T>(&self, f: impl Future<Output = T>) -> T;

    fn spawn<T>(&self, f: impl Future<Output = T> + Send + 'static) -> Self::JoinHandle<T>;
    fn spawn_local<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;

    /// Provided function, automatically call spawn if f is Send, otherwise call spawn_local.
    /// 
    /// # Motivation
    /// 
    /// Suppose that a user wants to write some data into a TcpStream in the background;
    /// they will likely use spawn.
    /// 
    /// However, spawn requires Send, in order to use it, they would have to write
    /// `RuntimeNet<TcpStream: Send>`, which is cumbersome.
    /// 
    /// Another option is to just use spawn_local, but it only gives concurrency, not
    /// parallelism.
    /// 
    /// Thus, Runtime should have a provided method spawn_auto to automatically
    /// spawn it on local or spawn it on any thread.
    /// 
    /// This will make the portable Runtime trait much easier to use, and
    /// it would even improve the portability, since users do not have to worry
    /// about the "Send" bound anymore.
    fn spawn_auto<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;
}

trait HandleStd: Handle {
    fn spawn_blocking<F: FnOnce() -> R, R>(&self, f: F) -> Self::JoinHandle<R>;
}

enum JoinErrorKind {
    /// Out of memory when spawning
    Oom,
    /// The future panicked
    Panic(Box<dyn Any + Send>),
    /// Cancelled
    Cancelled,
    /// Other
    Other(Box<dyn core::error::Error + Send>),
}
trait JoinError: core::error::Error {
    fn error_kind(self) -> JoinErrorKind;
}

trait JoinHandleBase {
    type Error: JoinError;
}
trait JoinHandle<T>: Future<Output = core::result::Result<T, Self::Error>> + JoinHandleBase {
    fn cancel(&self);
    fn is_done(&self) -> bool;
}

/// Provides AsyncHandle
trait RuntimeAsyncHandle: Runtime {
    type AsyncHandle: AsyncHandle;

    #[cfg(unix)]
    fn from_fd(&self, fd: std::os::unix::io::OwnedFd) -> Result<Self::AsyncHandle>;

    #[cfg(windows)]
    fn from_handle(&self, fd: std::os::windows::io::OwnedHandle) -> Result<Self::AsyncHandle>;
}

trait AsyncHandle: Read + Write + Seek {
    type ReadBuffered: AsyncHandle + BufRead;
    type WriteBuffered: AsyncHandle + BufWrite;
    type ReadWriteBuffered: AsyncHandle + BufRead + BufWrite;

    fn try_clone(&self) -> Result<Self>;

    async fn close(self) -> Result<()>;

    fn into_buffered_read(self, n: usize) -> Self::ReadBuffered;
    fn into_buffered_write(self, n: usize) -> Self::WriteBuffered;

    fn into_buffered(self, read_buffer_size: usize, write_buffer_size: usize) -> Self::ReadWriteBuffered;
}

/// Optional
trait AsyncHandleNet: AsyncHandle {
    type AcceptStream: Stream<Item = Result<Self>>;
    type RecvMsgStream: Stream<Item = Result<(Buffer, Auxiliary)>>;

    async fn accept(&self) -> Result<Self>;

    /// For multi-shot accept supported by io-uring
    /// Provides a default implementation.
    async fn into_accept_stream(self) -> Result<Self::AcceptStream>;

    async fn sendmsg(&self, buffer: IoSlice<'_>, aux: Auxiliary) -> Result<usize>;
    async fn recvmsg(&self, buffer: IoSliceMut<'_>, aux: Auxiliary) -> Result<usize>;

    /// For multi-shot recvmsg supported by io-uring
    /// Provides a default implementation.
    async fn into_recvmsg_stream(self) -> Result<Self::RecvMsgStream>;

    // Provides default implementation which fallback to read + write if the runtime/target does not support sendfile syscall.
    async fn sendfile(&self, ...) -> Result<usize>;
}

/// Optional, for io-uring
trait AsyncHandleNetExt: AsyncHandleNet {
    async fn connect(&self, ...) -> Result<()>;
}

/// Optional
trait AsyncHandleFile: AsyncHandle + Seek {
    async fn fsync(&self) -> Result<()>;

    /// If the runtime/os does not support this, then it can issue fsync instead.
    async fn fdatasync(&self) -> Result<()>;

    /// If the runtime/os does not support this, then it can issue fdatasync instead.
    async fn sync_file_range(&self, off: u64, n: u64) -> Result<()>;

    /// If the runtime/os does not support this, then it can use read + write to simulate this.
    async fn copy_file_range(&self, off_in: u64, other: &Self, off_out: u64, len: u64) -> Result<()>;

    async fn metadata(&self) -> Result<MetaData>;
    async fn set_metadata(&self, metadata: MetaData) -> Result<()>;
}

/// fs operations, like open, optional
trait RuntimeFs: RuntimeAsyncHandle {
    async fn open(&self, path: &std::path::Path) -> Result<Self::AsyncHandle>;
}

/// signal handling, optional
trait RuntimeSignal: Runtime {
    async fn ctrl_c(&self) -> Result<()>;
}

/// spawn process, optional
trait RuntimeProcess: Runtime {
    type Child: ...;

    fn wait(&self, child: std::process::Child) -> Result<Self::Child>;
}

/// Perhaps the same set of methods as tokio::time::Interval?
trait Interval {...}

/// time handling, optional but most likely implemented
trait RuntimeTime: Runtime {
    type Interval: Interval;

    fn interval_at(&self, start: Option<Instant>, period: Duration) -> Self::Interval;
}

#[cfg(unix)]
mod unix {
    use super::*;

    enum Madvise { ... }
    trait RuntimeMem: Runtime {
        async unsafe fn madvise(&self, addr: *mut (), len: usize, advice: Madvise) -> Result<()>;
    }

    trait RuntimeAsyncHandleExt: RuntimeAsyncHandle {
        async fn openat2(&self, ...) -> Result<Self::AsyncHandle>;
    }

    enum SignalKind { ... }
    trait RuntimeSignalExt: RuntimeSignal {
        type Signal: Stream<Item = Result<SigInfo>>;

        fn signal(&self, kind: SignalKind) -> Result<Self::Signal>;
    }
}

#[cfg(target_os = "linux")]
mod linux {
    use super::*;

    trait AsyncHandlePipe: AsyncHandle {
        async fn splice(&self, ...) -> Result<usize>;

        async fn tee(&self, ...) -> Result<usize>;
    }
}
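To make `Handle::block_on` more concrete, here is a minimal std-only sketch of how a toy single-threaded runtime might implement it: poll the future, and park the thread until its waker fires. The free function `block_on` and the `ThreadWaker` type are hypothetical names, and a real runtime would also drive its reactor (I/O readiness, timers) inside this loop rather than only parking.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive `fut` to completion on the current thread, parking the thread
/// whenever the future returns `Poll::Pending`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious wake-ups are harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, `block_on(async { 40 + 2 })` returns `42`.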

Amendment Proposal: Achieve zero-cost abstraction

After writing up this proposal, I realized that it is not zero-cost, so I created the Amendment Proposal: Achieve zero-cost abstraction.

NobodyXu, Jun 29 '22 15:06

It would be simpler not to have separate process/net/file/stdio APIs, but rather to provide a single unified Socket type similar to socket2’s API (actually, it might be better to have Fd on Unix and Socket + Handle on Windows), as well as a spawn_blocking function; TcpStream, File, Process, etc. can then all build on that generically. So, more like:

pub trait Runtime {
    #[cfg(unix)]
    type Fd: Fd;

    // true for io_uring, false for epoll, used to determine whether file APIs should just use `spawn_blocking`
    fn supports_non_socket_fds(&self) -> bool;
}

// unix-only
pub trait Fd: From<OwnedFd> + Into<OwnedFd> + AsyncRead + AsyncWrite {}

and std can define a TcpStream<Inner>, where on Unix Inner: Fd and on Windows Inner: Socket.

SabrinaJewson, Jun 29 '22 15:06

> It would be simpler to not have separate process/net/file/stdio APIs

I doubt that is a good idea, since async executors for embedded environments or for special use cases (e.g. a runtime for a game's main loop) do not need to support process/net/file/stdio APIs.

Instead, they could simply implement Runtime for spawning tasks and RuntimeTime for managing time.

NobodyXu, Jun 29 '22 15:06

Ah, you misunderstood me. I meant that process/net/file/stdio should all be together, but time and task spawning should still be separate.

SabrinaJewson, Jun 29 '22 15:06

> Ah, you misunderstood me. I meant that process/net/file/stdio should all be together, but time and task spawning should still be separate.

Sorry about that. Integrating these traits does sound reasonable, but I am still not completely sure it is a good idea.

NobodyXu, Jun 29 '22 15:06

Oops...

NobodyXu, Jun 29 '22 15:06

Having this method:

    // true for io_uring, false for epoll, used to determine whether file APIs should just use `spawn_blocking`
    fn supports_non_socket_fds(&self) -> bool;

in Runtime will definitely be necessary.
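A rough sketch of how a runtime-independent file API might branch on `supports_non_socket_fds`, assuming a trimmed-down `Runtime` trait with only that method. For brevity it is synchronous: a plain `std::fs::read` stands in for a native io_uring read, and `std::thread::spawn` stands in for `spawn_blocking`. The `read_file` function and `Strategy` enum are hypothetical.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::thread;

/// Hypothetical, trimmed-down version of the proposed trait.
trait Runtime {
    /// true for io_uring, false for epoll.
    fn supports_non_socket_fds(&self) -> bool;
}

/// Which code path `read_file` took (for illustration only).
#[derive(Debug, PartialEq)]
enum Strategy {
    Native,
    Blocking,
}

fn read_file<R: Runtime>(rt: &R, path: &Path) -> io::Result<(Strategy, Vec<u8>)> {
    if rt.supports_non_socket_fds() {
        // A completion-based runtime would submit the read to the kernel here;
        // a blocking read stands in for that in this sketch.
        Ok((Strategy::Native, fs::read(path)?))
    } else {
        // Readiness-based runtimes cannot poll regular files, so offload the
        // blocking read to another thread (a stand-in for `spawn_blocking`).
        let path = path.to_owned();
        let data = thread::spawn(move || fs::read(path))
            .join()
            .expect("blocking task panicked")?;
        Ok((Strategy::Blocking, data))
    }
}
```

Both branches return the same bytes; only where the blocking work happens differs.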

NobodyXu, Jun 29 '22 15:06

Also, we need to find a way to obtain the runtime.

I personally think the proposal of having a "context" is the best solution for this, since it is zero-cost (it does not require a thread-local variable) and a crate can easily put a bound on the "context".

NobodyXu, Jun 29 '22 15:06

> Ah, you misunderstood me. I meant that process/net/file/stdio should all be together, but time and task spawning should still be separate.

On second thought, I think it is better to keep them separate.

While it's true that process/net/file/stdio can be put together, runtime implementations often split them into multiple features.

For example, tokio provides a feature flag for each of them to speed up compilation and reduce bloat.

Similarly, async-std also provides feature flags.

Thus, I think it is better to leave them as separate traits so that a runtime can choose to provide a feature flag for each of them.

NobodyXu, Jun 30 '22 02:06

Under my design, types like TcpStream, process::Child, Stdin etc would still be equally feature flagged. All that wouldn't be feature flagged would be the underlying low-level system call wrappers which would be common between the high level net/process/io interfaces anyway.

SabrinaJewson, Jun 30 '22 08:06

> Under my design, types like TcpStream, process::Child, Stdin etc would still be equally feature flagged. All that wouldn't be feature flagged would be the underlying low-level system call wrappers which would be common between the high level net/process/io interfaces anyway.

I am a bit confused.

Do you mean that process/net/file/stdio would be in the same trait but still be feature gated?

NobodyXu, Jun 30 '22 08:06

Each runtime would just provide an implementation of Socket/Handle/Fd, which provides all the functionality of a socket/handle/file descriptor registered with io_uring/IOCP/epoll/etc under an asynchronous interface. It does not specify or keep track of whether said FD refers to a process, or a network socket, or a file, since the underlying interface is the same for them all.

On top of it, the standard library can provide high-level typed wrappers like TcpStream which would hold an Fd and give an API that keeps track of stuff like FD type in the type system. So:

use std::{io, path::Path};

// generic, runtime-independent file API
pub struct File<F: Fd> {
    fd: F,
}
impl<F: Fd> File<F> {
    pub async fn open(runtime: &F::Runtime, path: &Path) -> io::Result<Self> {
        let fd = runtime.open(path).await?;
        Ok(Self { fd })
    }
}
impl<F: Fd> async Read for File<F> {
    async fn read(&mut self, buf: ReadBufRef<'_, '_>) -> io::Result<()> {
        self.fd.read(buf).await
    }
}
// etc
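To illustrate the layering described above, here is a synchronous sketch (so it runs without an executor) in which the runtime supplies only a hypothetical `Fd` primitive, while typed wrappers like `File` and `TcpStream` are written once, generically. `MemFd` is an in-memory stand-in for a handle registered with epoll/io_uring/IOCP; none of these names come from an existing crate.

```rust
use std::io;

/// Hypothetical runtime-provided primitive: "some registered OS handle".
/// It does not know whether it refers to a file, a socket, or a pipe.
trait Fd {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
    fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
}

/// Runtime-independent typed wrappers, written once for every runtime.
struct File<F: Fd> { fd: F }
struct TcpStream<F: Fd> { fd: F }

impl<F: Fd> File<F> {
    fn from_fd(fd: F) -> Self { File { fd } }

    /// Read until EOF, delegating every call to the runtime's Fd.
    fn read_to_end(&mut self, out: &mut Vec<u8>) -> io::Result<usize> {
        let mut buf = [0u8; 64];
        let mut total = 0;
        loop {
            let n = self.fd.read(&mut buf)?;
            if n == 0 {
                return Ok(total);
            }
            out.extend_from_slice(&buf[..n]);
            total += n;
        }
    }
}

impl<F: Fd> TcpStream<F> {
    fn from_fd(fd: F) -> Self { TcpStream { fd } }

    fn write_all(&mut self, mut data: &[u8]) -> io::Result<()> {
        while !data.is_empty() {
            let n = self.fd.write(data)?;
            if n == 0 {
                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
            }
            data = &data[n..];
        }
        Ok(())
    }
}

/// In-memory stand-in for a runtime's registered handle.
struct MemFd { input: io::Cursor<Vec<u8>>, output: Vec<u8> }

impl Fd for MemFd {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        io::Read::read(&mut self.input, buf)
    }
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.output.extend_from_slice(buf);
        Ok(buf.len())
    }
}
```

Swapping runtimes would only mean swapping the `Fd` implementation; `File` and `TcpStream` stay untouched.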

SabrinaJewson, Jun 30 '22 08:06

This actually sounds good.

I think we need the runtime to implement AsyncHandle (constructible from an OwnedFd or a Windows OwnedHandle), a Socket type for async accept and (multishot) async recv/send, process spawning, timers, and fs operations.

NobodyXu, Jun 30 '22 09:06

@SabrinaJewson I've adopted your suggestion and updated the proposal.

NobodyXu, Jun 30 '22 13:06

@nrc I have written down a detailed proposal, and it seems that we need at least type GATs (generic associated types) for spawning tasks.

NobodyXu, Jul 02 '22 09:07

Amendment Proposal: Achieve zero-cost abstraction

Motivation

Currently, in order to implement the portable Runtime trait, the async runtime must either use reference counting (Rc/Arc) or global static variables.

Neither solution is zero-cost.

Reference counting requires a heap allocation (boxing), and every time a new AsyncHandle, another runtime-bound type, or a Future is created, the reference count has to be incremented, and then decremented again on Drop.

Global static variables are not zero-cost either, since they may make the executable larger or require lazy initialization via once_cell.

Thus, I am proposing this amendment to achieve a zero-cost abstraction.
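The cost difference can be seen in a small sketch, assuming a hypothetical `RtInner` runtime state: an `Arc`-based handle bumps a shared atomic counter on every creation and drop, whereas a handle that borrows the runtime (the approach this amendment proposes) touches no shared state at all.

```rust
use std::sync::Arc;

/// Shared runtime state (reactor, timers, ...), elided.
struct RtInner {
    name: &'static str,
}

/// Style 1: every handle owns a strong reference to the runtime.
/// Creating one is an atomic increment; dropping it is an atomic decrement.
struct ArcHandle {
    rt: Arc<RtInner>,
}

/// Style 2: the handle borrows the runtime, so creating or dropping it
/// touches no shared counter.
struct BorrowedHandle<'rt> {
    rt: &'rt RtInner,
}

fn arc_handle(rt: &Arc<RtInner>) -> ArcHandle {
    ArcHandle { rt: Arc::clone(rt) }
}

fn borrowed_handle(rt: &RtInner) -> BorrowedHandle<'_> {
    BorrowedHandle { rt }
}
```

With three `ArcHandle`s alive, `Arc::strong_count` reports 4; creating borrowed handles leaves it at 1. The price of style 2 is that every handle carries the `'rt` lifetime, which is what the rest of this amendment deals with.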

Modification

To achieve zero cost, we must eliminate the need for reference counting or global static variables and replace them with a reference to the runtime.

To do so, we must add a lifetime parameter to every type that implements the portable runtime traits.

There are two ways to accomplish this: one is to use lifetime GATs, and the other is a new Rust language feature that provides a 'self lifetime.

Lifetime GAT

We would need to apply the following modifications:

trait Handle: Clone {
    type JoinHandle<'a, T>: JoinHandle<T>;
}
trait RuntimeAsyncHandle: Runtime {
    type AsyncHandle<'a>: AsyncHandle;
}
trait AsyncHandle: Read + Write + Seek {
    type ReadBuffered<'a>: AsyncHandle + BufRead;
    type WriteBuffered<'a>: AsyncHandle + BufWrite;
    type ReadWriteBuffered<'a>: AsyncHandle + BufRead + BufWrite;
}
trait AsyncHandleNet: AsyncHandle {
    type AcceptStream<'a>: Stream<Item = Result<Self>>;
    type RecvMsgStream<'a>: Stream<Item = Result<(Buffer, Auxiliary)>>;
}
trait RuntimeProcess: Runtime {
    type Command<'a>: ...;
}
trait RuntimeTime: Runtime {
    type Interval<'a>: Interval;
}
#[cfg(unix)]
mod unix {
    trait RuntimeSignalExt: RuntimeSignal {
        type Signal<'a>: Stream<Item = Result<SigInfo>>;
    }
}

This requires modifying every associated type in the traits to add an <'a> lifetime parameter.

This also means that we would have to depend on GATs, which might further complicate the portable async story, especially for new users.
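As a minimal sketch of the lifetime-GAT variant, here is `RuntimeTime` reduced to a single method, with a toy runtime whose interval borrows the runtime instead of holding an `Arc` to it. `ToyRuntime`, `ToyInterval`, and the single `period` method on `Interval` are hypothetical simplifications; GATs have been stable since Rust 1.65. The trait needs a `where Self: 'a` bound on the associated type for the `&self` method to compile.

```rust
use std::time::Duration;

/// Hypothetical, minimal stand-in for the proposal's Interval trait.
trait Interval {
    fn period(&self) -> Duration;
}

/// RuntimeTime with a lifetime GAT: the returned interval may borrow the runtime.
trait RuntimeTime {
    // `where Self: 'a` lets `Self::Interval<'_>` borrow from `&self`.
    type Interval<'a>: Interval
    where
        Self: 'a;

    fn interval(&self, period: Duration) -> Self::Interval<'_>;
}

/// Toy runtime whose timer resolution is `tick`.
struct ToyRuntime {
    tick: Duration,
}

/// Borrows the runtime instead of holding an Arc to it.
struct ToyInterval<'a> {
    rt: &'a ToyRuntime,
    period: Duration,
}

impl Interval for ToyInterval<'_> {
    fn period(&self) -> Duration {
        // Clamp the requested period to at least the runtime's timer resolution.
        self.period.max(self.rt.tick)
    }
}

impl RuntimeTime for ToyRuntime {
    type Interval<'a> = ToyInterval<'a>;

    fn interval(&self, period: Duration) -> Self::Interval<'_> {
        ToyInterval { rt: self, period }
    }
}
```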

'self lifetime

By contrast, using a 'self lifetime is trivial.

No modification to the traits is needed; we just add 'self to the associated types specified when implementing them.

spawn

In addition to these changes, we also need to relax the 'static lifetime bound on spawn:

fn spawn<'this, 'future: 'this, F, T>(&'this self, f: F) -> Self::JoinHandle<T>
where
    F: Future<Output = T> + Send + Sync + 'future;

This makes sure that the futures' lifetime is at least as long as the runtime.
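Rust's scoped threads (`std::thread::scope`, stable since 1.63) are an existing example of the same idea applied to threads rather than async tasks: spawned closures may borrow local data because the scope guarantees they finish before it returns. The sketch below is only an analogy for the relaxed `spawn` bound, not the proposed async API; `parallel_sum` is a hypothetical helper.

```rust
use std::thread;

/// Sum each chunk of `data` on its own scoped thread. The closures borrow
/// `data` directly (no `'static` bound, no `Arc`), which is sound because
/// `thread::scope` joins every spawned thread before it returns.
fn parallel_sum(data: &[u64], chunk: usize) -> u64 {
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk)
            .map(|part| s.spawn(move || part.iter().sum::<u64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}
```

An async runtime cannot simply block like `thread::scope` does, which is why the runtime side needs extra machinery such as careful drop ordering and pinning.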

The runtime implementation will be more complicated, though: it has to handle drop order correctly, manually create the vtable for each future since it can no longer use dyn Future<...>, and Pin the runtime to ensure that it cannot be safely forgotten.

Expected implementation

I would expect the following implementation:

use std::pin::Pin;

struct SampleRtInner {
    ...
}

struct SampleRt {
    /// Declare thread_pool first to ensure it will
    /// be dropped before inner.
    thread_pool: ScopedThreadPool,
    inner: SampleRtInner,
}

impl Runtime for Pin<&SampleRt> {
    ...
}

impl SampleRt {
    pub fn new() -> Self {
        todo!()
    }

    pub fn new_scoped<F, T>(f: F) -> T
        where F: FnOnce(Pin<&Self>) -> T
    {
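        // Pin the runtime to this stack frame: it cannot be moved, and it is
        // dropped when this function returns.
        let this = std::pin::pin!(Self::new());

        f(this.as_ref())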
    }
}
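The comment on `thread_pool` relies on Rust dropping struct fields in declaration order. A small sketch that checks this, using a hypothetical `Noisy` type that logs its name on drop and a `SampleRtLike` struct that mirrors `SampleRt`'s field order:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Records its name into a shared log when dropped.
struct Noisy {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

/// Mirrors SampleRt: `thread_pool` is declared first, so it is dropped
/// first, while `inner` is still alive.
struct SampleRtLike {
    thread_pool: Noisy,
    inner: Noisy,
}
```

Even if `inner` is written first in the struct literal, `thread_pool` is dropped first, because drop order follows the field declarations, not the initializer order.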

Call for review

I know this amendment proposal will be controversial, as it requires either GATs or a new 'self feature, but I think that if we want truly portable and zero-cost runtime traits, this is the way to go.

@nrc @SabrinaJewson Could you review this, please? Thanks in advance!

NobodyXu, Jul 04 '22 15:07

Hey @NobodyXu I haven't looked in detail at the amendment above, I'm still at the information and requirements gathering stage of work on executors. But two comments: I expect using lifetime GATs is fine in general, they seem on a solid path towards stabilisation. I don't think being zero-cost in the sense of having no runtime overhead at all is necessary, the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).

nrc, Jul 05 '22 05:07

> I expect using lifetime GATs is fine in general, they seem on a solid path towards stabilisation.

It's good to hear that!

> the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).

Well, I don't think using a few statics or reference counting improves ergonomics.

The only obstacle is that a zero-cost runtime abstraction requires lifetime GATs; if those are fine, I don't think it will have much impact on ergonomics.

In fact, I would argue the current API makes more sense.

Users of this portable runtime can clearly see how their AsyncHandle, Interval and futures are created and understand it, instead of treating it like black magic and getting confused when it fails.

For example, beginners might attempt to create an AsyncFd or TcpSocket in a non-async context, such as a regular fn f() -> TcpSocket.

That function f will work fine most of the time, but it is easy for beginners to misuse it and call it from a non-async context where #[tokio::main]/#[tokio::test] has not been used, or where a tokio::runtime::Runtime has been created but Runtime::enter has not been called.

In that case, the code will compile but fail at runtime with a panic.

With this portable runtime proposal, this won't happen, because users need a value implementing the Runtime trait (probably passed as a context) to create an AsyncHandle, TcpSocket, etc., and the code will fail to compile if they forget to pass that context.
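A minimal sketch of the two styles, assuming a hypothetical `Runtime` handle and `TcpSocket` type (not tokio's): the implicit style looks up a thread-local "current runtime" and can only fail at run time, while the explicit style takes the runtime as a parameter, so calling it without one does not compile. Where the implicit lookup fails, this sketch returns an error instead of panicking.

```rust
use std::cell::Cell;

/// Hypothetical runtime handle passed explicitly as a "context".
struct Runtime {
    id: u32,
}

/// Hypothetical socket type bound to a runtime.
#[derive(Debug)]
struct TcpSocket {
    runtime_id: u32,
}

// Implicit style: the "current runtime" lives in a thread-local that is
// only set while some enter()-like guard is active.
thread_local! {
    static CURRENT: Cell<Option<u32>> = Cell::new(None);
}

fn socket_implicit() -> Result<TcpSocket, &'static str> {
    match CURRENT.with(|c| c.get()) {
        Some(id) => Ok(TcpSocket { runtime_id: id }),
        // Only detectable at run time.
        None => Err("no runtime entered on this thread"),
    }
}

// Explicit style: forgetting the runtime is a compile-time error.
fn socket_explicit(rt: &Runtime) -> TcpSocket {
    TcpSocket { runtime_id: rt.id }
}
```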

NobodyXu, Jul 05 '22 06:07

> the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).

In addition to the ergonomic improvement I mentioned in my last comment, the amendment proposal also improves the ergonomics of spawn.

Currently, spawn has to be 'static, which is understandable but too strict. With this amendment proposal, spawn can accept any future that lives as long as the runtime itself.

NobodyXu, Jul 05 '22 06:07

@nrc The original proposal also uses specialisation:

// Provided function, automatically call spawn if f is Send, otherwise call spawn_local.
fn spawn_auto<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;

Motivation

Suppose that a user wants to write some data into a TcpStream in the background; they will likely use spawn.

However, spawn requires Send. In order to use it, they would have to write RuntimeNet<TcpStream: Send>, which is cumbersome.

Another option is to just use spawn_local, but it only gives concurrency, not parallelism.

Thus, Runtime should have a provided method spawn_auto to automatically spawn it on local or spawn it on any thread.

This will make the portable Runtime trait much easier to use, and it would even improve the portability, since users do not have to worry about the Send bound anymore.

NobodyXu, Jul 06 '22 08:07

This proposal definitely needs more feedback, especially from those who maintain async runtimes.

@rust-lang/wg-async, @rust-lang/libs-api @Darksonn @joshtriplett @ihciah @Noah-Kennedy @HippoBaro

NobodyXu, Jul 13 '22 06:07

Well, I looked it over and it seems rather complicated. But perhaps that's unavoidable. I don't think the current location of your spawn_local method is compatible with the spawn_local in Tokio due to the requirement of using a LocalSet, which does not have a handle type.

Darksonn, Jul 13 '22 07:07

> Well, I looked it over and it seems rather complicated.

Part of that is because I also want to support async file/fs operations for io-uring based async executors.

> the spawn_local in Tokio due to the requirement of using a LocalSet

Is there any reason why tokio has to use a LocalSet? async_std::task::spawn_local does not require anything equivalent to a LocalSet AFAIK.

NobodyXu, Jul 13 '22 07:07

Well, on the multi-threaded executor, local spawns are a problem due to our block_in_place method. The block_in_place method is supposed to move everything on the current worker thread to a new worker thread, which is impossible if you can spawn_local on that runtime.

As for the current-thread runtime that doesn't support block_in_place, it's because the Runtime object itself is Send, so moving the Runtime would let you move the non-Send task across threads.

Darksonn, Jul 13 '22 07:07

You are right, that is something I didn't take into account. I've removed block_in_place API from my proposal.

NobodyXu, Jul 13 '22 07:07

Unfortunately, uring-based runtimes are going to necessitate very different APIs for their completion-based operations.

Noah-Kennedy, Jul 13 '22 14:07

> Unfortunately, uring-based runtimes are going to necessitate very different APIs for their completion-based operations.

I suppose the main difference is how to read from and write into an fd (file/net), since io-uring requires either taking ownership of the buffer or using an io-uring managed buffer?
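A sketch of what an ownership-passing read might look like, assuming a hypothetical `read_owned` function and `BufResult` alias: the buffer is moved in and handed back together with the result, so the kernel can own it for the duration of a completion-based operation. A blocking `std::io::Read` stands in for the io-uring submission here.

```rust
use std::io::{self, Read};

/// Outcome plus the buffer, handed back to the caller once the operation
/// completes (hypothetical alias).
type BufResult<T> = (io::Result<T>, Vec<u8>);

/// Hypothetical completion-style read: takes the buffer by value instead of
/// borrowing `&mut [u8]`, and returns it together with the result.
fn read_owned<R: Read>(src: &mut R, mut buf: Vec<u8>) -> BufResult<usize> {
    // Read into the Vec's spare capacity.
    let start = buf.len();
    buf.resize(buf.capacity(), 0);
    let res = src.read(&mut buf[start..]);
    // Keep only the bytes that were actually filled.
    let filled = start + res.as_ref().map_or(0, |n| *n);
    buf.truncate(filled);
    (res, buf)
}
```

Because the caller gets the buffer back even on error, it can reuse the allocation for the next operation.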

NobodyXu, Jul 13 '22 14:07

That's one difference, another is that (currently working on doing this for tokio-uring) there are a lot of neat flags you can set on the SQE pre-submission that a user may well want to configure, including things like provided buffers, fixed buffers, etc.

Noah-Kennedy, Jul 13 '22 15:07

@Noah-Kennedy For the read/write API differences, you might want to read #5 and give feedback there.

> there are a lot of neat flags you can set on the SQE pre-submission that a user may well want to configure, including things like provided buffers, fixed buffers, etc.

Portable runtime traits are designed mainly for libraries, so perhaps the user can preconfigure provided/fixed buffers in main or a similar place where they know which runtime they are using?

We can also add functions that act as hints, or that adjust the buffer size of BufRead/BufWrite:

    fn into_buffered_read(self, n: usize) -> Self::ReadBuffered;
    fn into_buffered_write(self, n: usize) -> Self::WriteBuffered;

    fn into_buffered(self, read_buffer_size: usize, write_buffer_size: usize) -> Self::ReadWriteBuffered;

We might not be able to expose all the functionality io-uring provides, so I think we should focus on providing a portable, common interface for crate/library writers.

For cases where more details can significantly improve performance, perhaps they can use specialisation to achieve this?

NobodyXu, Jul 13 '22 15:07

I think this is more emblematic of the limitations of trying to create a standard interface for async runtimes. A lot of runtimes are trying to do specialized things in specialized ways (see glommio for a good example), and thus you can't really abstract over all runtimes terribly well in a lot of cases.

While async-std and tokio are both relatively close in API design and thus more easily abstracted over, outside of them, I think that specialization would need to be the norm rather than the exception unfortunately.

Noah-Kennedy, Jul 13 '22 15:07

I am incredibly confused by the level of complexity here.

Supplying a runtime doesn't mean supplying file or socket types or handling for fds or handles or process creation or similar. The standard library will already have all of those, and they'll be the same types and functions no matter what runtime you're using; that's a big part of what it means to unify the ecosystem. If you want to use the types supplied by some other ecosystem, you can reference that ecosystem's crates. We're going to have one implementation of AsyncFile, not an abstraction that lets each runtime have its own.

The runtime trait should support spawning futures (spawn and spawn_blocking), and possibly spawn_local (though I'm not sure if we need that or if we'll end up with a better solution in terms of "futures that can be sent before they're polled but not after").

spawn_auto, if we need it (and I'm not sure we do since we won't have the problem of abstracting over types mentioned in its doc comment) could be supplied either as a free function that calls the global runtime, or via a sealed extension trait with a blanket impl for Runtime.
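A minimal sketch of the sealed-extension-trait option, assuming a hypothetical `Runtime` trait with a single object-safe `spawn_boxed` method; tasks are plain closures rather than futures to keep it short. It only shows the trait shape: a generic `spawn` convenience method provided for every runtime through a blanket impl. It does not detect `Send` automatically, which would still need specialization.

```rust
/// Private module, so downstream crates cannot name or implement `Sealed`.
mod sealed {
    pub trait Sealed {}
    impl<T: super::Runtime + ?Sized> Sealed for T {}
}

/// Hypothetical minimal runtime trait: one required, object-safe method.
pub trait Runtime {
    fn spawn_boxed(&self, task: Box<dyn FnOnce() + Send + 'static>);
}

/// Convenience methods every runtime gets through the blanket impl below;
/// `Sealed` keeps downstream crates from implementing this trait themselves.
pub trait RuntimeExt: Runtime + sealed::Sealed {
    fn spawn<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.spawn_boxed(Box::new(f));
    }
}

impl<R: Runtime + ?Sized> RuntimeExt for R {}
```

Because the blanket impl also covers `dyn Runtime`, the generic helper works through a trait object as well.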

The runtime trait shouldn't require an associated type for JoinHandle; we should have a single type for that, with unified semantics, so that callers can work with one without having to abstract over it using dyn or similar. We can store enough information inside that JoinHandle that the runtime can get the information it needs back out. I would expect that storing a pointer should suffice for that.
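A simplified sketch of a single, runtime-independent JoinHandle type, assuming the runtime completes each task through a hypothetical `Completer` that shares an `Arc` (i.e. a pointer) with the handle. Cancellation and panic propagation are omitted; `join_pair`, `Slot`, and `Completer` are illustrative names, not an existing API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// State shared between the running task and its JoinHandle.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// One concrete JoinHandle type, usable with any runtime.
pub struct JoinHandle<T> {
    slot: Arc<Mutex<Slot<T>>>,
}

/// Completion side, kept by whichever runtime runs the task.
pub struct Completer<T> {
    slot: Arc<Mutex<Slot<T>>>,
}

pub fn join_pair<T>() -> (JoinHandle<T>, Completer<T>) {
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    (JoinHandle { slot: slot.clone() }, Completer { slot })
}

impl<T> Completer<T> {
    /// Store the task's output and wake whoever is awaiting the handle.
    pub fn complete(self, value: T) {
        let waker = {
            let mut s = self.slot.lock().unwrap();
            s.value = Some(value);
            s.waker.take()
        }; // lock released before waking
        if let Some(w) = waker {
            w.wake();
        }
    }
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut s = self.slot.lock().unwrap();
        match s.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Remember the latest waker so `complete` can wake us.
                s.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```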

joshtriplett, Jul 13 '22 16:07