sagemaker-python-sdk

ScriptProcessor unable to handle millions of output files to be saved to S3

Open maslick opened this issue 5 years ago • 2 comments

Describe the bug
I am having trouble with data post-processing in AWS SageMaker, where I need to split one large text file with predictions (~2-10 GB) into millions of small files (one file per user, ~3-10 KB).

I've been able to process a small dataset (32 MB, 13,540 records). When I try 1.2 million records (2.2 GB), ScriptProcessor successfully processes the input file and saves the output files to /opt/ml/processing/output; however, it fails to upload them to S3 and the job ends with an error.

To reproduce
Jupyter notebook:

import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig


role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central.amazonaws.com/maslick-sagemaker-processing-image:latest'
    
input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'
    
network_config = NetworkConfig(enable_network_isolation=False,
                               subnets=["subnet-12345", "subnet-67890"],
                               security_group_ids=["sg-0123456789"])
    
script_processor = ScriptProcessor(role=role,
                                   image_uri=ecr_image_full_name,
                                   command=['python3'],
                                   instance_count=1,
                                   instance_type=instance_type)

input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)
    
script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])

callable.py:

import hashlib
import json
import sys
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path
import pandas as pd


def saveFilesMultiProcesses(items):
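    # The executor's with-block waits for all submitted save tasks to finish on exit.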
    with ProcessPoolExecutor() as executor:
        for item in items:
            executor.submit(saveFile, item)


def readCsv(input_file):
    colnames = ['id', 'article', 'type', 'rank']
    df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
    return df


def processCsv(df):
    # Convert each DataFrame row into a plain dict (avoids shadowing the built-in `dict`).
    dicts = []
    for row in df.itertuples():
        record = {
            "id": row.id,
            "article": row.article,
            "type": row.type,
            "rank": row.rank,
        }
        dicts.append(record)

    return dicts


def saveFile(item):
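    # Name each output <5-char-md5-prefix>_<id>_latest.json and write it as JSON.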
    hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
    short = hashed_prefix[:5]

    file_name = short + "_" + str(item['id']) + "_latest.json"
    outfile = Path('/opt/ml/processing/output', file_name)
    with open(outfile, 'w') as json_file:
        json.dump(item, json_file)


if __name__ == '__main__':
    input_file = sys.argv[1]
    df = readCsv(input_file)
    list_of_dicts = processCsv(df)
    saveFilesMultiProcesses(list_of_dicts)
    print("Done. Wait until all files are saved to S3")

Dockerfile:

FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3
ENV PYTHONUNBUFFERED=TRUE

Expected behavior
All files that I save to /opt/ml/processing/output should be uploaded to S3.

Screenshots or logs

---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
    402         self.jobs.append(self.latest_job)
    403         if wait:
--> 404             self.latest_job.wait(logs=logs)
    405 
    406     def _get_user_code_name(self, code):

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
    726         """
    727         if logs:
--> 728             self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
    729         else:
    730             self.sagemaker_session.wait_for_processing_job(self.job_name)

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
   3132 
   3133         if wait:
-> 3134             self._check_job_status(job_name, description, "ProcessingJobStatus")
   3135             if dot:
   3136                 print()

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
   2636                 ),
   2637                 allowed_statuses=["Completed", "Stopped"],
-> 2638                 actual_status=status,
   2639             )
   2640 

UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error.  Please try again.

System information

  • SageMaker Python SDK version: 1.50.17
  • Framework name (e.g. PyTorch) or algorithm (e.g. KMeans): pandas
  • Framework version: 0.25.3
  • Python version: 3.7.4
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): Y

Additional context
See my Stack Overflow question for more details.

maslick avatar Jun 24 '20 06:06 maslick

Any ideas? It's rather annoying. For the moment we're just uploading from inside the container, but that removes part of the "ease" of SageMaker Processing.
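
A minimal sketch of that in-container workaround, assuming the processing job's role can write to the target bucket (the bucket name is reused from the snippet above; the predictions/ prefix is a placeholder):

import boto3
from pathlib import Path

s3 = boto3.client("s3")
output_dir = Path("/opt/ml/processing/output")

# Upload each generated JSON file straight from the processing script,
# instead of relying on the end-of-job sync of ProcessingOutput.
for path in output_dir.glob("*.json"):
    s3.upload_file(str(path), "my-awesome-results", "predictions/" + path.name)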

RoelantStegmann avatar Feb 08 '21 10:02 RoelantStegmann

I am experiencing a similar issue. My guess at the reason: to upload the data from the container to S3, AWS presumably uses the S3 API in the background, where rate limits apply. Writing millions of files triggers many S3 API calls, which get throttled at some point, and the throttling may eventually lead to a timeout. I can see this in my use case: only part of the files gets written to S3 as expected. What already seemed to help for me was to use S3 upload mode Continuous, which distributes the uploading over the whole time the job is running instead of doing it all at the end.
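
For reference, a minimal sketch of what that looks like on the SDK side; only the s3_upload_mode argument is new relative to the reproduction above:

from sagemaker.processing import ProcessingOutput

# Stream files to S3 while the job runs instead of uploading everything at the end.
output = ProcessingOutput(source='/opt/ml/processing/output',
                          destination=output_object,
                          s3_upload_mode='Continuous')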

lorenzwalthert avatar Oct 07 '24 13:10 lorenzwalthert