
[Bug]: ReadAllFiles does not fully read gzipped files from GCS

Open janowskijak opened this issue 10 months ago • 21 comments

What happened?

Since the refactor of gcsio (2.52?), ReadAllFiles does not fully read gzipped files from GCS. Part of the file is correctly returned, but the rest goes missing.

I presume this is caused by the fact that GCS performs decompressive transcoding, while _ExpandIntoRanges uses the GCS object's metadata to determine the read range. This means that the actual file size we receive is larger than the maximum of the read range.

For example, a gzipped file on GCS might have a file size of 1 MB, and this will be the object size in the metadata, so the maximum of the read range will be 1 MB. However, when Beam opens the file it has already been decompressed by GCS, so its size will be, say, 1.5 MB; the remaining 0.5 MB is never read, causing data loss.
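The mechanism described above can be demonstrated locally with Python's gzip module (a minimal sketch of the presumed failure mode, not Beam's actual read path):

```python
import gzip

# A payload whose gzipped form is much smaller than the original.
payload = b"some line of text\n" * 100_000

stored = gzip.compress(payload)   # what actually sits in the GCS bucket
metadata_size = len(stored)       # the size GCS reports in object metadata

# With decompressive transcoding, GCS serves the *decompressed* bytes...
served = gzip.decompress(stored)

# ...but a read range capped at the metadata size drops the tail.
read = served[:metadata_size]
print(f"served {len(served)} bytes, read {len(read)}, "
      f"lost {len(served) - len(read)}")
```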

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • [X] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

janowskijak avatar Apr 18 '24 15:04 janowskijak

Thanks for reporting. Agree this is a P1 bug as it causes data loss.

Abacn avatar Apr 19 '24 16:04 Abacn

Is it possible to provide a working example that reproduces the issue? That would help triage.

Abacn avatar Apr 19 '24 16:04 Abacn

@shunping FYI

liferoad avatar Apr 20 '24 13:04 liferoad

Is it possible to provide a working example that reproduces the issue? That would help triage.

@Abacn I don't have a working example however the steps to reproduce are:

  1. Upload a gzip file to GCS. Make sure that the unzipped file is large enough, e.g. a few MB.
  2. Create a Beam pipeline using the Python SDK that reads the file from step 1 using ReadAllFromText.
  3. Print or write the output of ReadAllFromText.
  4. Observe that the file is not fully read.

EDIT: This issue will probably appear for any compression type. I just encountered it with gzip but did not test with other compression algorithms.
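Step 1 above can be done with gsutil; setting Content-Encoding: gzip at upload time is what triggers decompressive transcoding on download (the bucket name below is a placeholder):

```shell
# Generate a text file whose gzipped form is a few MB, keeping the original.
seq 1 5000000 > bigfile.txt
gzip -k bigfile.txt

# Upload with Content-Encoding: gzip so GCS decompresses on download.
gsutil -h "Content-Encoding:gzip" -h "Content-Type:text/plain" \
    cp bigfile.txt.gz gs://YOUR_BUCKET/bigfile.txt.gz
```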

janowskijak avatar Apr 22 '24 08:04 janowskijak

I uploaded one test file here: gs://apache-beam-samples/gcs/bigfile.txt.gz (~7MB), which has 100,000 lines, but I cannot reproduce this:

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    "gs://apache-beam-samples/gcs/bigfile.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS" >> ReadAllFromText()
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This shows:

INFO:root:Total lines 100000

liferoad avatar Apr 22 '24 15:04 liferoad

So I double-checked, and there are differences between your example and our case:

  • We use content encoding gzip while saving our files to GCS; your file has no encoding specified.
  • This leads us to use ReadAllFromText with the parameter compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file is already uncompressed per the GCS transcoding policy (it doesn't work with CompressionTypes.AUTO).
  • This in turn results in reading only a fragment of the file.

Furthermore, after removing the encoding type from our file and using CompressionTypes.AUTO, everything worked properly. To make your example represent our situation, please add content encoding gzip to your file's metadata.

As a quick patch we use the following solution:

class ReadAllFromTextNotSplittable(ReadAllFromText):
    """This class doesn't take advantage of splitting files into bundles
    because, when doing so, Beam was taking the compressed file size as the
    reference, resulting in reading only a fraction of the uncompressed
    file."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Disable splitting so the whole (transcoded) file is read in one range.
        self._read_all_files._splittable = False

What does your metadata look like?

I tried this: [image]

Then I got this error:

ERROR:apache_beam.runners.common:Error -3 while decompressing data: incorrect header check [while running '[6]: Read File from GCS/ReadAllFiles/ReadRange']

liferoad avatar Apr 23 '24 17:04 liferoad

This is expected. As I mentioned earlier ("This leads us to use ReadAllFromText with the parameter compression_type=CompressionTypes.UNCOMPRESSED, since the downloaded file is already uncompressed; it doesn't work with CompressionTypes.AUTO, as per the GCS policy"), I presume the file is already decompressed while being downloaded from GCS, hence the decompression error in Beam.

Metadata is as follows (also please note we checked both text/plain and application/x-gzip; both were only partially read): [image]

I see. We need to check decompressive transcoding for the GCS file to determine whether the content is compressed rather than relying on the file extension.

# standard libraries
import logging

# third party libraries
import apache_beam as beam
from apache_beam import Create, Map
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elements = [
    # "gs://apache-beam-samples/gcs/bigfile.txt.gz",
    # "gs://apache-beam-samples/gcs/bigfile_with_encoding.txt.gz",
    "gs://apache-beam-samples/gcs/bigfile_with_encoding_plain.txt.gz",
]

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    (
        p
        | Create(elements)
        | "Read File from GCS"
        >> ReadAllFromText(
            compression_type=beam.io.filesystem.CompressionTypes.UNCOMPRESSED
        )
        | Count.Globally()
        | "Log" >> Map(lambda x: logging.info("Total lines %d", x))
    )

This only loads 75,601 lines.
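Checking the object's contentEncoding metadata, as suggested above, could look roughly like this (a hypothetical helper, not Beam's implementation; the contentEncoding field name mirrors the GCS JSON API objects resource):

```python
def effective_compression(object_metadata: dict) -> str:
    """Decide which compression type a reader should assume for a GCS object.

    When contentEncoding is gzip, GCS applies decompressive transcoding and
    serves already-decompressed bytes, so the .gz extension must be ignored.
    """
    if object_metadata.get("contentEncoding") == "gzip":
        return "uncompressed"  # GCS decompresses the stream on download
    if object_metadata.get("name", "").endswith(".gz"):
        return "gzip"          # fall back to the extension heuristic
    return "uncompressed"

# The transcoded case from this issue: a .gz object with gzip encoding.
print(effective_compression(
    {"name": "bigfile.txt.gz", "contentEncoding": "gzip"}))  # prints "uncompressed"
```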

#19413 could be related for uploading the file to GCS.

liferoad avatar Apr 27 '24 20:04 liferoad

.take-issue

shunping avatar May 16 '24 14:05 shunping

Have we reproduced this?

kennknowles avatar Jun 07 '24 18:06 kennknowles

Yes, see my above link: https://github.com/apache/beam/issues/31040#issuecomment-2081172626

liferoad avatar Jun 07 '24 19:06 liferoad

Is there hope of a fix for a 2.57.0 cherry-pick? I would guess this is a longstanding issue, so getting it fixed in a very thorough way for 2.58.0 is actually the best thing to do. I recall we had decompressive transcoding bugs in the past, so we should make sure we really get it right this time. In the meantime, users can mitigate by configuring GCS not to do the transcoding.

kennknowles avatar Jun 10 '24 15:06 kennknowles

Moved this to 2.58.0. Thanks!

liferoad avatar Jun 10 '24 18:06 liferoad

Has any progress been made on this?

jrmccluskey avatar Jul 02 '24 14:07 jrmccluskey

Not yet. We can move this to 2.59.0.

liferoad avatar Jul 02 '24 19:07 liferoad

Has any progress been made on this?

lostluck avatar Aug 20 '24 20:08 lostluck

Moved to 2.60.0

lostluck avatar Aug 22 '24 00:08 lostluck

Based on this getting pushed from release to release, it is clearly not a true release-blocker.

kennknowles avatar Aug 22 '24 14:08 kennknowles