[Bug]: Parquet size exploded for no apparent reason
What happened?
Hello, I am facing a strange issue where my Parquet file sizes are exploding. My environment:
- Beam SDK: 2.35.0
- Parquet-mr: 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
- Execution Env: Google Dataflow
- Compression: Snappy

I have a pipeline which writes data to GCS and BigQuery. I noticed a strange behavior where the Parquet files being written to the GCS directory were very big: instead of being 5GB in total as they normally should be, they were around 35-40GB. I suspected that the write process may have failed, but that was not the case. I ran a few tests using Spark instead of Dataflow/Beam:
- The count of records in BigQuery and GCS is the same.
- If I read the data from BigQuery and write it to GCS using Spark, the output size is as expected (5GB).
- If I read the data from the big GCS folder (35-40GB) and write it again using Spark, it is still the same size.
- If I read the data from the big GCS folder (35-40GB) and try to repartition(10) it, I still get the same size of 35-40GB.
The only difference I can see between the old and new pipelines is the upgrade of the Beam SDK from 2.20.0 to 2.35.0. I searched online and through the release notes but couldn't find anything. Is this a known issue? My only suspicion is that ParquetIO is doing something wrong. Any help would be much appreciated.
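For reference, the GCS write is essentially the standard FileIO + ParquetIO.sink pattern, simplified here (bucket path, windowing and sharding details omitted):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Simplified write path; 'records' already carries an AvroCoder for 'schema'.
static void writeParquet(PCollection<GenericRecord> records, Schema schema, String outputDir) {
  records.apply(
      "WriteParquetToGcs",
      FileIO.<GenericRecord>write()
          .via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
          .to(outputDir) // e.g. a gs:// path
          .withSuffix(".parquet"));
}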
Issue Priority
Priority: 1
Issue Component
Component: io-java-parquet
@bsikander Could you compare the DAG of your pipeline before and after the upgrade to help investigate? The increase in size you are seeing might suggest that the new version of Beam introduced an additional shuffle step that affects Parquet compression.
Columnar formats offer powerful compression when the data is adequately laid out and ordered, but if that ordering changes, the results can look totally different. For instance, if a random shuffle is done before writing the data, strategies such as run-length encoding won't be able to compress the data much.
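As a toy illustration of the effect (plain java.util.zip, nothing Beam or Parquet specific): the same values compress far better when equal values are adjacent than after a random shuffle.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.Deflater;

public class OrderingVsCompression {
  public static void main(String[] args) {
    // 100,000 rows with only 5 distinct values, first clustered, then shuffled.
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < 100_000; i++) {
      rows.add("status=" + (i / 20_000));
    }
    byte[] clustered = String.join("\n", rows).getBytes(StandardCharsets.UTF_8);

    Collections.shuffle(rows);
    byte[] shuffled = String.join("\n", rows).getBytes(StandardCharsets.UTF_8);

    System.out.println("clustered -> " + deflatedSize(clustered) + " bytes");
    System.out.println("shuffled  -> " + deflatedSize(shuffled) + " bytes");
  }

  // Compress with DEFLATE and return only the compressed size.
  static int deflatedSize(byte[] input) {
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    byte[] buf = new byte[8 * 1024];
    int size = 0;
    while (!deflater.finished()) {
      size += deflater.deflate(buf);
    }
    deflater.end();
    return size;
  }
}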
There are a lot of changes in that version range.
git log --oneline v2.35.0 ^v2.20.0 -- sdks/java/io/parquet
In case this helps: I've removed commits that have nothing to do with Parquet, such as global formatting cleanups.
2a94534a3d6 [BEAM-13157] add regression test for hadoop configuration on ParquetIO.Parse
0bb6c1494fa [BEAM-12070] ParquetIO: use splittable reading by default
d5cd15ab4de [BEAM-13157] support hadoop configuration on ParquetIO.Parse
136eadc121e [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
15833448ad1 [BEAM-12165] Add support for ParquetIO Sink to specify an Avro model (#14644)
f725953e295 [BEAM-12151] Bump Apache Parquet to 1.12.0
3b77ff84738 [BEAM-12057] Add missing populateDisplayData methods to ParquetIO
93d0be065e1 [BEAM-11969] Adds an option for setting row-group size in ParquetIO (#14227)
8d0b19fa2f7 [BEAM-11972] Close all opened channels/readers on ParquetIO
1de8ef9e0a6 [BEAM-11913] Add support for Hadoop configuration on ParquetIO
d79cd82943c [BEAM-7929] Support column projection for Parquet Tables
07e1e021250 [BEAM-11861] Add methods to explicitly provide coder for ParquetIO's Parse and ParseFiles (#14078)
1a4d9a273a6 [BEAM-10961] enable strict dependency checking for sdks/java/io/parquet (#14062)
4a0f056139c BEAM-4986: Bump to Apache Parquet 1.11.1
39ee3ef170c [BEAM-8202] Support ParquetTable Writer
34a8fcde615 [BEAM-11526] Cleanup Code for Schema support (#13646)
1a617bed40b Add Beam schema support for ParquetIO reads.
ff5a094be93 Consistency improvements and other fixes
a7301b75179 [BEAM-11527] Add builder parameter to allow user defined Hadoop ReadSupport flags in Hadoop Configuration.
0d344ba3b7a Make passthroughfunction singleton by making the class with private constructor.
6faeda36070 Make GenericRecordPassthroughFn singleton
de6d2686e81 improve backward compatibility by creating separate `Parse<T>` and `ParseFiles<T>` implementation for supporting files with unknown schema.
75e167ec34d [BEAM-11460] Implement reading unknown schema files for ParquetIO
b0ac11db4b2 [BEAM-8615 BEAM-8569 BEAM-7937] Add hadoop 3 compatibility tests
fb23d3f3707 [BEAM-8876] Run hadoop tests with different versions and enable HadoopFormatIT test in Java Postcommit
f9492fb9ef3 [BEAM-8719 BEAM-8768 BEAM-8769 BEAM-8770 BEAM-8771] Update minor hadoop dependency
a334bac48f8 [BEAM-7925]add schema encoder
d5944974fbe [Beam-4379] Make ParquetIO read splittable (#12223)
bd915c7af47 Implemented SchemaIO and SchemaIOProvider for Parquet
4fb27bcc19d [BEAM-10284] Remove hadoop from ParquetIO.Sink public API
9cabeae142f [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
It would help a lot if you could narrow it down to a smaller version range.
@mosche
I just noticed that there is one more difference between my old and new runs. A DropFields.fields call was added to drop some duplicates, and this results in a shuffle. Based on what you suggested, this could be the reason for the file size explosion. What is generally a good way to fix this? To sort the data on some column after DropFields?
@bsikander I don't think DropFields.fields would trigger a shuffle; it's just a projection that affects the schema as far as I know. Are you applying a deduplication in addition, e.g. Deduplicate.values()? If the latter is the case, it would certainly explain what you are seeing.
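To illustrate the difference (a rough sketch, assuming a schema-aware PCollection<Row>; duplicateCol is just a placeholder field name):

import org.apache.beam.sdk.schemas.transforms.DropFields;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

static PCollection<Row> dropAndDedup(PCollection<Row> rows) {
  // DropFields is only a schema projection: no shuffle, ordering preserved.
  PCollection<Row> projected = rows.apply(DropFields.fields("duplicateCol"));
  // Distinct (like Deduplicate) keys by the whole record, which shuffles the
  // data and destroys whatever ordering the input had.
  return projected.apply("Deduplicate", Distinct.create());
}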
It's hard to give a general recommendation here without knowing the data and your read patterns. You can experiment with partitioning (GroupByKey) and ordering (SortValues); see the sketch below. In any case it'll be a tradeoff between compute time at write time versus storage efficiency and compute time when reading (mostly due to pruning entire files / row groups).
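A rough sketch of what I mean, placed between the deduplication and the Parquet write (untested; the String partition key, sort key and coders are placeholders you would adapt, and it needs the beam-sdks-java-extensions-sorter module):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Expects records re-keyed after deduplication as KV<partitionKey, KV<sortKey, record>>;
// both keys are placeholder choices and appropriate coders (e.g. AvroCoder for
// GenericRecord) must already be set on the keyed collection.
static PCollection<GenericRecord> clusterBeforeWrite(
    PCollection<KV<String, KV<String, GenericRecord>>> keyed) {
  return keyed
      // Co-locate all records that share the partition key.
      .apply("GroupByPartitionKey", GroupByKey.create())
      // Sort each group by the encoded sort key so that similar values end up
      // adjacent within the Parquet row groups.
      .apply("SortWithinGroup", SortValues.create(BufferedExternalSorter.options()))
      // Drop the keys again and flatten back to plain records for the write.
      .apply("DropPartitionKeys", Values.create())
      .apply("FlattenGroups", Flatten.iterables())
      .apply("DropSortKeys", Values.create());
}

BufferedExternalSorter sorts in memory and spills to local disk when a group doesn't fit, so even large partitions can be ordered this way; whether the extra shuffle pays off depends on how much the Parquet files shrink.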
@mosche sorry for the confusion. You are right, I am doing deduplication, which is causing a shuffle: .apply("Remove Duplicates", Distinct.create());
As per your suggestion, I will try to see what ordering makes sense and how it impacts the Parquet size.
@bsikander Based on the last answer, is it still an issue for you? If not, can we close it?
I am closing this one since it seems there is no issue anymore.