async trait for runtime
I propose adding a new trait for the runtime, which provides portable APIs for creating sockets, pipes, AsyncFd, timers, etc.
Motivation
As mentioned in this comment:
I don't think having async IO traits like Read/Write/BufRead is enough to unify the ecosystem.
Many crates, like reqwest, HTTP servers, and openssh-mux-client, need to create a network socket (TCP/UDP/Unix socket), and without a way to create it in a portable manner they will fall back to using a specific runtime.
There are also crates like tokio-pipe, which wraps pipes for tokio users, so I think we need some way to create AsyncFd in a portable manner.
portable runtime trait 0.2.0
Executor 0.3: https://github.com/nrc/portable-interoperable/issues/13#issuecomment-1186439456
Reactor 0.2: https://github.com/nrc/portable-interoperable/issues/13#issuecomment-1185521802
portable runtime trait 0.1.0
Here is a rough sketch of what I expect; many APIs are elided because there are too many of them:
(NOTE that these APIs are heavily influenced by tokio because it is the most familiar async runtime API to me)
use std::io::Result;
trait Runtime {
type Handle: Handle;
async fn yield_now(&self);
fn block_in_place<F: FnOnce() -> R, R>(&self, f: F) -> R;
fn handle(&self) -> &Self::Handle;
}
trait Handle: Clone {
type JoinHandle<T>: JoinHandle<T>;
fn block_on<T>(&self, f: impl Future<Output = T>) -> T;
fn spawn<T>(&self, f: impl Future<Output = T> + Send + 'static) -> Self::JoinHandle<T>;
fn spawn_local<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;
/// Provided function, automatically call spawn if f is Send, otherwise call spawn_local.
///
/// # Motivation
///
/// Suppose that a user wants to write some data into a TcpStream in the background;
/// they will likely use spawn.
///
/// However, spawn requires Send, in order to use it, they would have to write
/// `RuntimeNet<TcpStream: Send>`, which is cumbersome.
///
/// Another option is to just use spawn_local, but it only gives concurrency, not
/// parallelism.
///
/// Thus, Runtime should have a provided method spawn_auto to automatically
/// spawn it on local or spawn it on any thread.
///
/// This will make the portable Runtime trait much easier to use, and
/// it would even improve the portability, since users do not have to worry
/// about the "Send" bound anymore.
fn spawn_auto<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;
}
trait HandleStd: Handle {
fn spawn_blocking<F: FnOnce() -> R, R>(&self, f: F) -> Self::JoinHandle<R>;
}
enum JoinErrorKind {
/// Out of memory when spawning
Oom,
/// The future panicked
Panic(Box<dyn Any + Send>),
/// Cancelled
Cancelled,
/// Other
Other(Box<dyn core::error::Error + Send>),
}
trait JoinError: core::error::Error {
fn error_kind(self) -> JoinErrorKind;
}
trait JoinHandleBase {
type Error: JoinError;
}
trait JoinHandle<T>: Future<Output = core::result::Result<T, Self::Error>> + JoinHandleBase {
fn cancel(&self);
fn is_done(&self) -> bool;
}
/// Provides AsyncHandle
trait RuntimeAsyncHandle: Runtime {
type AsyncHandle: AsyncHandle;
#[cfg(unix)]
fn from_fd(&self, fd: std::os::unix::io::OwnedFd) -> Result<Self::AsyncHandle>;
#[cfg(windows)]
fn from_handle(&self, fd: std::os::windows::io::OwnedHandle) -> Result<Self::AsyncHandle>;
}
trait AsyncHandle: Read + Write + Seek {
type ReadBuffered: AsyncHandle + BufRead;
type WriteBuffered: AsyncHandle + BufWrite;
type ReadWriteBuffered: AsyncHandle + BufRead + BufWrite;
fn try_clone(&self) -> Result<Self>;
async fn close(self) -> Result<()>;
fn into_buffered_read(self, n: usize) -> Self::ReadBuffered;
fn into_buffered_write(self, n: usize) -> Self::WriteBuffered;
fn into_buffered(self, read_buffer_size: usize, write_buffer_size: usize) -> Self::ReadWriteBuffered;
}
/// Optional
trait AsyncHandleNet: AsyncHandle {
type AcceptStream: Stream<Item = Result<AsyncHandle>>;
type RecvMsgStream: Stream<Item = Result<(Buffer, Auxiliary)>>;
async fn accept(&self) -> Result<AsyncHandle>;
/// For multi-shot accept supported by io-uring
/// Provides a default implementation.
async fn into_accept_stream(self) -> Result<Self::AcceptStream>;
async fn sendmsg(&self, buffer: IoSlice<'_>, aux: Auxiliary) -> Result<usize>;
async fn recvmsg(&self, buffer: IoSliceMut<'_>, aux: Auxiliary) -> Result<usize>;
/// For multi-shot recvmsg supported by io-uring
/// Provides a default implementation.
async fn into_recvmsg_stream(self) -> Result<Self::RecvMsgStream>;
// Provides default implementation which fallback to read + write if the runtime/target does not support sendfile syscall.
async fn sendfile(&self, ...) -> Result<usize>;
}
/// Optional, for io-uring
trait AsyncHandleNetExt: AsyncHandleNet {
async fn connect(&self, ...) -> Result<()>;
}
/// Optional
trait AsyncHandleFile: AsyncHandle + Seek {
async fn fsync(&self) -> Result<()>;
/// If the runtime/os does not support this, then it can issue fsync instead.
async fn fdatasync(&self) -> Result<()>;
/// If the runtime/os does not support this, then it can issue fdatasync instead.
async fn sync_file_range(&self, off: u64, n: u64) -> Result<()>;
/// If the runtime/os does not support this, then it can use read + write to simulate this.
async fn copy_file_range(&self, off_in: u64, other: &Self, off_out: u64, len: u64) -> Result<()>;
async fn metadata(&self) -> Result<MetaData>;
async fn set_metadata(&self, metadata: MetaData) -> Result<()>;
}
/// fs operations, like open, optional
trait RuntimeFs: Runtime {
async fn open(&self) -> Result<Self::AsyncHandle>;
}
/// signal handling, optional
trait RuntimeSignal: Runtime {
fn ctrl_c(&self) -> Result<()>;
}
/// spawn process, optional
trait RuntimeProcess: Runtime {
type Child: ...;
fn wait(&self, child: std::process::Child) -> Result<Self::Child>;
}
/// Perhaps the same set of methods as tokio::time::Interval?
trait Interval {...}
/// time handling, optional but most likely implemented
trait RuntimeTime: Runtime {
type Interval: Interval;
fn interval_at(&self, start: Option<Instant>, period: Duration) -> Self::Interval;
}
#[cfg(unix)]
mod unix {
use super::*;
enum Madvice { ... }
trait RuntimeMem: Runtime {
async unsafe fn madvice(&self, addr: *mut (), len: usize, advice: Madvice) -> Result<()>;
}
trait RuntimeAsyncHandleExt: RuntimeAsyncHandle {
async fn openat2(&self, ...) -> Result<Self::AsyncHandle>;
}
enum SignalKind { ... }
trait RuntimeSignalExt: RuntimeSignal {
type Signal: Stream<Item = Result<SigInfo>>;
fn signal(&self, kind: SignalKind) -> Result<Self::Signal>;
}
}
#[cfg(target_os = "linux")]
mod linux {
trait AsyncHandlePipe: AsyncHandle {
async fn splice(&self, ...) -> Result<usize>;
async fn tee(&self, ...) -> Result<usize>;
}
}
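To illustrate how a library could depend only on such a trait, here is a minimal sketch. It assumes a cut-down `Handle` with just `block_on`; `BusyPoll`, `NoopWake` and `sum_on` are hypothetical names invented for this example and are not part of the proposal.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical, cut-down version of the `Handle` trait above: only `block_on`.
trait Handle: Clone {
    fn block_on<T>(&self, f: impl Future<Output = T>) -> T;
}

// Waker that does nothing; good enough for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy runtime used only for illustration: it polls the future in a loop.
#[derive(Clone)]
struct BusyPoll;

impl Handle for BusyPoll {
    fn block_on<T>(&self, f: impl Future<Output = T>) -> T {
        let mut f = pin!(f);
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        loop {
            if let Poll::Ready(value) = f.as_mut().poll(&mut cx) {
                return value;
            }
            std::thread::yield_now();
        }
    }
}

// Library code: it names no concrete runtime, only the trait.
fn sum_on<H: Handle>(handle: &H, a: u32, b: u32) -> u32 {
    handle.block_on(async move { a + b })
}
```

An application would pick the concrete runtime once and pass it down, for example `sum_on(&BusyPoll, 2, 3)`.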
Amendment Proposal: Achieve zero-cost abstraction
After I wrote up this proposal, I realized that it is not zero-cost, so I created the Amendment Proposal: Achieve zero-cost abstraction.
It would be simpler to not have separate process/net/file/stdio APIs but rather provide a single unified Socket type (actually, might be better to have Fd on Unix and Socket + Handle on Windows) similar to socket2’s API as well as a spawn_blocking function, and then TcpStream, File, Process etc can all build on that generically. So, more like:
pub trait Runtime {
#[cfg(unix)]
type Fd: Fd;
// true for io_uring, false for epoll, used to determine whether file APIs should just use `spawn_blocking`
fn supports_non_socket_fds(&self) -> bool;
}
// unix-only
pub trait Fd: From<OwnedFd> + Into<OwnedFd> + AsyncRead + AsyncWrite {}
and std can define a TcpStream<Inner>, where on Unix Inner: Fd and on Windows Inner: Socket.
It would be simpler to not have separate process/net/file/stdio APIs
I doubt that would be a good idea, since async executors for embedded environments or for special use cases, e.g. a runtime for the main loop of a game, do not need to support process/net/file/stdio APIs.
Instead, they could simply just implement Runtime for spawning tasks and RuntimeTime for managing time.
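A minimal sketch of that split, under the assumption that the traits are reduced to one method each (`name` and `now` are stand-ins invented here, not the proposed API): a game-loop style runtime implements only the two traits it needs, and a library bounds on `RuntimeTime` alone.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for the task-spawning trait.
trait Runtime {
    fn name(&self) -> &'static str;
}

// Hypothetical stand-in for the time trait; it builds on `Runtime`.
trait RuntimeTime: Runtime {
    fn now(&self) -> Instant;
}

// A runtime with no process/net/file/stdio support at all.
struct GameLoopRt;

impl Runtime for GameLoopRt {
    fn name(&self) -> &'static str {
        "game-loop"
    }
}

impl RuntimeTime for GameLoopRt {
    fn now(&self) -> Instant {
        Instant::now()
    }
}

// Library code that only needs time support asks for exactly that.
fn deadline_in<R: RuntimeTime>(rt: &R, after: Duration) -> Instant {
    rt.now() + after
}
```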
Ah, you misunderstood me. I meant that process/net/file/stdio should all be together, but time and task spawning should still be separate.
Sorry for that, integrating these traits does sound reasonable, but I am still not completely sure this is a good idea though.
Oops...
Having this method:
// true for io_uring, false for epoll, used to determine whether file APIs should just use `spawn_blocking`
fn supports_non_socket_fds(&self) -> bool;
in Runtime will definitely be necessary.
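As a rough illustration of how generic file code could branch on that flag, here is a sketch. `FileIoStrategy`, `UringLike` and `EpollLike` are hypothetical names made up for this example; only `supports_non_socket_fds` comes from the discussion.

```rust
// Hypothetical reduction of `Runtime` to the single capability flag.
trait Runtime {
    fn supports_non_socket_fds(&self) -> bool;
}

#[derive(Debug, PartialEq)]
enum FileIoStrategy {
    // Submit file operations directly to the runtime (io_uring style).
    Native,
    // Run the blocking syscall on a thread pool (epoll style).
    SpawnBlocking,
}

fn file_io_strategy<R: Runtime>(rt: &R) -> FileIoStrategy {
    if rt.supports_non_socket_fds() {
        FileIoStrategy::Native
    } else {
        FileIoStrategy::SpawnBlocking
    }
}

struct UringLike;
struct EpollLike;

impl Runtime for UringLike {
    fn supports_non_socket_fds(&self) -> bool {
        true
    }
}

impl Runtime for EpollLike {
    fn supports_non_socket_fds(&self) -> bool {
        false
    }
}
```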
Also, we need to find a way to obtain the runtime.
I personally think the proposal of having a "context" is the best solution for this, since it is zero-cost (does not require thread-local variable) and the crate can easily put a bound on the "context".
Ah, you misunderstood me. I meant that process/net/file/stdio should all be together, but time and task spawning should still be separate.
After a second thought, I think it will be better to keep it separated.
While it's true that process/net/file/stdio can be put together, runtime implementations often split them into multiple features.
For example, tokio provides feature flags for each one of them to speed up compilation and reduce bloat.
Similarly, async-std also provides feature flags.
Thus, I think it will be better to leave them as separate traits so that the runtime can choose to provide feature flags for each of them.
Under my design, types like TcpStream, process::Child, Stdin etc would still be equally feature flagged. All that wouldn't be feature flagged would be the underlying low-level system call wrappers which would be common between the high level net/process/io interfaces anyway.
I am a bit confused.
Do you mean that process/net/file/stdio would be in the same trait but feature-gated?
Each runtime would just provide an implementation of Socket/Handle/Fd, which provides all the functionality of a socket/handle/file descriptor registered with io_uring/IOCP/epoll/etc under an asynchronous interface. It does not specify or keep track of whether said FD refers to a process, or a network socket, or a file, since the underlying interface is the same for them all.
On top of it, the standard library can provide high-level typed wrappers like TcpStream which would hold an Fd and give an API that keeps track of stuff like FD type in the type system. So:
// generic, runtime-independent file API
pub struct File<F: Fd> {
fd: F,
}
impl<F: Fd> File<F> {
pub async fn open(runtime: &F::Runtime, path: &Path) -> io::Result<Self> {
let fd = runtime.open(path).await?;
Ok(Self { fd })
}
}
impl<F: Fd> async Read for File<F> {
async fn read(&mut self, buf: ReadBufRef<'_, '_>) -> io::Result<()> {
self.fd.read(buf).await
}
}
// etc
This actually sounds good.
I think we need the runtime to implement AsyncHandle (can be constructed from OwnedFd or windows's OwnedHandle), Socket for async accept, (multishot) async recv/send implementation, process spawning, timer, fs operations.
@SabrinaJewson I've adopted your suggestion and updated the proposal.
@nrc I have written down a detailed proposal and it seems that we need at least type GAT for spawning tasks.
Amendment Proposal: Achieve zero-cost abstraction
Motivation
Currently, in order to implement the portable Runtime trait, the async runtime
must either use reference counting (Rc/Arc) or global static variables.
Neither of the solutions above is zero-cost.
Reference counting requires boxing, and every time a new AsyncHandle, Future, or other
runtime-backed type is created, the reference count must be incremented, and it must
be decremented again on Drop.
A global static variable is also not zero-cost, since it might make the executable
larger or require the use of once_cell.
Thus, I decided to propose this to achieve zero-cost abstraction.
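The difference can be made concrete with a small sketch. `RtInner`, `CountedHandle` and `BorrowedHandle` are hypothetical types invented for this comparison; the point is only that each counted handle touches the shared counter, while a borrowed handle is a plain reference.

```rust
use std::sync::Arc;

// Hypothetical runtime state shared by all handles.
struct RtInner {
    name: &'static str,
}

// Handle that shares ownership: creating it increments the count,
// dropping it decrements the count.
struct CountedHandle {
    rt: Arc<RtInner>,
}

// Handle that merely borrows the runtime: no counter is involved.
struct BorrowedHandle<'rt> {
    rt: &'rt RtInner,
}

// Returns the strong count observed while `n` counted handles are alive.
fn counted_handles(n: usize) -> usize {
    let rt = Arc::new(RtInner { name: "demo" });
    let handles: Vec<CountedHandle> = (0..n)
        .map(|_| CountedHandle { rt: Arc::clone(&rt) })
        .collect();
    let count = Arc::strong_count(&rt);
    // Every handle still points at the same runtime state.
    assert!(handles.iter().all(|h| h.rt.name == "demo"));
    count
}

// Returns how many borrowed handles see the runtime state.
fn borrowed_handles(n: usize) -> usize {
    let rt = RtInner { name: "demo" };
    let handles: Vec<BorrowedHandle<'_>> =
        (0..n).map(|_| BorrowedHandle { rt: &rt }).collect();
    handles.iter().filter(|h| h.rt.name == "demo").count()
}
```

With three counted handles the strong count is four (the runtime plus three handles); the borrowed variant has no counter to maintain, at the price of a lifetime parameter on the handle type.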
Modification
In order to achieve zero-cost, we must eliminate the need for reference counting
or global static variables and replace them with a reference to the runtime.
To do so, we must add a lifetime to every type that implements the portable runtime trait.
There are two ways to accomplish this: one is to use lifetime GATs, and the other is
to have a new Rust language feature that provides a 'self lifetime.
Lifetime GAT
We would need to apply the following modifications:
trait Handle: Clone {
type JoinHandle<'a, T>: JoinHandle<T>;
}
trait RuntimeAsyncHandle: Runtime {
type AsyncHandle<'a>: AsyncHandle;
}
trait AsyncHandle: Read + Write + Seek {
type ReadBuffered<'a>: AsyncHandle + BufRead;
type WriteBuffered<'a>: AsyncHandle + BufWrite;
type ReadWriteBuffered<'a>: AsyncHandle + BufRead + BufWrite;
}
trait AsyncHandleNet: AsyncHandle {
type AcceptStream<'a>: Stream<Item = Result<AsyncHandle>>;
type RecvMsgStream<'a>: Stream<Item = Result<(Buffer, Auxiliary)>>;
}
trait RuntimeProcess: Runtime {
type Command<'a>: ...;
}
trait RuntimeTime: Runtime {
type Interval<'a>: Interval;
}
#[cfg(unix)]
mod unix {
trait RuntimeSignalExt: RuntimeSignal {
type Signal<'a>: Stream<Item = Result<SigInfo>>;
}
}
This requires modifying each associated type in the traits to add a <'a>
lifetime parameter.
This also means that we would have to depend on GAT and it might further complicate the portable async story, especially for new users.
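For reference, a lifetime GAT of this shape compiles on stable Rust today. The following is a sketch only: the `interval` method, `ToyRt` and `ToyInterval` are hypothetical, and the `Interval` trait is reduced to two accessor methods for the sake of the example.

```rust
use std::time::Duration;

// Hypothetical, reduced `Interval` trait.
trait Interval {
    fn period(&self) -> Duration;
    fn runtime_name(&self) -> &str;
}

trait RuntimeTime {
    // The interval may borrow from the runtime for `'a`.
    type Interval<'a>: Interval
    where
        Self: 'a;

    fn interval<'a>(&'a self, period: Duration) -> Self::Interval<'a>;
}

struct ToyRt {
    name: String,
}

// Holds a plain reference to the runtime instead of an Rc/Arc.
struct ToyInterval<'a> {
    rt: &'a ToyRt,
    period: Duration,
}

impl Interval for ToyInterval<'_> {
    fn period(&self) -> Duration {
        self.period
    }

    fn runtime_name(&self) -> &str {
        &self.rt.name
    }
}

impl RuntimeTime for ToyRt {
    type Interval<'a> = ToyInterval<'a> where Self: 'a;

    fn interval<'a>(&'a self, period: Duration) -> Self::Interval<'a> {
        ToyInterval { rt: self, period }
    }
}

// Generic caller: it never names `ToyInterval`.
fn describe<R: RuntimeTime>(rt: &R, millis: u64) -> String {
    let interval = rt.interval(Duration::from_millis(millis));
    format!("{} every {} ms", interval.runtime_name(), interval.period().as_millis())
}
```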
'self lifetime
In contrast, using a 'self lifetime is trivial.
The traits need no modification; we just need to add 'self to the types specified
when implementing these traits.
spawn
In addition to these changes, we also need to relax the 'static lifetime bound
in spawn:
fn spawn<'this, 'future: 'this, F, T>(&'this self, f: F) -> Self::JoinHandle<T>
where F: Future<Output = T> + Send + Sync + 'future;
This makes sure that the futures' lifetime is at least as long as the runtime.
The implementation of the runtime will be more complicated, though: implementers have to handle the drop order correctly,
manually create the vtable for the future since they cannot use dyn Future<...> anymore,
and Pin their runtime to ensure that it cannot be forgotten safely.
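The standard library's scoped threads offer a useful analogy for spawning work that is not 'static. This is not the proposed async API, only a sketch of the same idea with `std::thread::scope`: the spawned closures borrow from the caller, and the scope guarantees they finish before the borrowed data goes away. `sum_in_parallel` is a hypothetical helper.

```rust
use std::thread;

// Sums a slice on two threads without requiring `'static` data.
fn sum_in_parallel(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Both closures borrow from `data`; `thread::spawn` would reject them.
        let l = s.spawn(|| left.iter().sum::<u64>());
        let r = s.spawn(|| right.iter().sum::<u64>());
        l.join().unwrap() + r.join().unwrap()
    })
}
```

Note that `thread::scope` gets its soundness from a closure that blocks until all work is done, which is the same reason the amendment needs the runtime to be pinned and not forgettable.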
Expected implementation
I would expect the following implementation:
struct SampleRtInner {
...
}
struct SampleRt {
/// Declare thread_pool first to ensure it will
/// be dropped before inner.
thread_pool: ScopedThreadPool,
inner: SampleRtInner,
}
impl Runtime for Pin<&SampleRt> {
...
}
impl SampleRt {
pub fn new() -> Self {
todo!()
}
pub fn new_scoped<F, T>(f: F) -> T
where F: FnOnce(Pin<&Self>) -> T
{
// pin! yields a Pin<&mut Self>; the result must be bound to a name.
let this = std::pin::pin!(Self::new());
f(this.as_ref())
}
}
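A self-contained version of the scoped constructor might look like the following sketch. The `spawned` counter and the `spawn_noop` method are hypothetical placeholders standing in for real runtime state and real task spawning.

```rust
use std::cell::Cell;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

struct SampleRt {
    // Placeholder for real runtime state.
    spawned: Cell<u32>,
    // Opt out of `Unpin` so the pinning guarantee is meaningful.
    _pin: PhantomPinned,
}

impl SampleRt {
    fn new() -> Self {
        SampleRt { spawned: Cell::new(0), _pin: PhantomPinned }
    }

    // The runtime lives on this stack frame and is only ever exposed
    // as a pinned shared reference for the duration of `f`.
    fn new_scoped<F, T>(f: F) -> T
    where
        F: FnOnce(Pin<&Self>) -> T,
    {
        let this = pin!(Self::new());
        f(this.as_ref())
    }

    // Stand-in for spawning a task: it just counts the call.
    fn spawn_noop(self: Pin<&Self>) {
        self.spawned.set(self.spawned.get() + 1);
    }

    fn spawned(self: Pin<&Self>) -> u32 {
        self.spawned.get()
    }
}
```

Because the closure receives a reference whose lifetime is chosen by `new_scoped`, nothing that borrows the runtime can escape through the return value `T`.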
Call for review
I know this amendment proposal will be controversial, as it requires either
GATs or a new 'self feature, but I think if we want truly portable and zero-cost
runtime traits, it is the way to go.
@nrc @SabrinaJewson Can you guys review this please? Thanks in advance!
Hey @NobodyXu I haven't looked in detail at the amendment above, I'm still at the information and requirements gathering stage of work on executors. But two comments: I expect using lifetime GATs is fine in general, they seem on a solid path towards stabilisation. I don't think being zero-cost in the sense of having no runtime overhead at all is necessary, the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).
I expect using lifetime GATs is fine in general, they seem on a solid path towards stabilisation.
It's good to hear that!
the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).
Well, I don't think using a few statics or reference counting improves ergonomics.
The problem here is that a zero-cost runtime abstraction requires lifetime GATs, but if that is fine, then I don't think it will have much impact on ergonomics.
In fact, I would argue the current API makes more sense.
Users of this portable runtime can clearly see how their AsyncHandle, Interval and futures are created and understand it instead of treating it like black magic and get confused when it fails.
For example, beginners might attempt to create an AsyncFd or TcpSocket in a non-async context, like a regular fn f() -> TcpSocket.
That function f will work fine most of the time, but it is easy for beginners to misuse it by calling it from a non-async context where #[tokio::main]/#[tokio::test] has not been used, or where a tokio::runtime::Runtime has been created but Runtime::enter has not been called.
In that case, the code will compile, but fail at runtime with a panic.
With this portable runtime proposal, this won't happen, because users need a Runtime trait (probably as context) to create an AsyncHandle, TcpSocket, etc., and the code will fail at compile time if they forget to pass it as a context.
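A small sketch of that compile-time guarantee, assuming a hypothetical `RuntimeNet` trait with a `tcp_socket` constructor (neither name is part of the proposal, and `ToyRt`/`ToySocket` are toy types):

```rust
use std::io;

// Hypothetical trait: the only way to obtain a socket is via a runtime.
trait RuntimeNet {
    type TcpSocket;

    fn tcp_socket(&self) -> io::Result<Self::TcpSocket>;
}

struct ToyRt;

struct ToySocket {
    owner: &'static str,
}

impl RuntimeNet for ToyRt {
    type TcpSocket = ToySocket;

    fn tcp_socket(&self) -> io::Result<ToySocket> {
        Ok(ToySocket { owner: "toy" })
    }
}

// Library code must take the runtime as a parameter. A signature like
// `fn f() -> R::TcpSocket` has no runtime to ask, so it cannot be
// implemented, and the mistake is caught when compiling, not when running.
fn make_socket<R: RuntimeNet>(rt: &R) -> io::Result<R::TcpSocket> {
    rt.tcp_socket()
}
```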
the cost of a few statics or an occasional reference count incr/decr is well worth it for the sake of improved ergonomics (if that is in fact the trade-off here).
In addition to the ergonomic improvement I mentioned in the last comment, the amendment proposal also improves the ergonomics of spawn.
Currently, spawn has to be 'static, which is understandable but too strict.
With this amendment proposal, spawn can accept any future that lives as long as the runtime itself.
@nrc The original proposal also uses specialisation:
// Provided function, automatically call spawn if f is Send, otherwise call spawn_local.
fn spawn_auto<T>(&self, f: impl Future<Output = T> + 'static) -> Self::JoinHandle<T>;
Motivation
Suppose that a user wants to write some data into a TcpStream in the background;
they will likely use spawn.
However, spawn requires Send. In order to use it, they would have to write
RuntimeNet<TcpStream: Send>, which is cumbersome.
Another option is to just use spawn_local, but it only gives concurrency, not
parallelism.
Thus, Runtime should have a provided method spawn_auto to automatically
spawn it on local or spawn it on any thread.
This will make the portable Runtime trait much easier to use, and
it would even improve the portability, since users do not have to worry
about the Send bound anymore.
This proposal definitely needs more feedback, especially from those who maintain async runtimes.
@rust-lang/wg-async, @rust-lang/libs-api @Darksonn @joshtriplett @ihciah @Noah-Kennedy @HippoBaro
Well, I looked it over and it seems rather complicated. But perhaps that's unavoidable. I don't think the current location of your spawn_local method is compatible with the spawn_local in Tokio due to the requirement of using a LocalSet, which does not have a handle type.
Well, I looked it over and it seems rather complicated.
Part of that is because I also want to support file/fs async operation for io-uring based async executor.
the spawn_local in Tokio due to the requirement of using a LocalSet
Is there any reason why tokio has to use a LocalSet?
async_std::task::spawn_local does not require anything equivalent to a LocalSet AFAIK.
Well, on the multi-threaded executor, local spawns are a problem due to our block_in_place method. The block_in_place method is supposed to move everything on the current worker thread to a new worker thread, which is impossible if you can spawn_local on that runtime.
As for the current-thread runtime that doesn't support block_in_place, it's because the Runtime object itself is Send, so moving the Runtime would let you move the non-Send task across threads.
You are right, that is something I didn't take into account.
I've removed block_in_place API from my proposal.
Unfortunately, uring-based runtimes are going to necessitate very different APIs for their completion-based operations.
I suppose that the main difference is how to read from and write to an fd (file/net), since io-uring requires either taking ownership of the buffer or using an io-uring managed buffer?
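To make the ownership difference concrete, here is a sketch of an owned-buffer read. `OwnedRead`, `read_owned` and `MemSource` are hypothetical names for this example, and the method is synchronous for brevity, whereas a real completion-based API would be async. The shape is the point: the buffer is moved into the operation and handed back together with the result.

```rust
use std::io;

// Completion-style read: the operation owns the buffer while it is in
// flight and returns it alongside the result.
trait OwnedRead {
    fn read_owned(&mut self, buf: Vec<u8>) -> (io::Result<usize>, Vec<u8>);
}

// In-memory stand-in for a file or socket.
struct MemSource {
    data: Vec<u8>,
    pos: usize,
}

impl OwnedRead for MemSource {
    fn read_owned(&mut self, mut buf: Vec<u8>) -> (io::Result<usize>, Vec<u8>) {
        let remaining = &self.data[self.pos..];
        // Copy as much as fits into the caller-provided buffer.
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        (Ok(n), buf)
    }
}
```

A caller reuses the buffer by threading it through successive calls, e.g. `let (res, buf) = src.read_owned(buf);`, which differs from the borrowed `&mut [u8]` style used by readiness-based Read traits.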
That's one difference, another is that (currently working on doing this for tokio-uring) there are a lot of neat flags you can set on the SQE pre-submission that a user may well want to configure, including things like provided buffers, fixed buffers, etc.
@Noah-Kennedy For the difference between read and write, you might want to read #5 and give feedback on it there.
there are a lot of neat flags you can set on the SQE pre-submission that a user may well want to configure, including things like provided buffers, fixed buffers, etc.
Portable runtime traits are designed mainly for libraries, so perhaps provided/fixed buffers can be preconfigured by the user in main or somewhere similar, where they know which runtime they are using?
We can also add functions as hints or adjust the buffer of BufRead/BufWrite:
fn into_buffered_read(self, n: usize) -> Self::ReadBuffered;
fn into_buffered_write(self, n: usize) -> Self::WriteBuffered;
fn into_buffered(self, read_buffer_size: usize, write_buffer_size: usize) -> Self::ReadWriteBuffered;
We might not be able to expose all the functionality io-uring provides, so I think we should focus on providing a portable, common interface for crate/library writers.
For the cases where more detail can significantly improve performance, perhaps they can use specialisation to achieve this?
I think this is more emblematic of the limitations of trying to create a standard interface for async runtimes. A lot of runtimes are trying to do specialized things in specialized ways (see glommio for a good example), and thus you can't really abstract over all runtimes terribly well in a lot of cases.
While async-std and tokio are both relatively close in API design and thus more easily abstracted over, outside of them, I think that specialization would need to be the norm rather than the exception unfortunately.
I am incredibly confused by the level of complexity here.
Supplying a runtime doesn't mean supplying file or socket types or handling for fds or handles or process creation or similar. The standard library will already have all of those, and they'll be the same types and functions no matter what runtime you're using; that's a big part of what it means to unify the ecosystem. If you want to use the types supplied by some other ecosystem, you can reference that ecosystem's crates. We're going to have one implementation of AsyncFile, not an abstraction that lets each runtime have its own.
The runtime trait should support spawning futures (spawn and spawn_blocking), and possibly spawn_local (though I'm not sure if we need that or if we'll end up with a better solution in terms of "futures that can be sent before they're polled but not after").
spawn_auto, if we need it (and I'm not sure we do since we won't have the problem of abstracting over types mentioned in its doc comment) could be supplied either as a free function that calls the global runtime, or via a sealed extension trait with a blanket impl for Runtime.
The runtime trait shouldn't require an associated type for JoinHandle; we should have a single type for that, with unified semantics, so that callers can work with one without having to abstract over it using dyn or similar. We can store enough information inside that JoinHandle that the runtime can get the information it needs back out. I would expect that storing a pointer should suffice for that.
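One possible reading of this suggestion, sketched under several assumptions: the runtime trait only accepts type-erased tasks through a hypothetical `spawn_boxed`, and a single concrete `JoinHandle<T>` is built on top by a generic free function. The shared slot used here is just the simplest stand-in for "storing a pointer"; `InlineRt` and `try_take` are likewise invented for the example, and a real handle would implement Future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

// One concrete handle type, shared by every runtime.
struct JoinHandle<T> {
    slot: Arc<Mutex<Option<T>>>,
}

impl<T> JoinHandle<T> {
    // Takes the task's output if it has finished.
    fn try_take(&self) -> Option<T> {
        self.slot.lock().unwrap().take()
    }
}

// Object-safe runtime trait: no associated JoinHandle type.
trait Runtime {
    fn spawn_boxed(&self, task: Pin<Box<dyn Future<Output = ()> + Send>>);
}

// Generic front end that wraps the future so it stores its own output.
fn spawn<T, F>(rt: &dyn Runtime, fut: F) -> JoinHandle<T>
where
    T: Send + 'static,
    F: Future<Output = T> + Send + 'static,
{
    let slot = Arc::new(Mutex::new(None));
    let writer = Arc::clone(&slot);
    rt.spawn_boxed(Box::pin(async move {
        let value = fut.await;
        *writer.lock().unwrap() = Some(value);
    }));
    JoinHandle { slot }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy runtime: polls each task to completion on the calling thread.
struct InlineRt;

impl Runtime for InlineRt {
    fn spawn_boxed(&self, mut task: Pin<Box<dyn Future<Output = ()> + Send>>) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        while task.as_mut().poll(&mut cx).is_pending() {
            std::thread::yield_now();
        }
    }
}
```

Callers then work with `JoinHandle<T>` directly, whichever runtime sits behind the `&dyn Runtime`.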