async-std
Some data is dropped when a buffer boundary is crossed in a minimal custom Stream of lines over BufReader.
I tried to implement a minimal Stream over the lines of a BufReader using the read_line method, but it drops some data whenever the boundary of the reader's internal buffer is crossed.
Using async_std::io::Lines does not reproduce the issue, and neither does calling read_line in a loop.
Here is the code:
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use async_std::{
    fs::File,
    io::{prelude::*, stdout, BufReader, Result},
    prelude::*,
    stream::Stream,
    task,
};

fn main() -> Result<()> {
    task::block_on(run())
}

async fn run() -> Result<()> {
    let input = BufReader::with_capacity(284, File::open("pattern.txt").await?);
    let mut lines = Lines::new(input);
    while let Some(line) = lines.next().await {
        stdout().write_all(line?.as_bytes()).await?;
    }
    Ok(())
}

pub struct Lines {
    file: BufReader<File>,
}

impl Lines {
    pub fn new(file: BufReader<File>) -> Self {
        Self { file }
    }
}

impl Stream for Lines {
    type Item = Result<String>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut buf = String::new();
        let mut line = self.file.read_line(&mut buf);
        match Pin::new(&mut line).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(Ok(0)) => Poll::Ready(None),
            Poll::Ready(Ok(_)) => Poll::Ready(Some(Ok(buf))),
        }
    }
}
Here is the content of the file pattern.txt:
00 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
01 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
02 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
03 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
04 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
05 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
06 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
07 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
08 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
09 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
Here is the output of the sample program:
00 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
01 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
02 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
03 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
05 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
06 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
07 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
3456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
09 | 0123456789-ABCDEFGHIJKLMNOPQRSTUVWXYZ+abcdefghijklmnopqrstuvwxyz
Notes:
- Each line is 70 bytes long.
- The buffer size is set to 284 bytes (4 lines + 4 bytes).
- When the buffer boundary is crossed (after the first 4 lines), the next 4 bytes (the remainder of the buffer) are dropped, and the following line contains only the remaining 66 bytes. The same happens at the next boundary, where the first 8 bytes of line 08 are lost.
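The arithmetic in these notes can be checked with a small sketch. The helper `split_points` is hypothetical (it is not part of async-std), and it assumes that every refill reads exactly `buf_cap` bytes, which matches the output above but is not something the reader API guarantees.

```rust
/// Hypothetical helper: for a file of `file_len` bytes made of fixed-size
/// lines, list every buffer boundary that falls inside a line as
/// (line index, number of bytes of that line before the boundary).
/// Assumes each refill reads exactly `buf_cap` bytes.
fn split_points(file_len: usize, line_len: usize, buf_cap: usize) -> Vec<(usize, usize)> {
    assert!(line_len > 0 && buf_cap > 0, "sizes must be non-zero");
    (1..)
        .map(|k| k * buf_cap) // offsets 284, 568, ... for buf_cap = 284
        .take_while(|&offset| offset < file_len)
        .filter(|&offset| offset % line_len != 0) // boundary cuts a line in two
        .map(|offset| (offset / line_len, offset % line_len))
        .collect()
}
```

For the 700-byte file above, `split_points(700, 70, 284)` gives `[(4, 4), (8, 8)]`: 4 bytes of line 04 and 8 bytes of line 08 sit at the end of a buffer, which are exactly the bytes missing from the output.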
Update: After reading this book, I figured out that the code above is incorrect. It creates a new read_line future on every call to poll_next and drops it whenever it returns Pending, thus skipping data that was already read from the file but not yet delivered to the receiving code. Writing streams from scratch is very inconvenient; thankfully, the async-stream crate makes it easy.
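To illustrate the failure mode without any external crates, here is a minimal sketch using only the standard library. Everything in it (`Source`, `ReadLine`, `NoopWake`, `collect_lines`) is a hypothetical stand-in and not async-std's actual implementation; it only models a future that keeps partially read data in its own state.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy input: a queue of chunks; `None` marks "not ready yet".
type Source = Rc<RefCell<VecDeque<Option<&'static str>>>>;

fn source(chunks: &[Option<&'static str>]) -> Source {
    Rc::new(RefCell::new(chunks.iter().copied().collect()))
}

/// Toy future: consumed chunks live in `partial` until a newline arrives.
struct ReadLine {
    source: Source,
    partial: String,
}

impl Future for ReadLine {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let next = self.source.borrow_mut().pop_front();
            match next {
                Some(Some(chunk)) => {
                    self.partial.push_str(chunk);
                    if chunk.ends_with('\n') {
                        return Poll::Ready(Some(std::mem::take(&mut self.partial)));
                    }
                }
                // A real future would register the waker here; the driver
                // below simply polls again, so this toy one skips that.
                Some(None) => return Poll::Pending,
                None => {
                    let rest = std::mem::take(&mut self.partial);
                    return Poll::Ready(if rest.is_empty() { None } else { Some(rest) });
                }
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls line futures until the input ends. With `keep_future == false` a
/// pending future is dropped and recreated, as in the buggy poll_next.
fn collect_lines(src: Source, keep_future: bool) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut lines = Vec::new();
    let mut in_flight: Option<ReadLine> = None;
    loop {
        let mut fut = in_flight.take().unwrap_or_else(|| ReadLine {
            source: Rc::clone(&src),
            partial: String::new(),
        });
        let polled = Pin::new(&mut fut).poll(&mut cx);
        match polled {
            Poll::Ready(Some(line)) => lines.push(line),
            Poll::Ready(None) => return lines,
            Poll::Pending if keep_future => in_flight = Some(fut),
            Poll::Pending => {} // `fut` is dropped here, together with `partial`
        }
    }
}
```

With the input `[Some("00 | abc\n"), Some("01 |"), None, Some(" def\n"), Some("02 | ghi\n")]`, keeping the future yields all three lines intact, while recreating it yields `" def\n"` as the second line: the `"01 |"` prefix was consumed by the dropped future, much like the missing bytes in the output above.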
So a correct, working version looks like this:
use async_std::{
    fs::File,
    io::{prelude::*, stdout, BufReader, Result},
    prelude::*,
    stream::Stream,
    task,
};
use async_stream::stream;
use pin_utils::pin_mut;

// ---

fn main() -> Result<()> {
    task::block_on(run())
}

async fn run() -> Result<()> {
    let input = BufReader::with_capacity(284, File::open("pattern.txt").await?);
    let lines = lines(input);
    pin_mut!(lines);
    while let Some(line) = lines.next().await {
        stdout().write_all(line?.as_bytes()).await?;
    }
    Ok(())
}

// ---

fn lines<R: Read + Unpin>(mut reader: BufReader<R>) -> impl Stream<Item = Result<String>> {
    stream! {
        let mut buf = String::new();
        loop {
            match reader.read_line(&mut buf).await {
                Ok(0) => break,
                Ok(_) => {
                    yield Ok(std::mem::replace(&mut buf, String::new()));
                }
                Err(e) => {
                    yield Err(e);
                }
            }
        }
    }
}
Still, it is a good example of how easily one can get behavior that looks like undefined behavior (it is certainly undesired) without a deep understanding of how futures work under the hood.
Rust was designed to catch such situations at compile time wherever possible.
So I would suggest thinking about how to make the library's types stricter, so that similar issues are detected at compile time.