amazon-sagemaker-examples
[Example Request] - Distributed Processing SKLEARN : AWS Sagemaker
I have a few raw .csv files in my S3 bucket. How can I process them in parallel to reduce the run time? See the comments for where I need a little help. I am using SKLearnProcessor with s3_data_distribution_type='ShardedByS3Key'.
%%writefile preprocessing/preprocessing_sklearn.py
import pandas as pd
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os


def process(input_data_path):
    df = pd.read_csv(input_data_path)
    # drop first col (Unnamed: 0)
    df = df.iloc[:, 1:]
    features = df.iloc[:, 1:]
    headers = features.columns
    labels = df.iloc[:, 0]
    scaler = StandardScaler()
    normalized_x_train = scaler.fit_transform(features)
    # write
    pd.DataFrame(normalized_x_train).to_csv(os.path.join('/opt/ml/processing/output/train', 'train_features.csv'), header=False, index=False)
    pd.DataFrame(labels).to_csv(os.path.join('/opt/ml/processing/output/train', 'train_labels.csv'), header=False, index=False)


if __name__ == '__main__':
    # HOW DO I MAKE THIS DYNAMIC? CHUNK_1.CSV, CHUNK_2.CSV ETC
    input_data_path = os.path.join("/opt/ml/processing/input", "train-data-with-header.csv")
    process(input_data_path)
My calling function:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import timeit

start = timeit.default_timer()

# WHAT SHOULD BE MY SOURCE?
source = "s3://sagemaker-end-to-end/data_tuning/train/chunk_0.csv"
source2 = "s3://sagemaker-end-to-end/data_tuning/train/"

sklearn_processor = SKLearnProcessor(framework_version='0.23-1',
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name='preprocess-sklearn'
                                     )

sklearn_processor.run(
    code='preprocessing/preprocessing_sklearn.py',
    inputs=[
        ProcessingInput(
            source=source2,
            s3_data_distribution_type='ShardedByS3Key',
            destination='/opt/ml/processing/input')
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output/train',
            destination=make_url(store_bucket, "preprocess_sklearn", "train")
        ),
        ProcessingOutput(
            source='/opt/ml/processing/output/test',
            destination=make_url(store_bucket, "preprocess_sklearn", "test")
        )
    ]
)

stop = timeit.default_timer()
print('Time: ', stop - start)
Hi, you don't have to specify the file names in the script. Depending on the instance count, the files are distributed behind the scenes. In your example you have 8 files and an instance count of 2, so each instance will get 4 files to process.
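For instance, inside the script each instance can simply list whatever landed in its local input directory (a minimal sketch, assuming the /opt/ml/processing/input destination from your ProcessingInput):

import os

input_dir = "/opt/ml/processing/input"
# each instance only sees the chunk_*.csv files that were sharded to it
shard = sorted(f for f in os.listdir(input_dir) if f.endswith(".csv"))
print("Files on this instance:", shard)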
Hi, could you provide sample code as well? I tried that and it throws an error saying the source is invalid.
Hi, did you get a chance to find an example that uses the SageMaker SKLearn processor for parallel computation? I would appreciate it if you could point me to example code. Thank you.
In your ProcessingInput, the source parameter just needs to be the S3 prefix that contains all the files. In your case, I assume you uploaded all the chunk_*.csv files to s3://sagemaker-end-to-end/data_tuning/train/.
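For example, a sketch reusing the source2 prefix from your calling code:

ProcessingInput(
    source="s3://sagemaker-end-to-end/data_tuning/train/",  # the prefix, not an individual chunk_*.csv
    destination="/opt/ml/processing/input",
    s3_data_distribution_type="ShardedByS3Key",
)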
Then, in your preprocessing_sklearn.py, you can list the files that are available to the instance and use that list as input for your processing logic. For example:
import os
files = list(filter(None,os.popen('ls /opt/ml/processing/input | grep chunk').read().split("\n")))
This returns the list of chunk_*.csv files that were sharded onto this instance. You can then iterate over them:
for file in files:
    print(file)
    # Add your processing logic here
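Putting this together with the process() function from the question (a minimal sketch, assuming every chunk_*.csv has the same layout as train-data-with-header.csv, and writing one output file per chunk so the two instances do not overwrite each other's results):

import os

import pandas as pd
from sklearn.preprocessing import StandardScaler

INPUT_DIR = "/opt/ml/processing/input"
OUTPUT_DIR = "/opt/ml/processing/output/train"


def process(input_data_path, output_suffix):
    df = pd.read_csv(input_data_path)
    df = df.iloc[:, 1:]  # drop the Unnamed: 0 column
    features = df.iloc[:, 1:]
    labels = df.iloc[:, 0]
    normalized = StandardScaler().fit_transform(features)
    pd.DataFrame(normalized).to_csv(
        os.path.join(OUTPUT_DIR, "train_features_{}.csv".format(output_suffix)),
        header=False, index=False)
    pd.DataFrame(labels).to_csv(
        os.path.join(OUTPUT_DIR, "train_labels_{}.csv".format(output_suffix)),
        header=False, index=False)


if __name__ == "__main__":
    os.makedirs(OUTPUT_DIR, exist_ok=True)  # in case the output folder is not pre-created
    files = [f for f in os.listdir(INPUT_DIR) if f.startswith("chunk")]
    for file in files:
        process(os.path.join(INPUT_DIR, file), os.path.splitext(file)[0])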
In the hope that this helps someone who stumbles across this post in the future:
%%writefile preprocess.py
import glob
import os

import boto3
import pandas as pd  # used for handling the dataset

# s3 settings
bucket = 'bucket'
output_prefix = 'users/name/output'
# local folder that SageMaker uploads to S3 at the end of the job (see ProcessingOutput below)
output_dir = '/opt/ml/processing/processed_data'


def main(input_parquet):
    """Parse the parquet file, remove white space from the text, and write it to the output folder so it gets uploaded back to S3."""
    try:
        # load in the parquet file
        new_df = pd.read_parquet(input_parquet, engine='auto', columns=["index", "fake text"])
        new_df['cleaned_text'] = new_df['fake text'].apply(lambda x: "".join(x.split()))
        # name the processed file and write it into the output folder
        processed_file_name = '{}_processed.parquet'.format(os.path.splitext(os.path.basename(input_parquet))[0])
        new_df.to_parquet(os.path.join(output_dir, processed_file_name))
        # let the user know it was successfully processed
        print("The final output can be downloaded from: s3://{}/{}/{}".format(bucket, output_prefix, processed_file_name))
        print("finished processing {}".format(input_parquet))
        return True, None
    except Exception as e:
        print(e)
        return False, e


if __name__ == "__main__":
    # os.system("pip install package_here")  # add any system installs for packages you need
    os.makedirs(output_dir, exist_ok=True)  # make sure the local output folder exists
    # get a list of the input files that were copied onto this instance's input folder
    print("The files we found were:")
    print("\n")
    files = glob.glob("/opt/ml/processing/input_data/*.parquet")
    print(files)
    # each instance sees roughly total_input_files / instance_count files
    for file in files:
        # run our main function on each file
        main(file)
        print("successfully processed", file)
Code to actually launch the processing containers:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role

# name your job
base_job_name = 'dev-name-testing'

script_processor = ScriptProcessor(command=['python3'],
                                   image_uri="763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-trcomp-training:1.9.0-transformers4.11.0-gpu-py38-cu111-ubuntu20.04",
                                   role=get_execution_role(),
                                   base_job_name=base_job_name,
                                   # instance_type='ml.g4dn.xlarge',
                                   instance_type="ml.m5.large",
                                   instance_count=20)

script_processor.run(code='preprocess.py',
                     inputs=[ProcessingInput(
                         source='s3://bucket/users/name/input/',
                         destination='/opt/ml/processing/input_data',
                         s3_data_distribution_type='ShardedByS3Key',
                         s3_data_type='S3Prefix'),
                     ],
                     outputs=[ProcessingOutput(
                         source='/opt/ml/processing/processed_data',
                         destination='s3://bucket/users/name/output',
                         s3_upload_mode="EndOfJob")],
                     )
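Once the job finishes, you can quickly confirm that each instance's shard made it to S3 (a sketch; swap in your own bucket and output prefix):

import boto3

s3 = boto3.client("s3")
# list everything the processing job uploaded under the output prefix
response = s3.list_objects_v2(Bucket="bucket", Prefix="users/name/output")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])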