"Unknown frame descriptor" for ZSTD data.
Describe the bug
When reading a partition of big NDJSON files compressed with ZSTD, an error appears.
Error: Custom { kind: Other, error: External(ArrowError(ExternalError(IoError(Custom { kind: Other, error: "Unknown frame descriptor" })), None)) }
Meanwhile, all of the files can be decompressed without issue using the zstd command-line utility.
To Reproduce
Try to read large ZSTD-compressed files that contain multiple frames.
Expected behavior
It would be good if DataFusion could read such files.
Additional context
The same issue was fixed in Hadoop.
Thanks for the report -- can you possibly share an example of such a file (or instructions for how to create one)?
Here is an example file data.zst.json
And here is code showing that the file can be decoded perfectly with async_compression, which is what DataFusion uses. Meanwhile, reading the same file as a DataFrame fails.
use arrow::datatypes::{Field, Schema};
use async_compression::tokio::bufread::ZstdDecoder;
use datafusion::common::arrow::datatypes::{DataType, TimeUnit};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::options::NdJsonReadOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use tokio::io::AsyncReadExt;

const FILE_PATH: &str = "data.zst";

#[tokio::main]
async fn main() -> Result<()> {
    // Read the file with tokio and decode it with async_compression: this works
    let file = tokio::fs::File::open(FILE_PATH).await?;
    let mut reader = ZstdDecoder::new(tokio::io::BufReader::new(file));
    let mut buf = vec![];
    reader.read_to_end(&mut buf).await?;
    println!("Read {} bytes", buf.len());

    let schema = Schema::new(vec![
        Field::new("OriginalRequest", DataType::Utf8, false),
        Field::new(
            "RequestStarted",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
    ]);

    // Create context
    let ctx = SessionContext::new();

    // Read the same file as a DataFrame: this fails with "Unknown frame descriptor"
    let json_options = NdJsonReadOptions::default()
        .file_extension("zst")
        .file_compression_type(FileCompressionType::ZSTD)
        .schema(&schema);
    let df = ctx.read_json(FILE_PATH, json_options).await?;
    println!("Hello, ZStd issue!");
    df.show_limit(10).await?;
    Ok(())
}
Thank you @Smotrov!
Given we now have a good reproducer on this issue I think it is ready for someone to take a look if they have time
The issue is with the way DataFusion breaks the file into pieces.
let session_config = SessionConfig::new().with_repartition_file_scans(false);
let ctx = SessionContext::new_with_config(session_config);
I've found that if repartitioning is disabled, it works flawlessly.
So I suspect something is wrong here in case of ZStd. https://github.com/apache/datafusion/blob/9f0e0164c73c834260f842f0ee942593707730bd/datafusion/core/src/datasource/physical_plan/json.rs#L164
After splitting the file into 10 slices, it decodes some of them but fails on the others.
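For context on why mid-file splits fail: every zstd frame begins with the 4-byte magic number 0xFD2FB528 (stored little-endian on disk, per RFC 8878), and a decoder handed a byte range that starts anywhere else cannot find a valid frame header -- hence "Unknown frame descriptor". A minimal sketch (not DataFusion code) of locating frame boundaries in a buffer:

```rust
/// The zstd frame magic number 0xFD2FB528, stored little-endian on disk.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// Return the byte offsets at which a zstd frame appears to start.
/// (A real splitter would also need to skip magic-like byte sequences
/// occurring inside compressed payloads; this is only an illustration.)
fn frame_offsets(data: &[u8]) -> Vec<usize> {
    let mut offsets = Vec::new();
    let mut i = 0;
    while i + 4 <= data.len() {
        if data[i..i + 4] == ZSTD_MAGIC {
            offsets.push(i);
        }
        i += 1;
    }
    offsets
}

fn main() {
    // Two fake "frames": magic bytes followed by arbitrary payload.
    let mut buf = Vec::new();
    buf.extend_from_slice(&ZSTD_MAGIC);
    buf.extend_from_slice(&[1, 2, 3]); // payload of frame 1
    buf.extend_from_slice(&ZSTD_MAGIC);
    buf.extend_from_slice(&[4, 5]); // payload of frame 2
    // A slice that starts anywhere other than these offsets breaks the decoder.
    println!("{:?}", frame_offsets(&buf)); // prints [0, 7]
}
```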
Nice find!
It seems like the current code disables repartitioning for gzip:
https://github.com/apache/datafusion/blob/9f0e0164c73c834260f842f0ee942593707730bd/datafusion/core/src/datasource/physical_plan/json.rs#L157-L159
Maybe we have to do something similar for zstd and other compression types.
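If the fix follows the gzip precedent, it could amount to gating repartitioning on the compression type. A hypothetical sketch of that logic -- the enum and function names here are illustrative, not DataFusion's actual API:

```rust
// Illustrative only: these names do not match DataFusion's real types.
#[derive(Debug, PartialEq)]
enum CompressionType {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
    Uncompressed,
}

/// A compressed stream cannot be decoded starting at an arbitrary byte
/// offset, so only uncompressed files are safe to split by byte range.
fn supports_repartition(compression: &CompressionType) -> bool {
    matches!(compression, CompressionType::Uncompressed)
}

fn main() {
    assert!(supports_repartition(&CompressionType::Uncompressed));
    assert!(!supports_repartition(&CompressionType::Zstd));
    println!("ok"); // prints ok
}
```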