[Bug]: WriteToFiles in Python leaves a few records in the temp directory when writing to a large number (100+) of files
What happened?
I am running a pipeline with WriteToFiles where the destination is derived from each record and the files are written to GCP Cloud Storage. We found that a few records are missing from the output files, and during debugging we discovered that a few files are left behind in the temp directory. Beam logs the message below when running locally but not in Dataflow pipelines; the temp directory is deleted when running on Dataflow but retained when running locally.
INFO:apache_beam.io.fileio:Some files may be left orphaned in the temporary folder: ['data/temp\\1891039626663227874_034010af-241f-4930-a782-127098aba54b']
We tried tuning `max_writers_per_bundle` and set it to 300, since the number of files we write is between 100 and 300. There were fewer missing records than before, but some are still missing (about 2 records missing for every 100 records).
It does feel like a performance issue, and the parameter may need further tuning, but records going missing without any error or warning is a cause for concern for us.
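As a rough illustration, missing records can be quantified after a run with a small standalone check like the following (the bucket name and glob pattern are placeholders, not our actual paths): it counts the newline-delimited JSON records present under the output path so the total can be compared with the number of records fed into WriteToFiles.
```python
# Standalone check, not part of the pipeline: count newline-delimited JSON
# records under the output path and compare against the number of input records.
from apache_beam.io.filesystems import FileSystems

def count_output_records(pattern):
    total = 0
    # FileSystems.match takes a list of patterns and returns MatchResult objects.
    for match_result in FileSystems.match([pattern]):
        for metadata in match_result.metadata_list:
            fh = FileSystems.open(metadata.path)
            try:
                # Each record is written followed by a newline, so counting
                # newlines counts records.
                total += fh.read().count(b"\n")
            finally:
                fh.close()
    return total

# Placeholder pattern; substitute the real output path.
print(count_output_records("gs://my-bucket/output/*/processed.json"))
```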
SDK version
Apache Beam Python 3.11 SDK 2.50.0
machineType
n2-standard-4
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
Write to Files
((appsuccess, possuccess, refundsuccess)
    | "merge success" >> beam.Flatten()
    | "write orders to cloud storage" >> WriteToFiles(
        path=options.outputpath,
        destination=get_destination,
        sink=JsonSink(),
        file_naming=success_file_naming(),
        max_writers_per_bundle=300,
        temp_directory=options.temppath,
    ))
JsonSink
@beam.typehints.with_input_types(dict)
class JsonSink(FileSink):
    def open(self, fh):
        self._fh = fh

    def write(self, record):
        string = json.dumps(record, default=str)
        self._fh.write(string.encode("utf8"))
        self._fh.write(b"\n")

    def flush(self):
        self._fh.flush()
get_destination
def get_destination(record):
    if 'location' in record:
        return str(record["location"])
    elif 'org' in record:
        return str(record["org"])
    else:
        return "DEFAULT"
success_file_naming
def success_file_naming():
    def _inner(window, pane, shard_index, total_shards, compression, destination):
        return destination + "/processed.json"
    return _inner
Having implemented GroupBy as a step, I am not seeing any missing records:
((appsuccess, possuccess, refundsuccess)
    | "merge success" >> beam.Flatten()
    | "group by org" >> beam.GroupBy(get_destination)
    | "write orders to cloud storage" >> WriteToFiles(
        path=options.outputpath,
        destination=lambda x: x[0],
        sink=GroupedJsonSink(),
        file_naming=success_file_naming(),
        max_writers_per_bundle=100,
    ))
Modified JsonSink
@beam.typehints.with_input_types(tuple)  # input is now a (destination, records) pair from GroupBy
class GroupedJsonSink(FileSink):
    def open(self, fh):
        self._fh = fh

    def write(self, records):
        for record in records[1]:
            string = json.dumps(record, default=str)
            self._fh.write(string.encode("utf8"))
            self._fh.write(b"\n")

    def flush(self):
        self._fh.flush()
I feel the logging for orphaned files in the temp directory should be at warning or error level.
Hi @awadhoot-sq, I can have a look into the issue!
@RahulDubey391 do you need any additional info?
For now I am good, @awadhoot-sq. If I have questions I'll raise them. Thanks!
Thanks for the report. Data loss is concerning. However, we were not aware of this kind of issue being reported in Beam 2.50.0 or earlier.
How often does this issue occur? Is it reproducible every time, or intermittent? In any case, submitting a ticket to GCP would be helpful, as they can investigate the jobId.
An orphaned file does not mean the file was left behind and is missing from the final destination.
FileIO works in two stages to ensure data integrity. It writes to a temp location, and once all records are written (or the window fires), the temp file is moved to the final destination.
In the GCS filesystem a move is achieved by a copy followed by a delete of the original, because GCS blobs are immutable. An orphaned file means the delete did not complete, usually due to a permission error (for example, if the IAM role of the service account used by the worker can create blobs but cannot delete them). It is harmless. If a file fails to be copied to the final destination, then the pipeline should fail.
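To make the two-stage behavior concrete, here is a minimal sketch (not the actual fileio implementation) of what the finalize step amounts to on GCS: a copy into the final destination followed by a delete of the temp blob. Only a failure of the copy step would mean missing data; a failure of the delete step only leaves an orphaned temp file.
```python
# Minimal sketch of the "move" described above; not the real fileio code.
from apache_beam.io.filesystems import FileSystems

def move_temp_file(temp_path, final_path):
    # Copy first: if this fails, the record data never reaches the destination.
    FileSystems.copy([temp_path], [final_path])
    try:
        # Delete the original: on GCS this is the second half of a "move".
        FileSystems.delete([temp_path])
    except Exception:
        # The data is already at final_path; the leftover temp file is what the
        # "Some files may be left orphaned" message refers to.
        pass
```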
The problem seen locally and on Dataflow may be different, but it is still strange that a local run can produce orphaned files. When running locally, do you mean the direct runner, or Flink/another runner?
Hmm the error described here was also my motivation behind filing https://github.com/apache/beam/pull/29485.
> If a file fails to be copied to the final destination, then the pipeline should fail

This was not the case when I tried this: the pipeline succeeded, but the files were not moved.
A failed copy means there is no file in the final destination, so not failing the pipeline would mean data loss; a failed move just means the file was copied but the original was not deleted. The data exists in the final destination, so the pipeline is still considered successful.
Can we confirm whether this is reproducible, or confirm actual data loss? Do we have a pipeline that succeeds but whose files are not in the final destination?
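For example, a quick existence check along these lines (the paths are placeholders) would tell us whether the final files are actually absent after a successful run:
```python
# Hypothetical spot check: are the expected per-destination files present?
from apache_beam.io.filesystems import FileSystems

expected = [
    "gs://my-bucket/output/LOCATION_A/processed.json",  # placeholder paths
    "gs://my-bucket/output/ORG_B/processed.json",
]
missing = [path for path in expected if not FileSystems.exists(path)]
print("Missing final files:", missing)
```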
Any update on this issue?
Any update here? I suggest downgrading to P2 if we cannot get enough information to reproduce and understand this.
> The problem seen locally and on Dataflow may be different, but it is still strange that a local run can produce orphaned files. When running locally, do you mean the direct runner, or Flink/another runner?
I experienced the error "Some files may be left orphaned" which did not fail the pipeline but caused data loss. I realized it occurs when the directory path is local and does not exist.
# DirectRunner
path = "./outputs" # local directory does not exist
p = p | "Write" >> WriteToFiles(path)
>> Some files may be left orphaned... (failure)
Forcing the creation of the local directory resolved the issue.
# DirectRunner
path = "./outputs" # local directory does not exist
os.makedirs(path, exist_ok=True) # now local directory exists
p = p | "Write" >> WriteToFiles(path)
>> Moving temporary files... (success)
Testing with a non-existent GCS path did not present this problem: the directory was created automatically and the "Some files may be left orphaned" message was not observed.
# DirectRunner
path = "gs://bucket/outputs" # remote directory does not exist
p = p | "Write" >> WriteToFiles(path)
>> Moving temporary files... (success)
I believe that both the local and remote scenarios should have the same behavior.
> I experienced the error "Some files may be left orphaned" which did not fail the pipeline but caused data loss. I realized it occurs when the directory path is local and does not exist.
This is because the GCS filesystem is flat and does not have directories, while the local filesystem does.
WriteToFiles has logic to create the base path if it does not exist, but only for the temp directory:
https://github.com/apache/beam/blob/90d3f8a177ff935a09852fd85892bb1399b85c91/sdks/python/apache_beam/io/fileio.py#L659
We should be able to apply the same logic to the final directory as well.
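A rough sketch of that idea, assuming the same create-if-missing check is applied to the final destination before temp files are moved (the helper name is illustrative, not the actual fileio.py code):
```python
# Illustrative helper, not the actual fileio.py change.
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems

def ensure_directory_exists(path):
    try:
        if not FileSystems.exists(path):
            FileSystems.mkdirs(path)
    except BeamIOError:
        # Flat filesystems such as GCS have no real directories, so a failed
        # mkdirs can be ignored there; local filesystems do need the directory.
        pass
```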