
Support for AsyncRead/AsyncWrite

Open · KillTheMule opened this issue 5 years ago · 9 comments

Hey, I'm in the process of making an async version of https://github.com/daa84/neovim-lib. One hurdle I'm facing is that msgpack-rust only supports synchronous I/O. It would be great (well, at least for me) if it worked asynchronously as well.

Are there plans? Would you accept a PR? If so, how should it be implemented? I guess one does not want to throw away the sync versions, so should the API be duplicated, or should it be a feature switch?

KillTheMule, Dec 07 '19 11:12

How would the result of partial parsing be exposed? Is it supposed to work like an XML SAX parser?

If you're not planning to use incomplete data of an incomplete file, then I suggest just buffering input and using the sync version on the buffer. The bytes crate will give you an efficient way to buffer async chunks and then expose them as a reader.

Parsing doesn't benefit much from suspending operation, so you really don't gain much from involving async. In fact, an async parser could be slower and more memory-hungry, because it would have to keep its suspended state in a Future rather than on the thread's stack.

So I don't really see a point in supporting AsyncRead/AsyncWrite. Just because you have an async stream it doesn't mean everything this stream touches has too. Use buffers or adapters.

kornelski, Dec 07 '19 12:12

> How would the result of partial parsing be exposed? Is it supposed to work like an XML SAX parser?

I have no idea what the latter part means :) I also don't really have an answer for the first; I was thinking along the lines of "if there's not enough data, (a)wait for more". Isn't that how the sync version works, too?

> If you're not planning to use incomplete data of an incomplete file, then I suggest just buffering input and using the sync version on the buffer.

That's what I'm trying to do indeed. But while trying to figure out some problems I thought it might be worthwhile to have it in the lib directly.

Edit: to add, I got some help on the tokio Discord, but still wasn't able to implement that "buffering input". Maybe it's not easy?

KillTheMule, Dec 07 '19 13:12

If you're OK with having to wait for the full file to be downloaded before using it, then this crate doesn't need to support async. You break the problem into two:

  1. Download the file using async
  2. Parse the file as usual using sync interfaces

This crate would only need to support async itself if you wanted to partially parse and use incomplete data before the file finishes loading.

kornelski, Dec 07 '19 16:12

> If you're OK with having to wait for the full file to be downloaded before using it, then this crate doesn't need to support async.

Well, the point of async here would be to do something else while waiting.

There's no file download involved, though; I need to read from a ChildStdout. I've just made it work; if you want, see here. Note that this is still very raw, but it seems to do what I want.

Note the main point: I can await the reader here, which means that if there's no data available, other tasks can run. If we somehow made an async version of model::decode, that block could become something like model::decode(&mut reader).await (model::decode is just a wrapper around rmpv::decode::read_value that makes sure we've got an array with 4 or 3 elements, depending on the first one).

KillTheMule, Dec 07 '19 16:12
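For reference, the shape check that `model::decode` is described as doing matches the msgpack-rpc message layout: requests `[0, msgid, method, params]` and responses `[1, msgid, error, result]` have four elements, while notifications `[2, method, params]` have three. A minimal std-only sketch of just that rule; `RpcKind` and `classify` are hypothetical names, not the neovim-lib code:

```rust
/// Kinds of msgpack-rpc messages (hypothetical helper type).
#[derive(Debug, PartialEq)]
pub enum RpcKind {
    Request,      // [0, msgid, method, params]
    Response,     // [1, msgid, error, result]
    Notification, // [2, method, params]
}

/// Returns the message kind if `msg_type` (the first array element) and the
/// array length form a valid msgpack-rpc message, or `None` otherwise.
pub fn classify(msg_type: u64, array_len: usize) -> Option<RpcKind> {
    match (msg_type, array_len) {
        (0, 4) => Some(RpcKind::Request),
        (1, 4) => Some(RpcKind::Response),
        (2, 3) => Some(RpcKind::Notification),
        _ => None,
    }
}
```

With rmpv, both inputs would come from the decoded `Value`, for example via `as_array()` on the message and `as_u64()` on its first element.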

I mean you can do anything else while waiting. Sync interface doesn't mean you have to wait for this crate.

You can download data to a Vec any way you want, including doing it using non-blocking async clients. Then supply the Vec to this crate, and it will parse the Vec immediately, without waiting for any I/O.

You can even do parsing on an async threadpool, so even long parsing time won't block anything.

kornelski, Dec 07 '19 17:12

> I mean you can do anything else while waiting. Sync interface doesn't mean you have to wait for this crate. You can download data to a Vec any way you want, including doing it using non-blocking async clients. Then supply the Vec to this crate, and it will parse the Vec immediately, without waiting for any I/O.

Well, the main problem is that you don't know how much data to download before it can be parsed. That restricts you to the pattern I have implemented: download into a buffer, try to parse it, inspect the error if there is one, and if there wasn't enough data, download some more and retry until it fits. You also don't know how much data a successful parse consumed, so you need to, e.g., wrap the Vec in a Cursor to find that out afterwards.

That feels like quite a bit of ceremony, and inefficient besides (although I don't have any numbers to back up that feeling). An async interface would make this much smoother.

Please don't misunderstand, though: I was offering to start off implementing this because I thought it might be useful for others as well. My personal use case is more or less solved by the code I linked, so if you decide against the idea it won't really hurt me :)

KillTheMule, Dec 08 '19 10:12
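The buffer, try-to-parse, read-more pattern described above can be kept fairly compact. A minimal std-only sketch, assuming the sync decoder reports a short buffer as `io::ErrorKind::UnexpectedEof`; `try_decode`, `decode_stream`, `is_eof`, and the one-byte-length-prefixed `toy_decode` are hypothetical stand-ins. With rmpv you would pass `rmpv::decode::read_value` instead and look for `UnexpectedEof` in the `io::Error` carried by its `InvalidMarkerRead`/`InvalidDataRead` variants:

```rust
use std::io::{self, Cursor, Read};

/// Tries to decode one message from the front of `buf` with a sync decoder.
/// Returns `Ok(Some((value, bytes_consumed)))` on success, `Ok(None)` if the
/// decoder ran out of input, or the decoder's error otherwise.
pub fn try_decode<T, E>(
    buf: &[u8],
    decode: impl FnOnce(&mut Cursor<&[u8]>) -> Result<T, E>,
    is_eof: impl Fn(&E) -> bool,
) -> Result<Option<(T, usize)>, E> {
    let mut cursor = Cursor::new(buf);
    match decode(&mut cursor) {
        // The cursor position is how many bytes the message occupied.
        Ok(value) => Ok(Some((value, cursor.position() as usize))),
        // Not enough data yet: the caller should read more and retry.
        Err(e) if is_eof(&e) => Ok(None),
        Err(e) => Err(e),
    }
}

/// Hypothetical stand-in for a real sync decoder: one length byte, then the body.
pub fn toy_decode(rd: &mut Cursor<&[u8]>) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 1];
    rd.read_exact(&mut len)?;
    let mut body = vec![0u8; usize::from(len[0])];
    rd.read_exact(&mut body)?;
    Ok(body)
}

pub fn is_eof(e: &io::Error) -> bool {
    e.kind() == io::ErrorKind::UnexpectedEof
}

/// Appends each incoming chunk to a buffer and decodes every complete
/// message as soon as it fits, keeping any incomplete tail for later.
pub fn decode_stream(chunks: &[&[u8]]) -> io::Result<Vec<Vec<u8>>> {
    let mut buf: Vec<u8> = Vec::new();
    let mut messages = Vec::new();
    for &chunk in chunks {
        buf.extend_from_slice(chunk);
        while let Some((msg, used)) = try_decode(&buf, toy_decode, is_eof)? {
            messages.push(msg);
            buf.drain(..used); // drop the bytes that were just decoded
        }
    }
    Ok(messages)
}
```

Each failed attempt re-parses the buffer from the start, which is the inefficiency suspected above: for small RPC messages that is usually cheap, but a large value arriving in many small chunks gets re-scanned repeatedly.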

I assumed you'd have some higher-level protocol around it, such as HTTP, to know how large the message is. Even if you use raw streams, you could wrap it in <length><data> format.

If you just rely on io::Read reading only as much as it needs, that still can be used with async context: spawn read on a thread, and use mpsc to feed it data. I've done this the opposite way for making io::Write async: https://stackoverflow.com/a/55764246/27009

kornelski, Dec 08 '19 11:12
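The `<length><data>` framing suggested above only needs a fixed-width prefix. A minimal sketch, assuming a 4-byte big-endian length (an arbitrary choice, not something rmp or neovim defines) and a hypothetical `MAX_FRAME` cap so a corrupt prefix can't force a huge allocation. It only helps when you control both ends of the stream, which, as the next reply points out, isn't the case for neovim's output:

```rust
use std::convert::TryFrom;
use std::io::{self, Read, Write};

/// Hypothetical upper bound on a frame, so a corrupt prefix can't force a huge allocation.
const MAX_FRAME: usize = 16 * 1024 * 1024;

/// Writes `payload` as one frame: a 4-byte big-endian length, then the bytes.
pub fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    let len = u32::try_from(payload.len()).expect("MAX_FRAME fits in u32");
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Reads one frame produced by `write_frame` and returns its payload,
/// ready to be handed to a sync decoder.
pub fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut prefix = [0u8; 4];
    r.read_exact(&mut prefix)?;
    let len = u32::from_be_bytes(prefix) as usize;
    if len > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}
```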

> I assumed you'd have some higher-level protocol around it, such as HTTP, to know how large the message is. Even if you use raw streams, you could wrap it in <length><data> format.

No, I have to deal with the raw messages, and I don't think neovim will change the format for me :)

> If you just rely on io::Read reading only as much as it needs, that still can be used with async context: spawn read on a thread, and use mpsc to feed it data. I've done this the opposite way for making io::Write async: https://stackoverflow.com/a/55764246/27009

That's basically where I'm coming from, using an extra thread. But it's sort of "trivial" that you don't need async if you "just" spawn threads. I wanted to abstract away from that, and leave that decision to the runtime that the user of my lib chooses (even though right now it seems I can only work with tokio for other reasons).

KillTheMule, Dec 08 '19 11:12
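The thread-plus-channel approach discussed here can be sketched with std alone: the async side forwards each chunk it reads (from `ChildStdout`, say) into an unbounded `std::sync::mpsc` channel, whose `send` never blocks, and a dedicated thread runs the sync decoder on a reader that drains the channel. `ChannelReader` is a hypothetical name; in practice you would hand it to something like `rmpv::decode::read_value`:

```rust
use std::io::{self, Read};
use std::sync::mpsc::Receiver;

/// A blocking `io::Read` fed with byte chunks over a channel (hypothetical type).
pub struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    chunk: Vec<u8>, // current chunk being consumed
    pos: usize,     // read position inside `chunk`
}

impl ChannelReader {
    pub fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChannelReader { rx, chunk: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Block until there is unread data, skipping empty chunks.
        while self.pos == self.chunk.len() {
            match self.rx.recv() {
                Ok(next) => {
                    self.chunk = next;
                    self.pos = 0;
                }
                // Every sender was dropped: report end of stream.
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.chunk.len() - self.pos);
        buf[..n].copy_from_slice(&self.chunk[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

Dropping every `Sender` ends the stream, so the decoder sees a normal end of input. The cost, as noted above, is one extra OS thread per stream.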

I have a similar interest for AsyncRead/AsyncWrite support.

> I assumed you'd have some higher-level protocol around it, such as HTTP, to know how large the message is. Even if you use raw streams, you could wrap it in <length><data> format.

I'm probably going to go down this path, but it seems wasteful when msgpack already encodes value lengths. I did experiment with wiring in async support though. In case it's helpful, here's a 100% untested, very questionable implementation of async decoding (haven't gotten around to encoding):

use std::cmp::min;
use std::io;

use rmp::Marker;
use rmp::decode;
use rmpv::{Utf8String, Value};
use rmpv::decode::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use futures::future::{BoxFuture, FutureExt};


// See https://github.com/3Hren/msgpack-rust/issues/151
const PREALLOC_MAX: usize = 64 * 1024; // 64 KiB


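macro_rules! read_fixed {
    ($size:expr, $rd:expr, $next:expr) => {
        {
            // `read_exact` fills `buf.len()` bytes, so the buffer must be
            // zero-initialized to `$size`; an empty `Vec::with_capacity`
            // has length 0 and would make it read nothing.
            let mut bytes = [0u8; $size];
            $rd.read_exact(&mut bytes).await.map_err(Error::InvalidDataRead)?;
            // Decode the fixed-size bytes with rmp's sync helpers.
            $next(&mut &bytes[..])
        }
    };
}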


async fn read_array_data<R>(rd: &mut R, mut len: usize) -> Result<Vec<Value>, Error>
where R: AsyncRead + Unpin + Send + 'static
{
    // Note: Do not preallocate a Vec of size `len`.
    // See https://github.com/3Hren/msgpack-rust/issues/151
    let mut vec = Vec::new();

    while len > 0 {
        vec.push(read_value(rd).await?);
        len -= 1;
    }

    Ok(vec)
}

async fn read_map_data<R>(rd: &mut R, mut len: usize) -> Result<Vec<(Value, Value)>, Error>
where R: AsyncRead + Unpin + Send + 'static
{
    // Note: Do not preallocate a Vec of size `len`.
    // See https://github.com/3Hren/msgpack-rust/issues/151
    let mut vec = Vec::new();

    while len > 0 {
        vec.push((read_value(rd).await?, read_value(rd).await?));
        len -= 1;
    }

    Ok(vec)
}

async fn read_str_data<R>(rd: &mut R, len: usize) -> Result<Utf8String, Error>
where R: AsyncRead + Unpin + Send + 'static
{
    match String::from_utf8(read_bin_data(rd, len).await?) {
        Ok(s) => Ok(Utf8String::from(s)),
        // NOTE: divergence from rmpv, whose sync decoder keeps the raw bytes of an
        // invalid string. Rather than silently replacing the content with the error
        // message, report invalid UTF-8 as a read error.
        Err(err) => Err(Error::InvalidDataRead(io::Error::new(io::ErrorKind::InvalidData, err))),
    }
}

async fn read_bin_data<R>(rd: &mut R, len: usize) -> Result<Vec<u8>, Error>
where R: AsyncRead + Unpin + Send + 'static
{
    let mut buf = Vec::with_capacity(min(len, PREALLOC_MAX));
    let bytes_read = rd.take(len as u64).read_to_end(&mut buf).await.map_err(Error::InvalidDataRead)?;
    if bytes_read != len {
        return Err(Error::InvalidDataRead(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!("Expected {} bytes, read {} bytes", len, bytes_read),
        )));
    }

    Ok(buf)
}

async fn read_ext_body<R>(rd: &mut R, len: usize) -> Result<(i8, Vec<u8>), Error>
where R: AsyncRead + Unpin + Send + 'static
{
    let ty = read_fixed!(1, rd, decode::read_data_i8)?;
    let vec = read_bin_data(rd, len).await?;
    Ok((ty, vec))
}

/// Attempts to read bytes from the given reader and interpret them as a `Value`.
///
/// # Errors
///
/// This function will return `Error` on any I/O error while either reading or decoding a `Value`.
/// All instances of `ErrorKind::Interrupted` are handled by this function and the underlying
/// operation is retried.
pub fn read_value<'a, R>(rd: &'a mut R) -> BoxFuture<'a, Result<Value, Error>>
where R: AsyncRead + Unpin + Send + 'static
{
    async move {
        let val = match read_fixed!(1, rd, decode::read_marker)? {
            Marker::Null => Value::Nil,
            Marker::True => Value::Boolean(true),
            Marker::False => Value::Boolean(false),
            Marker::FixPos(val) => Value::from(val),
            Marker::FixNeg(val) => Value::from(val),
            Marker::U8 => Value::from(read_fixed!(1, rd, decode::read_data_u8)?),
            Marker::U16 => Value::from(read_fixed!(2, rd, decode::read_data_u16)?),
            Marker::U32 => Value::from(read_fixed!(4, rd, decode::read_data_u32)?),
            Marker::U64 => Value::from(read_fixed!(8, rd, decode::read_data_u64)?),
            Marker::I8 => Value::from(read_fixed!(1, rd, decode::read_data_i8)?),
            Marker::I16 => Value::from(read_fixed!(2, rd, decode::read_data_i16)?),
            Marker::I32 => Value::from(read_fixed!(4, rd, decode::read_data_i32)?),
            Marker::I64 => Value::from(read_fixed!(8, rd, decode::read_data_i64)?),
            Marker::F32 => Value::F32(read_fixed!(4, rd, decode::read_data_f32)?),
            Marker::F64 => Value::F64(read_fixed!(8, rd, decode::read_data_f64)?),
            Marker::FixStr(len) => {
                let res = read_str_data(rd, len as usize).await?;
                Value::String(res)
            }
            Marker::Str8 => {
                let len = read_fixed!(1, rd, decode::read_data_u8)?;
                let res = read_str_data(rd, len as usize).await?;
                Value::String(res)
            }
            Marker::Str16 => {
                let len = read_fixed!(2, rd, decode::read_data_u16)?;
                let res = read_str_data(rd, len as usize).await?;
                Value::String(res)
            }
            Marker::Str32 => {
                let len = read_fixed!(4, rd, decode::read_data_u32)?;
                let res = read_str_data(rd, len as usize).await?;
                Value::String(res)
            }
            Marker::FixArray(len) => {
                let vec = read_array_data(rd, len as usize).await?;
                Value::Array(vec)
            }
            Marker::Array16 => {
                let len = read_fixed!(2, rd, decode::read_data_u16)?;
                let vec = read_array_data(rd, len as usize).await?;
                Value::Array(vec)
            }
            Marker::Array32 => {
                let len = read_fixed!(4, rd, decode::read_data_u32)?;
                let vec = read_array_data(rd, len as usize).await?;
                Value::Array(vec)
            }
            Marker::FixMap(len) => {
                let map = read_map_data(rd, len as usize).await?;
                Value::Map(map)
            }
            Marker::Map16 => {
                let len = read_fixed!(2, rd, decode::read_data_u16)?;
                let map = read_map_data(rd, len as usize).await?;
                Value::Map(map)
            }
            Marker::Map32 => {
                let len = read_fixed!(4, rd, decode::read_data_u32)?;
                let map = read_map_data(rd, len as usize).await?;
                Value::Map(map)
            }
            Marker::Bin8 => {
                let len = read_fixed!(1, rd, decode::read_data_u8)?;
                let vec = read_bin_data(rd, len as usize).await?;
                Value::Binary(vec)
            }
            Marker::Bin16 => {
                let len = read_fixed!(2, rd, decode::read_data_u16)?;
                let vec = read_bin_data(rd, len as usize).await?;
                Value::Binary(vec)
            }
            Marker::Bin32 => {
                let len = read_fixed!(4, rd, decode::read_data_u32)?;
                let vec = read_bin_data(rd, len as usize).await?;
                Value::Binary(vec)
            }
            Marker::FixExt1 => {
                let len = 1 as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::FixExt2 => {
                let len = 2 as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::FixExt4 => {
                let len = 4 as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::FixExt8 => {
                let len = 8 as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::FixExt16 => {
                let len = 16 as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::Ext8 => {
                let len = read_fixed!(1, rd, decode::read_data_u8)? as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::Ext16 => {
                let len = read_fixed!(2, rd, decode::read_data_u16)? as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::Ext32 => {
                let len = read_fixed!(4, rd, decode::read_data_u32)? as usize;
                let (ty, vec) = read_ext_body(rd, len).await?;
                Value::Ext(ty, vec)
            }
            Marker::Reserved => Value::Nil,
        };

        Ok(val)
    }.boxed()
}

ysimonson, Jul 25 '20 15:07
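On the point that msgpack already encodes value lengths: even without an async decoder, those lengths make it cheap to check whether a buffer holds a complete value before handing it to the sync `rmpv::decode::read_value`, with no retry loop and no extra framing. A minimal std-only sketch; `complete_len` and `read_be` are hypothetical helpers, not part of rmp or rmpv, and the marker bytes follow the MessagePack specification:

```rust
use std::convert::TryFrom;

/// Reads a big-endian unsigned integer of `width` bytes at `pos`, if present.
fn read_be(buf: &[u8], pos: usize, width: usize) -> Option<u64> {
    let bytes = buf.get(pos..pos.checked_add(width)?)?;
    Some(bytes.iter().fold(0u64, |acc, &b| (acc << 8) | u64::from(b)))
}

/// Returns `Some(n)` if `buf` begins with one complete msgpack value that is
/// `n` bytes long, or `None` if more input is needed. Only lengths and counts
/// are inspected; payloads are not validated.
pub fn complete_len(buf: &[u8]) -> Option<usize> {
    let mut pos = 0usize;
    let mut pending: u64 = 1; // values that still have to be skipped
    while pending > 0 {
        pending -= 1;
        let marker = *buf.get(pos)?;
        pos += 1;
        match marker {
            // fixint, nil, (reserved), false, true: the marker is the whole value
            0x00..=0x7f | 0xc0..=0xc3 | 0xe0..=0xff => {}
            0x80..=0x8f => pending = pending.saturating_add(2 * u64::from(marker & 0x0f)), // fixmap
            0x90..=0x9f => pending = pending.saturating_add(u64::from(marker & 0x0f)), // fixarray
            0xa0..=0xbf => pos += usize::from(marker & 0x1f), // fixstr
            0xcc | 0xd0 => pos += 1,                          // u8, i8
            0xcd | 0xd1 => pos += 2,                          // u16, i16
            0xca | 0xce | 0xd2 => pos += 4,                   // f32, u32, i32
            0xcb | 0xcf | 0xd3 => pos += 8,                   // f64, u64, i64
            0xd4..=0xd8 => pos += 1 + (1usize << (marker - 0xd4)), // fixext: type byte + 1..16
            // bin/str/ext: a length field, a type byte (ext only), then the payload
            0xc4..=0xc9 | 0xd9..=0xdb => {
                let (width, extra) = match marker {
                    0xc4 | 0xd9 => (1, 0),
                    0xc5 | 0xda => (2, 0),
                    0xc6 | 0xdb => (4, 0),
                    0xc7 => (1, 1),
                    0xc8 => (2, 1),
                    _ => (4, 1), // ext32
                };
                let len = usize::try_from(read_be(buf, pos, width)?).ok()?;
                pos = pos.checked_add(width + extra)?.checked_add(len)?;
            }
            // array16/32, map16/32: an element count follows the marker
            0xdc..=0xdf => {
                let width = if marker & 1 == 0 { 2 } else { 4 };
                let count = read_be(buf, pos, width)?;
                pos += width;
                let per_entry = if marker >= 0xde { 2 } else { 1 };
                pending = pending.saturating_add(count * per_entry);
            }
        }
    }
    // A fixed-size or length-prefixed payload may run past the end of `buf`.
    if pos <= buf.len() { Some(pos) } else { None }
}
```

Like any length-driven reader, this trusts the declared sizes: a huge `array32` count or `bin32` length makes it wait for data that may never arrive, so real code would want a cap similar to `PREALLOC_MAX` above.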