arrow icon indicating copy to clipboard operation
arrow copied to clipboard

[R][Python] Partitioned Datasets are not correctly written if partitions differ by case

Open lgaborini opened this issue 2 years ago • 2 comments

Describe the bug, including details regarding any error messages, version, and platform.

With {arrow} 14.0.0, I was able to import a large number of CSVs, merge the schemas, establish a partitioning and write them with arrow::write_dataset().

The dataset can be manipulated in-memory and queried without issues, but most queries fail once the dataset is read from disk. The number of rows is also different.

The issue arises when the partitioning variable differs just by capitalization. On Windows the file system is case-insensitive but {arrow} treats partitions as different, so the Parquet files end up corrupted.

R reprex

https://github.com/apache/arrow/issues/38804#issuecomment-1824165163

Python reprex

import pyarrow as pa
import pyarrow.dataset as ds

tbl = pa.table({
    'var_1': ("arrow", "arrow", "arrow"),
    'var_2': ("arrow", "arrow", "arroW")
})

part = ds.partitioning(
    pa.schema([("var_2", pa.string())]), 
    flavor="hive"
)

ds.write_dataset(tbl, "dataset_output", format="parquet", partitioning=part)

ds_out = ds.dataset("dataset_output")

Only one partition is written:

ds_out.files
# ['dataset_output/var_2=arroW/part-0.parquet']
ds_out.to_table()
# pyarrow.Table     
# var_1: string     
# ----
# var_1: [["arrow"]]

This is arrow::arrow_info():

arrow::arrow_info()
#> Arrow package version: 14.0.0
#> 
#> Capabilities:
#>                
#> acero      TRUE
#> dataset    TRUE
#> substrait FALSE
#> parquet    TRUE
#> json       TRUE
#> s3         TRUE
#> gcs        TRUE
#> utf8proc   TRUE
#> re2        TRUE
#> snappy     TRUE
#> gzip       TRUE
#> brotli     TRUE
#> zstd       TRUE
#> lz4        TRUE
#> lz4_frame  TRUE
#> lzo       FALSE
#> bz2        TRUE
#> jemalloc  FALSE
#> mimalloc   TRUE
#> 
#> Arrow options():
#>                        
#> arrow.use_threads FALSE
#> 
#> Memory:
#>                   
#> Allocator mimalloc
#> Current    0 bytes
#> Max        0 bytes
#> 
#> Runtime:
#>                         
#> SIMD Level          avx2
#> Detected SIMD Level avx2
#> 
#> Build:
#>                                                              
#> C++ Library Version                                    14.0.0
#> C++ Compiler                                              GNU
#> C++ Compiler Version                                   10.3.0
#> Git ID               2dcee3f82c6cf54b53a64729fd81840efa583244

Component(s)

R

lgaborini avatar Nov 20 '23 17:11 lgaborini

Updating the issue: it has something to do with partitioning on a case-sensitive field on a case-insensitive filesystem (Windows).
The behavior also changes depending on compression and dictionary encoding.

Also, by explicitly setting options(arrow.skip_nul = TRUE) (or also FALSE) does not seem to improve anything.

Reprex

I am using this data frame, partitioning is on var_2:


tbl_input <- tibble::tibble(
   var_1 = c("arrow", "arrow", "arrow"),
   var_2 = c("arrow", "arrow", "arroW")
)
tbl_input
#> # A tibble: 3 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arrow
#> 2 arrow arrow
#> 3 arrow arroW

No compression, dictionary-encoding

Writing and re-reading with these options:

arrow::write_dataset(
   dataset = tbl_input,
   path = f_dataset_merged,
   partitioning = "var_2",
   compression = "uncompressed",
   use_dictionary = TRUE
)

tbl_written <- arrow::open_dataset(
   sources = f_dataset_merged,
   partitioning = arrow::hive_partition()
)

No errors while writing or reading, but results change from run to run:

tbl_written |>
   dplyr::collect()
#> # A tibble: 2 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW

This is the most common output:


tbl_written |>
   dplyr::collect()
#> # A tibble: 1 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW

Sometimes I get:


tbl_written |>
   dplyr::collect()
#> # A tibble: 2 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW

No compression, no dictionary-encoding

Writing with these options:


arrow::write_dataset(
   dataset = tbl_input,
   path = f_dataset_merged,
   partitioning = "var_2",
   compression = "uncompressed",
   use_dictionary = FALSE
)

Reading either fails:


tbl_written <- arrow::open_dataset(
   sources = f_dataset_merged,
   partitioning = arrow::hive_partition()
)
#> Error in `arrow::open_dataset()`:
#> ! IOError: Error creating dataset. Could not read schema from '/Temp/RtmpEfdPRA/file808821056880/var_2=arroW/part-0.parquet'. Is this a 'parquet' file?: Could not open Parquet input source '/Temp/RtmpEfdPRA/file808821056880/var_2=arroW/part-0.parquet': Couldn't deserialize thrift: TProtocolException: Invalid data

Or succeeds, but the next collect() call fails:


tbl_written |>
   dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Unexpected end of stream

Or everything succeeds:

tbl_written |>
   dplyr::collect()
#> # A tibble: 2 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW

Snappy compression, no dictionary-encoding

Writing with these options:

arrow::write_dataset(
   dataset = tbl_input,
   path = f_dataset_merged,
   partitioning = "var_2",
   compression = "snappy",
   use_dictionary = FALSE
)

This is the most common error:

#> Error in `arrow::open_dataset()`:
#> ! IOError: Error creating dataset. Could not read schema from '/Temp/RtmpSMXExe/file5d645de7595e/var_2=arroW/part-0.parquet'. Is this a 'parquet' file?: Could not open Parquet input source '/Temp/RtmpSMXExe/file5d645de7595e/var_2=arroW/part-0.parquet': Couldn't deserialize thrift: don't know what type: 

Can also get:


tbl_written |>
   dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Couldn't deserialize thrift: No more data to read.
#> Deserializing page header failed.

or:

tbl_written |>
   dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Unexpected end of stream

Sometimes it even succeeds.

Snappy compression, dictionary encoding

Writing with these options:

arrow::write_dataset(
   dataset = tbl_input,
   path = f_dataset_merged,
   partitioning = "var_2",
   compression = "snappy",
   use_dictionary = TRUE
)

Reading always succeeds but the which rows are read is unpredictable:

tbl_written |>
   dplyr::collect()
#> # A tibble: 2 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW

or

tbl_written |>
   dplyr::collect()
#> # A tibble: 1 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arroW

or even

tbl_written |>
   dplyr::collect()
#> # A tibble: 1 × 2
#>   var_1 var_2
#>   <chr> <chr>
#> 1 arrow arrow

Proposals

I would expect:

  1. some warning when partitions collide due to difference in capitalization, or silent merging
  2. if the partitioned variable is stored in the Parquets (as per 14.0.1, right?), the partitioning might just be used for queries, but the reading to be truly lossless (so Hive information should be discarded, not added back to the Table).
  3. I expect a correct behavior on Linux/OSX since case-sensitivity is not an issue

If everything must be OS-independent, probably the easiest mitigation is to avoid having partitions that differ just in capitalization. (I also wonder what happens when illegal characters are used)

lgaborini avatar Nov 23 '23 10:11 lgaborini

I didn't use partition, yet got OSError: Couldn't deserialize thrift: TProtocolException: Invalid data Deserializing page header failed.

example data: data.zip

I write this file by pandas. Is this the same reason ?

eromoe avatar Jul 02 '24 07:07 eromoe