sagemaker-python-sdk
ScriptProcessor unable to handle millions of output files to be saved to S3
Describe the bug I am having trouble with data post-processing in AWS SageMaker: I need to split one large text file of predictions (~2-10 GB) into millions of small files (one file per user, ~3-10 KB each).
I've been able to process a small dataset (32 MB, 13,540 records) successfully. When I try 1.2 million records (2.2 GB), ScriptProcessor processes the input file and writes the output files to /opt/ml/processing/output, but the job fails while uploading them to S3 (see the traceback below).
To reproduce Jupyter notebook:
```python
import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig

role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central-1.amazonaws.com/maslick-sagemaker-processing-image:latest'
input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'

network_config = NetworkConfig(enable_network_isolation=False,
                               subnets=["subnet-12345", "subnet-67890"],
                               security_group_ids=["sg-0123456789"])

script_processor = ScriptProcessor(role=role,
                                   image_uri=ecr_image_full_name,
                                   command=['python3'],
                                   instance_count=1,
                                   instance_type=instance_type,
                                   network_config=network_config)

input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)

script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
```
callable.py:
```python
import hashlib
import json
import sys
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path

import pandas as pd


def saveFilesMultiProcesses(items):
    # Note: the futures returned by submit() are discarded, so any
    # exception raised inside saveFile() is silently swallowed.
    with ProcessPoolExecutor() as executor:
        for item in items:
            executor.submit(saveFile, item)


def readCsv(input_file):
    colnames = ['id', 'article', 'type', 'rank']
    df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
    return df


def processCsv(df):
    # Plain dicts are used here: a defaultdict backed by a lambda cannot be
    # pickled, which ProcessPoolExecutor requires when sending items to workers.
    dicts = []
    for row in df.itertuples():
        item = {
            "id": row.id,
            "article": row.article,
            "type": row.type,
            "rank": row.rank,
        }
        dicts.append(item)
    return dicts


def saveFile(item):
    # Prefix the file name with the first 5 characters of the MD5 hash of the
    # id so the resulting S3 keys are spread across many prefixes.
    hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
    short = hashed_prefix[:5]
    file_name = short + "_" + str(item['id']) + "_latest.json"
    outfile = Path('/opt/ml/processing/output', file_name)
    with open(outfile, 'w') as json_file:
        json.dump(item, json_file)


if __name__ == '__main__':
    input_file = sys.argv[1]
    df = readCsv(input_file)
    list_of_dicts = processCsv(df)
    saveFilesMultiProcesses(list_of_dicts)
    print("Done. Wait until all files are saved to S3")
```
Dockerfile:
```dockerfile
FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3
ENV PYTHONUNBUFFERED=TRUE
```
Expected behavior
All files that I save to /opt/ml/processing/output should be saved to S3.
Screenshots or logs
```
---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
    402         self.jobs.append(self.latest_job)
    403         if wait:
--> 404             self.latest_job.wait(logs=logs)
    405
    406     def _get_user_code_name(self, code):

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
    726         """
    727         if logs:
--> 728             self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
    729         else:
    730             self.sagemaker_session.wait_for_processing_job(self.job_name)

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
   3132
   3133         if wait:
-> 3134             self._check_job_status(job_name, description, "ProcessingJobStatus")
   3135             if dot:
   3136                 print()

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
   2636                 ),
   2637                 allowed_statuses=["Completed", "Stopped"],
-> 2638                 actual_status=status,
   2639             )
   2640

UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error. Please try again.
```
System information
- SageMaker Python SDK version: 1.50.17
- Framework name (e.g. PyTorch) or algorithm (e.g. KMeans): pandas
- Framework version: 0.25.3
- Python version: 3.7.4
- CPU or GPU: CPU
- Custom Docker image (Y/N): Y
Additional context See my Stack Overflow question for more details.
Any ideas? It's rather annoying. For the moment we're just uploading to S3 from inside the container ourselves, but that removes part of the "ease" of SageMaker Processing.
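For reference, a minimal sketch of that workaround, assuming the paths and placeholder bucket name from the reproduction above; it uploads each generated file with boto3 instead of relying on the ProcessingOutput upload step:

```python
import boto3
from pathlib import Path

s3 = boto3.client('s3')
output_dir = Path('/opt/ml/processing/output')

# Upload each generated file directly from the processing script;
# 'my-awesome-results' is the placeholder bucket from the example above.
for f in output_dir.glob('*.json'):
    s3.upload_file(str(f), 'my-awesome-results', f.name)
```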
I am experiencing a similar issue. I can think of the following reason: to upload the data from the container to S3, I guess AWS uses the S3 API in the background, where rate limits apply (S3 allows on the order of 3,500 PUT requests per second per key prefix). Writing millions of files triggers that many PUT calls, which will be throttled at some point, and maybe the throttling eventually leads to a timeout. I can see this in my use case: only part of the files gets written to S3 as expected. What already seemed to help for me was to use S3UploadMode Continuous (see the sketch below), which distributes the uploading over the whole time the job is running instead of doing it all at the end.
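For illustration, a minimal sketch of setting that mode, assuming the output location from the reproduction above; the s3_upload_mode argument of ProcessingOutput maps to the S3UploadMode field of the CreateProcessingJob API, and whether the SDK exposes it depends on the SDK version:

```python
from sagemaker.processing import ProcessingOutput

# Upload output files to S3 continuously while the job runs instead of
# all at once when the job finishes (the default mode is 'EndOfJob').
output = ProcessingOutput(source='/opt/ml/processing/output',
                          destination='s3://my-awesome-results',
                          s3_upload_mode='Continuous')
```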