[Bug]: ReadAllFiles does not fully read gzipped files from GCS
What happened?
Since the refactor of gcsio (2.52?), ReadAllFiles does not fully read gzipped files from GCS. Part of the file is correctly returned, but the rest goes missing.
I presume this is caused by the fact that GCS performs decompressive transcoding while `_ExpandIntoRanges` uses the GCS object's metadata to determine the read range. This means that the file we receive is larger than the maximum of the read range.
For example, a gzip file on GCS might have a size of 1 MB, and this will be the object size in the metadata, so the maximum of the read range will be 1 MB. However, when Beam opens the file it has already been decompressed by GCS, so the actual size might be 1.5 MB, and we won't read 0.5 MB of it, thus causing data loss.
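A minimal sketch of the failure mode, using the hypothetical 1 MB / 1.5 MB numbers above (variable names are illustrative, not Beam internals):

```python
# Illustration only: a read range computed from GCS metadata truncates
# a decompressively transcoded object.
object_size = 1_000_000        # compressed size reported in GCS metadata
decompressed_size = 1_500_000  # bytes actually served after transcoding
read_range = (0, object_size)  # range derived from metadata, as in _ExpandIntoRanges
bytes_read = min(decompressed_size, read_range[1])
print(decompressed_size - bytes_read)  # 500000 bytes silently dropped
```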
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
Thanks for reporting. Agreed, this is a P1 bug as it causes data loss.
Is it possible to provide a working example that reproduces the issue? That would help triage.
@shunping FYI
> Is it possible to provide a working example that reproduces the issue? That would help triage.
@Abacn I don't have a working example, but the steps to reproduce are:
1. Upload a gzip file to GCS (see the upload sketch after this list). Make sure that the unzipped file is large enough, e.g. a few MB.
2. Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
3. Print or write the output of ReadAllFromText.
4. Observe that the file is not fully read.
EDIT: This issue will probably appear for any compression type. I just encountered it with gzip but did not test with other compression algorithms.
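For step 1, a minimal upload sketch using the google-cloud-storage client (bucket and object names are hypothetical); setting `content_encoding` to `gzip` is what triggers decompressive transcoding on download:

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")  # hypothetical bucket
blob = bucket.blob("data/bigfile.txt.gz")  # hypothetical object name
# Content-Encoding: gzip makes GCS decompress the object when it is downloaded.
blob.content_encoding = "gzip"
blob.upload_from_filename("bigfile.txt.gz", content_type="text/plain")
```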
I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7 MB), which has 100,000 lines, but cannot reproduce this:
```python
# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )
```
This shows:
```
INFO:root:Total lines 100000
```
So I double checked, and there are differences between your example and our case:
- We use content encoding `gzip` while saving our files to GCS; you don't have an encoding specified.
- This leads us to use `ReadAllFromText` with the parameter `compression_type=CompressionTypes.UNCOMPRESSED`, since the downloaded file seems to be already uncompressed per the GCS transcoding policy (it doesn't work with `CompressionTypes.AUTO`).
- This further results in reading only a fragment of the file.

Furthermore, after removing the encoding type from our file and using `CompressionTypes.AUTO`, it worked properly.
To get your example to represent our situation, please add the content encoding `gzip` to your file metadata.
For a quick patch we use the following solution:
```python
class ReadAllFromTextNotSplittable(ReadAllFromText):
    """This class doesn't take advantage of splitting files into bundles,
    because when doing so Beam was taking the compressed file size as the
    reference, resulting in reading only a fraction of the uncompressed
    file."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._read_all_files._splittable = False
```
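A usage sketch for the workaround above (the GCS path is hypothetical; forcing `UNCOMPRESSED` matches the transcoding behaviour described earlier):

```python
import apache_beam as beam
from apache_beam import Create
from apache_beam.io.filesystem import CompressionTypes

# ReadAllFromTextNotSplittable is the workaround class defined above.
with beam.Pipeline() as p:
    (
        p
        | Create(["gs://my-bucket/data/bigfile.txt.gz"])  # hypothetical path
        | "Read" >> ReadAllFromTextNotSplittable(
            compression_type=CompressionTypes.UNCOMPRESSED)
    )
```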
What does your metadata look like?
I tried this:
Then I got this error:
```
ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']
```
This is expected, as I mentioned earlier:
> This leads us to use `ReadAllFromText` with the parameter `compression_type=CompressionTypes.UNCOMPRESSED`, since the downloaded file seems to be already uncompressed per the GCS transcoding policy (it doesn't work with `CompressionTypes.AUTO`).
I presume that while downloading the file from GCS it is already decompressed, hence the decompression error in Beam.
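A minimal sketch (not the Beam code path) of why already-decompressed bytes produce exactly this zlib error when a gzip header is expected:

```python
import zlib

# GCS has already decompressed the object, so Beam receives plain text
# where its gzip codec expects a gzip header.
data = b"plain text that was already decompressed by GCS"
try:
    zlib.decompress(data, zlib.MAX_WBITS | 16)  # wbits for gzip-wrapped streams
except zlib.error as e:
    print(e)  # Error -3 while decompressing data: incorrect header check
```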
Metadata is as follows (also, please note we checked both `text/plain` and `application/x-gzip` content types; both were only partially read):
I see. We need to check decompressive transcoding for the GCS file to determine whether the content is compressed rather than relying on the file extension.
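A minimal sketch of that check, assuming the google-cloud-storage client (the function name and structure are illustrative, not the eventual Beam fix):

```python
from google.cloud import storage

def is_transcoded(gcs_path: str) -> bool:
    """Return True if GCS will decompress this object on download."""
    bucket_name, blob_name = gcs_path[len("gs://"):].split("/", 1)
    blob = storage.Client().bucket(bucket_name).get_blob(blob_name)
    # Objects stored with Content-Encoding: gzip are served decompressed by
    # default, so their metadata size does not match the bytes actually read.
    return blob is not None and blob.content_encoding == "gzip"
```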
```python
# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )
```
This only loads 75,601 lines.
#19413 could be related for uploading the file to GCS.
.take-issue
Have we reproduced this?
Yes, see my above link: https://github.com/apache/beam/issues/31040#issuecomment-2081172626
Is there hope of a fix for a 2.57.0 cherry-pick? I would guess this is a longstanding issue, so getting it fixed in a very thorough way for 2.58.0 is actually the best thing to do. I recall we had decompressive transcoding bugs in the past, so we should make sure we really get it right this time. And the user can mitigate by configuring GCS to not do the transcoding.
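For that mitigation, one option (a sketch, assuming the google-cloud-storage client and a hypothetical object) is to set `Cache-Control: no-transform` on the object so GCS serves the stored gzip bytes verbatim:

```python
from google.cloud import storage

# Hypothetical bucket and object names.
blob = storage.Client().bucket("my-bucket").get_blob("data/bigfile.txt.gz")
# no-transform tells GCS to skip decompressive transcoding and serve the
# compressed bytes as stored, so sizes and read ranges line up again.
blob.cache_control = "no-transform"
blob.patch()
```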
Moved this to 2.58.0. Thanks!
Has any progress been made on this?
Not yet. We can move this to 2.59.0.
Has any progress been made on this?
Moved to 2.60.0
Based on this getting pushed from release to release, it is clearly not a true release-blocker.