async-compression
(Question) How to sync underlying file correctly?
I have a GzipEncoder<File> and I need to finalize it.
I'd like to call sync_all() on it. But before I do that, I think I need to finalize the encoding first. As far as I understand, that is done by calling .shutdown() on the encoder. The problem is that this will also call .shutdown() on the underlying writer (link).
I believe it's incorrect to call anything that might write to a file after it has been shut down, so I'm in a chicken-and-egg situation here.
Sorry for posting this as an issue, I just don't see any Discussions/forum to ask questions.
You can call poll_flush to flush the data, then sync_all to sync the data and metadata before shutting it down.
Either I'm missing something or what you're proposing is incorrect.
In order to sync the file properly, everything has to be written to it before calling sync_all, which means the encoder needs to be finished (not just flushed).
From what I can see in the public API, the only way to finish encoding is to call .shutdown(), but that also calls shutdown on the underlying file (which might be a problem).
For files specifically this might not be a problem, because tokio::fs::File::shutdown is essentially the same as flush (link).
It just seems like the API forces you to break the shutdown contract: "do not read/write after shutdown".
You could have an intermediate wrapper that doesn't forward shutdown to the underlying file, but just flushes it and reports that it has been shut down.
GzipEncoder<NoShutdown<File>>
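A minimal sketch of what that wrapper could look like, assuming tokio's AsyncWrite (NoShutdown is just an illustrative name, not something the crate provides):

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

/// Forwards writes and flushes, but turns shutdown into a plain flush
/// so the inner writer stays usable afterwards.
struct NoShutdown<W>(W);

impl<W: AsyncWrite + Unpin> AsyncWrite for NoShutdown<W> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.0).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Deliberately do not forward shutdown; only flush.
        Pin::new(&mut self.0).poll_flush(cx)
    }
}

You can then finalize the encoder first and sync afterwards, assuming into_inner() on the encoder to get the writer back (the path and data variable are placeholders):

use tokio::io::AsyncWriteExt;

let file = tokio::fs::File::create("archive.gz").await?;
let mut encoder = GzipEncoder::new(NoShutdown(file));
encoder.write_all(&data).await?;
encoder.shutdown().await?; // finalizes the gzip stream; the wrapper only flushes the file
let file = encoder.into_inner().0; // take the file back out
file.sync_all().await?;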
Nice, this seems like an actual solution.
Or you can change it to call sync_all on shutdown:
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::fs::File;
use tokio::io::AsyncWrite;
use tokio::task::JoinHandle;

struct SyncOnShutdownFile {
    // Option so the file can be handed off to the blocking sync task
    file: Option<File>,
    // JoinHandle is fused internally
    sync_all: Option<JoinHandle<io::Result<()>>>,
}

impl AsyncWrite for SyncOnShutdownFile {
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        Pin::new(self.file.as_mut().expect("write after shutdown")).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(self.file.as_mut().expect("flush after shutdown")).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let Self { file, sync_all } = Pin::into_inner(self);
        if let Some(f) = file.as_mut() {
            ready!(Pin::new(f).poll_flush(cx))?;
        }
        if sync_all.is_none() {
            // tokio's File::try_clone is async, so hand the (now idle) file to a
            // blocking thread instead; currently tokio internally spawns blocking
            // threads for file I/O anyway, if io-uring is used then it's a different story
            let std_file = file.take().unwrap().try_into_std()
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "operation in flight"))?;
            *sync_all = Some(tokio::task::spawn_blocking(move || std_file.sync_all()));
        }
        // the blocking task closes the file on drop; everything was flushed above
        ready!(Pin::new(sync_all.as_mut().unwrap()).poll(cx)).map_err(io::Error::from)??;
        Poll::Ready(Ok(()))
    }
}
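Usage would then look something like this (a sketch; the struct literal assumes the fields are reachable from the calling code, and the path and data are placeholders):

let file = File::create("archive.gz").await?;
let file = SyncOnShutdownFile { file: Some(file), sync_all: None };
let mut encoder = GzipEncoder::new(file);
encoder.write_all(&data).await?;
encoder.shutdown().await?; // finalize gzip, flush, fsync, close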
I stumbled upon this issue trying to find an explanation for what I am observing. It is difficult to reproduce, and so it is all speculation at this point, but I would appreciate any input. I basically have a two-step process: first I decompress data (Brotli) into a file, and then I read the file for further processing. It seems that the file is occasionally incomplete.
Is it sufficient to call shutdown, or does one have to do something else to ensure the file is safe to read?
let file = File::create(path).await?;
let mut decoder = BrotliDecoder::new(file);
decoder.write_all(data).await?;
decoder.shutdown().await?;
Is it sufficient to call shutdown, or does one have to do something else to ensure the file is safe to read?
You might need to call .flush().await; IIRC not every decoder flushes on shutdown.
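That is, something along these lines (a sketch, using the decoder from your snippet):

decoder.write_all(data).await?;
decoder.flush().await?; // push any remaining decoded bytes into the file
decoder.shutdown().await?;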
Hm, interesting. In my case, it is Brotli and Tokio. Would you be able to confirm by checking the implementation?
Reading Tokio's documentation:
Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.
https://docs.rs/crate/async-compression/latest/source/src/tokio/write/generic/decoder.rs#168
For Brotli, it does flush on shutdown.
first I uncompress data (Brotli) into a file and then I read the file for further processing. It seems to be the case that the file is occasionally incomplete.
If it is in a different thread/process, then maybe it's a synchronisation issue?
Thank you for checking! Then I can assume it is not here and move on to other ideas. It is a file server, so yes, there could be other threads trying to access the same file. I probably messed up something with mutual exclusion on shared resources.