[R][Python] Partitioned Datasets are not correctly written if partitions differ by case
Describe the bug, including details regarding any error messages, version, and platform.
With {arrow} 14.0.0, I was able to import a large number of CSVs, merge their schemas, establish a partitioning, and write them with arrow::write_dataset().
The dataset can be manipulated in memory and queried without issues, but most queries fail once the dataset is read back from disk, and the number of rows differs as well.
The issue arises when values of the partitioning variable differ only by capitalization. On Windows the file system is case-insensitive, but {arrow} treats the partitions as distinct, so the Parquet files end up corrupted.
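For context, a minimal probe of filesystem case-folding (a hypothetical helper, not part of {arrow} or pyarrow) illustrates why the two Hive directories collapse into one on Windows:

import os
import tempfile

def filesystem_is_case_insensitive(directory="."):
    # Create a mixed-case temporary file and check whether the same file is
    # visible under a different casing. This is True on NTFS (Windows) and on
    # default APFS (macOS), and False on typical Linux filesystems.
    with tempfile.NamedTemporaryFile(prefix="Arrow_Case_Probe_", dir=directory) as probe:
        return os.path.exists(probe.name.swapcase())

On a filesystem where this returns True, 'var_2=arrow' and 'var_2=arroW' resolve to the same directory, even though {arrow} writes them as two distinct partitions.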
R reprex
https://github.com/apache/arrow/issues/38804#issuecomment-1824165163
Python reprex
import pyarrow as pa
import pyarrow.dataset as ds

tbl = pa.table({
    'var_1': ("arrow", "arrow", "arrow"),
    'var_2': ("arrow", "arrow", "arroW")
})

part = ds.partitioning(
    pa.schema([("var_2", pa.string())]),
    flavor="hive"
)

ds.write_dataset(tbl, "dataset_output", format="parquet", partitioning=part)
ds_out = ds.dataset("dataset_output")
Only one partition is written:
ds_out.files
# ['dataset_output/var_2=arroW/part-0.parquet']
ds_out.to_table()
# pyarrow.Table
# var_1: string
# ----
# var_1: [["arrow"]]
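As a workaround on my side (just a sketch, and only acceptable if merging the case variants is fine for the data), normalizing the partition column to one case before writing avoids the collision entirely:

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

tbl = pa.table({
    'var_1': ("arrow", "arrow", "arrow"),
    'var_2': ("arrow", "arrow", "arroW")
})

# Lower-case the partition column so "arrow" and "arroW" deliberately land in
# the same partition on every OS, instead of colliding only on
# case-insensitive filesystems.
tbl_norm = tbl.set_column(
    tbl.schema.get_field_index("var_2"),
    "var_2",
    pc.utf8_lower(tbl["var_2"])
)

part = ds.partitioning(pa.schema([("var_2", pa.string())]), flavor="hive")
ds.write_dataset(tbl_norm, "dataset_output_norm", format="parquet", partitioning=part)

This of course loses the original casing of var_2, so it is a mitigation, not a fix.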
This is arrow::arrow_info():
arrow::arrow_info()
#> Arrow package version: 14.0.0
#>
#> Capabilities:
#>
#> acero TRUE
#> dataset TRUE
#> substrait FALSE
#> parquet TRUE
#> json TRUE
#> s3 TRUE
#> gcs TRUE
#> utf8proc TRUE
#> re2 TRUE
#> snappy TRUE
#> gzip TRUE
#> brotli TRUE
#> zstd TRUE
#> lz4 TRUE
#> lz4_frame TRUE
#> lzo FALSE
#> bz2 TRUE
#> jemalloc FALSE
#> mimalloc TRUE
#>
#> Arrow options():
#>
#> arrow.use_threads FALSE
#>
#> Memory:
#>
#> Allocator mimalloc
#> Current 0 bytes
#> Max 0 bytes
#>
#> Runtime:
#>
#> SIMD Level avx2
#> Detected SIMD Level avx2
#>
#> Build:
#>
#> C++ Library Version 14.0.0
#> C++ Compiler GNU
#> C++ Compiler Version 10.3.0
#> Git ID 2dcee3f82c6cf54b53a64729fd81840efa583244
Component(s)
R
Updating the issue: it has something to do with partitioning on values that differ only by case, on a case-insensitive filesystem (Windows).
The behavior also changes depending on compression and dictionary encoding.
Also, explicitly setting options(arrow.skip_nul = TRUE) (or FALSE) does not seem to change anything.
Reprex
I am using this data frame; the partitioning is on var_2:
tbl_input <- tibble::tibble(
  var_1 = c("arrow", "arrow", "arrow"),
  var_2 = c("arrow", "arrow", "arroW")
)
tbl_input
#> # A tibble: 3 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arrow
#> 2 arrow arrow
#> 3 arrow arroW
No compression, dictionary encoding
Writing and re-reading with these options:
arrow::write_dataset(
  dataset = tbl_input,
  path = f_dataset_merged,
  partitioning = "var_2",
  compression = "uncompressed",
  use_dictionary = TRUE
)
tbl_written <- arrow::open_dataset(
  sources = f_dataset_merged,
  partitioning = arrow::hive_partition()
)
No errors while writing or reading, but results change from run to run:
tbl_written |>
dplyr::collect()
#> # A tibble: 2 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW
This is the most common output:
tbl_written |>
dplyr::collect()
#> # A tibble: 1 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
Sometimes I get:
tbl_written |>
dplyr::collect()
#> # A tibble: 2 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW
No compression, no dictionary encoding
Writing with these options:
arrow::write_dataset(
  dataset = tbl_input,
  path = f_dataset_merged,
  partitioning = "var_2",
  compression = "uncompressed",
  use_dictionary = FALSE
)
Reading then either fails:
tbl_written <- arrow::open_dataset(
  sources = f_dataset_merged,
  partitioning = arrow::hive_partition()
)
#> Error in `arrow::open_dataset()`:
#> ! IOError: Error creating dataset. Could not read schema from '/Temp/RtmpEfdPRA/file808821056880/var_2=arroW/part-0.parquet'. Is this a 'parquet' file?: Could not open Parquet input source '/Temp/RtmpEfdPRA/file808821056880/var_2=arroW/part-0.parquet': Couldn't deserialize thrift: TProtocolException: Invalid data
Or succeeds, but the next collect() call fails:
tbl_written |>
dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Unexpected end of stream
Or everything succeeds:
tbl_written |>
dplyr::collect()
#> # A tibble: 2 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW
Snappy compression, no dictionary encoding
Writing with these options:
arrow::write_dataset(
  dataset = tbl_input,
  path = f_dataset_merged,
  partitioning = "var_2",
  compression = "snappy",
  use_dictionary = FALSE
)
This is the most common error:
#> Error in `arrow::open_dataset()`:
#> ! IOError: Error creating dataset. Could not read schema from '/Temp/RtmpSMXExe/file5d645de7595e/var_2=arroW/part-0.parquet'. Is this a 'parquet' file?: Could not open Parquet input source '/Temp/RtmpSMXExe/file5d645de7595e/var_2=arroW/part-0.parquet': Couldn't deserialize thrift: don't know what type:
Can also get:
tbl_written |>
dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Couldn't deserialize thrift: No more data to read.
#> Deserializing page header failed.
or:
tbl_written |>
dplyr::collect()
#> Error in `compute.Dataset()`:
#> ! IOError: Unexpected end of stream
Sometimes it even succeeds.
Snappy compression, dictionary encoding
Writing with these options:
arrow::write_dataset(
  dataset = tbl_input,
  path = f_dataset_merged,
  partitioning = "var_2",
  compression = "snappy",
  use_dictionary = TRUE
)
Reading always succeeds, but which rows are read is unpredictable:
tbl_written |>
dplyr::collect()
#> # A tibble: 2 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
#> 2 arrow arroW
or
tbl_written |>
dplyr::collect()
#> # A tibble: 1 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arroW
or even
tbl_written |>
dplyr::collect()
#> # A tibble: 1 × 2
#> var_1 var_2
#> <chr> <chr>
#> 1 arrow arrow
Proposals
I would expect:
- some warning when partitions collide due to differences in capitalization, or silent merging of the colliding partitions
- if the partitioning variable is also stored in the Parquet files (as of 14.0.1, right?), the partitioning could be used just for queries and reading could be truly lossless (so the Hive information would be discarded, not added back to the Table)
- correct behavior on Linux/OSX, since case-sensitivity is not an issue there
If everything must be OS-independent, probably the easiest mitigation is to avoid partitions that differ only in capitalization (I also wonder what happens when illegal characters are used). A rough sketch of such a pre-write check follows.
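Purely as an illustration of the warning I have in mind (a hypothetical helper, not an existing {arrow}/pyarrow API), a pre-write check could flag partition values that collide once case is ignored:

import pyarrow as pa
import pyarrow.compute as pc

def check_partition_case_collisions(table, column):
    # Group the distinct partition values by their lower-cased form and
    # report any group that contains more than one spelling.
    groups = {}
    for value in pc.unique(table[column]).to_pylist():
        if value is not None:
            groups.setdefault(value.lower(), []).append(value)
    collisions = {k: v for k, v in groups.items() if len(v) > 1}
    if collisions:
        raise ValueError(
            f"Partition values in '{column}' collide on a case-insensitive filesystem: {collisions}"
        )

tbl = pa.table({'var_1': ["arrow"] * 3, 'var_2': ["arrow", "arrow", "arroW"]})
check_partition_case_collisions(tbl, "var_2")
# ValueError: Partition values in 'var_2' collide on a case-insensitive filesystem: {'arrow': ['arrow', 'arroW']}

Something equivalent inside write_dataset() (or at least a documented warning) would have made the problem much easier to diagnose.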
I didn't use partitioning, yet I got OSError: Couldn't deserialize thrift: TProtocolException: Invalid data Deserializing page header failed.
Example data: data.zip
I wrote this file with pandas. Is this the same cause?