"Unknown frame descriptor" for ZSTD data.

Open Smotrov opened this issue 1 year ago • 2 comments

Describe the bug

When reading a partition of large NDJSON files compressed with ZSTD, the following error appears: Error: Custom { kind: Other, error: External(ArrowError(ExternalError(IoError(Custom { kind: Other, error: "Unknown frame descriptor" })), None)) } Meanwhile, all of the files can be decompressed without problems using the ZSTD terminal utility.

To Reproduce

Try to access big ZSTD files with multiple frames.

Expected behavior

Would be good if it could read it.

Additional context

The same issue was fixed in Hadoop.

Smotrov avatar May 09 '24 13:05 Smotrov

Thanks for the report -- can you possibly share an example of such a file (or instructions for how to create one)?

alamb avatar May 09 '24 17:05 alamb

Here is an example file data.zst.json

And here is the code. It shows that the file can be decoded correctly with async_compression, which is what DataFusion uses, yet the same file cannot be read as a DataFrame.

use arrow::datatypes::{Field, Schema};
use datafusion::common::arrow::datatypes::{DataType, TimeUnit};
use datafusion::datasource::file_format::options::NdJsonReadOptions;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::prelude::*;
use std::io::Error;
use datafusion::error::Result;
use async_compression::tokio::bufread::ZstdDecoder;
use tokio::io::AsyncReadExt;

const FILE_PATH: &str = "data.zst";

#[tokio::main]
async fn main() -> Result<(), Error>  {


    // read file with tokio and create a StreamReader
    let file = tokio::fs::File::open(FILE_PATH).await?;
    let mut reader = ZstdDecoder::new(tokio::io::BufReader::new(file));

    let mut buf = vec![];
    reader.read_to_end(&mut buf).await?;
    
    println!("📦 Read {} bytes", buf.len());


    let schema = Schema::new(vec![
        Field::new("OriginalRequest", DataType::Utf8, false),
        Field::new(
            "RequestStarted",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
    ]);

    // Create context
    let ctx = SessionContext::new();

    // Read data
    let json_options = NdJsonReadOptions::default()
        .file_extension("zst")
        .file_compression_type(FileCompressionType::ZSTD)
        .schema(&schema);
    let df = ctx.read_json(FILE_PATH, json_options).await?;

    println!("🤨 Hello, ZStd issue!");
    df.show_limit(10).await?;
    
    Ok(())
}

Smotrov avatar May 09 '24 18:05 Smotrov

Thank you @Smotrov 🙏

alamb avatar May 10 '24 13:05 alamb

Given we now have a good reproducer on this issue I think it is ready for someone to take a look if they have time

alamb avatar May 10 '24 13:05 alamb

The issue is with the way DataFusion breaks the file into pieces.

let session_config = SessionConfig::new().with_repartition_file_scans(false);
let ctx = SessionContext::new_with_config(session_config);

I've found that if repartition is disabled, it works flawlessly.

So I suspect something is wrong here in case of ZStd. https://github.com/apache/datafusion/blob/9f0e0164c73c834260f842f0ee942593707730bd/datafusion/core/src/datasource/physical_plan/json.rs#L164

After splitting the file into 10 slices, it does decode some of them but fails on the others.
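To illustrate why some slices decode and others do not, here is a minimal sketch using only the standard library. It relies on one fact about the format: every regular Zstandard frame starts with the magic number 0xFD2FB528, stored little-endian. A decoder that starts reading at an arbitrary byte offset will most likely not find that magic number, which is presumably what surfaces as "Unknown frame descriptor". The helpers `split_ranges` and `decodable_slices` are hypothetical and only mimic the idea of splitting a file into equal byte ranges; they are not DataFusion's actual partitioning code.

```rust
/// Magic number that opens every regular Zstandard frame (0xFD2FB528),
/// as it appears on disk in little-endian byte order.
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// Returns true if `bytes` begins with the Zstandard frame magic number.
fn starts_with_zstd_magic(bytes: &[u8]) -> bool {
    bytes.starts_with(&ZSTD_MAGIC)
}

/// Splits `len` bytes into `parts` contiguous byte ranges of near-equal size.
/// Hypothetical helper: it only mimics range-based repartitioning.
fn split_ranges(len: usize, parts: usize) -> Vec<(usize, usize)> {
    (0..parts)
        .map(|i| (i * len / parts, (i + 1) * len / parts))
        .collect()
}

/// For each byte range, reports whether a decoder started at the first
/// byte of that range would see a frame magic number.
fn decodable_slices(data: &[u8], parts: usize) -> Vec<bool> {
    split_ranges(data.len(), parts)
        .into_iter()
        .map(|(start, end)| starts_with_zstd_magic(&data[start..end]))
        .collect()
}
```

With a buffer made of two fake 10-byte "frames" (magic number plus padding, not valid ZSTD data), splitting into 2 ranges happens to land on both frame starts, while splitting into 4 ranges leaves every second slice starting in the middle of a frame. That matches the observation that only some of the slices fail.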

Smotrov avatar May 10 '24 14:05 Smotrov

Nice find!

It seems like the current code disables repartitioning for gzip:

https://github.com/apache/datafusion/blob/9f0e0164c73c834260f842f0ee942593707730bd/datafusion/core/src/datasource/physical_plan/json.rs#L157-L159

Maybe we have to do something similar for zstd and other compression types 🤔
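A rough sketch of what such a guard could look like, written with stand-in types so it compiles on its own. The `Compression` enum, `is_compressed`, and `planned_partitions` are hypothetical names for illustration and are not DataFusion's API; the assumption is simply that any compressed file falls back to a single partition instead of byte-range splitting.

```rust
/// Hypothetical stand-in for a file compression setting; not DataFusion's type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Compression {
    Uncompressed,
    Gzip,
    Zstd,
}

impl Compression {
    /// A compressed stream cannot be split at arbitrary byte offsets.
    fn is_compressed(self) -> bool {
        self != Compression::Uncompressed
    }
}

/// Decides how many byte-range partitions to scan a file with.
/// Compressed files are always read as a single partition.
fn planned_partitions(compression: Compression, target_partitions: usize) -> usize {
    if compression.is_compressed() {
        1
    } else {
        // Always scan with at least one partition.
        target_partitions.max(1)
    }
}
```

Checking for "any compression" rather than listing gzip specifically means a newly added codec would be covered by the guard without further changes.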

alamb avatar May 10 '24 15:05 alamb