
[Example Request] - Distributed Processing SKLEARN : AWS Sagemaker

Open anansrivastava opened this issue 4 years ago • 5 comments

I have a few raw .csv files in my S3 bucket. How can I process them in parallel to reduce run time? See comments on where I require a little help. I am using SKLearnProcessor and s3_data_distribution_type='ShardedByS3Key'

[Screenshot attached: 2021-08-02 16:37]
%%writefile preprocessing/preprocessing_sklearn.py
    
import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os

def process(input_data_path):
    df = pd.read_csv(input_data_path)
    # drop first column (Unnamed: 0)
    df = df.iloc[: , 1:]
    
    features = df.iloc[:,1:]
    headers = features.columns
    labels = df.iloc[:,0]

    scaler = StandardScaler()
    
    normalized_x_train = scaler.fit_transform(features)

    # write
    pd.DataFrame(normalized_x_train).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_features.csv')), header=False, index=False)
    pd.DataFrame(labels).to_csv((os.path.join('/opt/ml/processing/output/train', 'train_labels.csv')), header=False, index=False)
    
if __name__ == '__main__':
    # HOW DO I MAKE THIS DYNAMIC? CHUNK_1.CSV, CHUNK_2.CSV ETC
    input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")  
    process(input_data_path)

My calling function:

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit

start = timeit.default_timer()
# WHAT SHOULD BE MY SOURCE?
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv" 
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"

sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name = 'preprocess-sklearn'
                                    )

sklearn_processor.run(
    code='preprocessing/preprocessing_sklearn.py',
    inputs=[
        ProcessingInput(
            source=source2,
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/input')
    ],
    
    outputs=[
        ProcessingOutput(
          source='/opt/ml/processing/output/train', 
          destination= make_url(store_bucket, "preprocess_sklearn", "train")
        ),
#                                
        ProcessingOutput(
            source='/opt/ml/processing/output/test',
            destination= make_url(store_bucket, "preprocess_sklearn", "test")
        )
    ]
                     
)

stop = timeit.default_timer()

print('Time: ', stop - start)

anansrivastava avatar Aug 02 '21 15:08 anansrivastava

Hi, you don't have to specify the file names in the script. Files are distributed across the instances behind the scenes based on the instance count. In your example, you have 8 files and an instance count of 2, so each instance will get 4 files to process.
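
For example, a minimal sketch of what the processing script could do (assuming the /opt/ml/processing/input destination and the chunk_*.csv file names from your snippet):

import glob
import os

# the ProcessingInput destination from the run() call in the question
input_dir = "/opt/ml/processing/input"

# with s3_data_distribution_type='ShardedByS3Key', only this instance's share
# of the chunk_*.csv objects is downloaded into input_dir
shard = sorted(glob.glob(os.path.join(input_dir, "chunk_*.csv")))
print("Files sharded to this instance:", shard)

for path in shard:
    # process each chunk here
    print("processing", path)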

rthamman avatar Aug 23 '21 22:08 rthamman

Hi, could you provide sample code as well? I did that and it throws an error saying the source is invalid.

anansrivastava avatar Aug 24 '21 07:08 anansrivastava

Hi, did you get a chance to find an example that uses the SageMaker SKLearnProcessor for parallel processing? I would appreciate it if you could point me to example code. Thank you.

AvivTahar avatar Mar 17 '22 20:03 AvivTahar

In your ProcessingInput source parameter you just need to pass the S3 prefix that contains all the files. In your case, I assume you uploaded all the chunk_*.csv files to s3://sagemaker-end-to-end/data_tuning/train/

Then, in your preprocessing_sklearn.py, you can list the files that are available to the instance and use that list as input for your processing logic. For example:

import os
files = list(filter(None,os.popen('ls /opt/ml/processing/input | grep chunk').read().split("\n")))

This will return the list of chunk_*.csv files on this instance that need to be processed.

You can then iterate over them with:

for file in files:
    print(file)
    #Add your processing logic here
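
Putting this together with the process() function from the question, a minimal sketch of preprocessing_sklearn.py could look like the following (the per-chunk output file names such as train_features_chunk_0.csv are just a suggestion so the chunks don't overwrite each other; note that, as in the original script, the scaler is fit on each chunk independently):

import os
import pandas as pd
from sklearn.preprocessing import StandardScaler

input_dir = '/opt/ml/processing/input'
output_dir = '/opt/ml/processing/output/train'

def process(input_data_path, suffix):
    df = pd.read_csv(input_data_path)
    df = df.iloc[:, 1:]          # drop first column (Unnamed: 0)
    features = df.iloc[:, 1:]
    labels = df.iloc[:, 0]
    # note: the scaler is fit per chunk here, as in the original script
    normalized = StandardScaler().fit_transform(features)
    # write per-chunk files so the chunks do not overwrite each other
    pd.DataFrame(normalized).to_csv(os.path.join(output_dir, 'train_features_{}.csv'.format(suffix)), header=False, index=False)
    pd.DataFrame(labels).to_csv(os.path.join(output_dir, 'train_labels_{}.csv'.format(suffix)), header=False, index=False)

if __name__ == '__main__':
    os.makedirs(output_dir, exist_ok=True)
    # only this instance's shard of chunk_*.csv is present locally
    files = sorted(f for f in os.listdir(input_dir) if f.startswith('chunk'))
    for file in files:
        process(os.path.join(input_dir, file), os.path.splitext(file)[0])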

marckarp avatar Mar 18 '22 00:03 marckarp

In hopes this will help someone who stumbles across this post in the future:

%%writefile preprocess.py
import pandas as pd  # used for handling the dataset
import boto3
import glob
import os

# s3 settings
bucket = 'bucket'
output_prefix = 'users/name/output'

# local directory that the ProcessingOutput below uploads to S3 at the end of the job
output_dir = '/opt/ml/processing/processed_data'

def main(input_parquet):
    """Parse the parquet, remove white space from the text, and write it to the output folder."""
    try:
        # load in the parquet
        new_df = pd.read_parquet(input_parquet, engine='auto', columns=["index", "fake text"])

        new_df['cleaned_text'] = new_df['fake text'].apply(lambda x: "".join(x.split()))

        # name the processed file after the input file
        processed_file_name = '{}_processed.parquet'.format(os.path.splitext(os.path.basename(input_parquet))[0])
        # write it to the output folder so it is uploaded to S3 at the end of the job
        new_df.to_parquet(os.path.join(output_dir, processed_file_name))

        # let the user know it was successfully processed
        print("The final output can be downloaded from: s3://{}/{}/{}".format(bucket, output_prefix, processed_file_name))
        print("finished processing {}".format(input_parquet))

        return True, None
    
    except Exception as e:
        print(e)
        return False, e


if __name__ == "__main__":
    # os.system("pip install package_here")  # add any system installs for packages you need

    # make sure the local output folder exists
    os.makedirs(output_dir, exist_ok=True)

    # get a list of the input files that were copied onto this instance in the input folder
    files = glob.glob("/opt/ml/processing/input_data/*.parquet")
    print("The files we found were:")
    print(files)
    # with ShardedByS3Key, each instance sees roughly input_files_number / instance_count files
    for file in files:
        # run our main function
        main(file)
        print("successfully processed", file)


Code to actually launch the processing containers:



from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

#name your job name
base_job_name='dev-name-testing'

script_processor = ScriptProcessor(command=['python3'],
                image_uri="763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-trcomp-training:1.9.0-transformers4.11.0-gpu-py38-cu111-ubuntu20.04",
                role=get_execution_role(),
                base_job_name=base_job_name,
#                 instance_type='ml.g4dn.xlarge',
                instance_type="ml.m5.large",
                instance_count=20)

script_processor.run(code='preprocess.py',
                    inputs=[ProcessingInput(
                        source='s3://bucket/users/name/input/',
                        destination='/opt/ml/processing/input_data',
                        s3_data_distribution_type='ShardedByS3Key',
                        s3_data_type='S3Prefix'),
                        ],
                    outputs=[ProcessingOutput(
                        source='/opt/ml/processing/processed_data',
                        destination='s3://bucket/users/name/output',
                        s3_upload_mode="EndOfJob")],
                    )

joe-webb avatar Apr 13 '22 17:04 joe-webb