sagemaker-python-sdk icon indicating copy to clipboard operation
sagemaker-python-sdk copied to clipboard

Implementing model monitoring for image dataset.

Open Shradaya opened this issue 1 year ago • 0 comments

I tried to implement model monitoring but I do not understand how to implement it for Image dataset. Specially I am not sure how the 4 dimensional data for the image is to be handled for model monitoring. I have defined a preprocess_handler but not sure if that's the function which can truly help.

In my code I am generating false ground truth and invoke the model as follows:

def generate_load_and_ground_truth():
    df = pd.read_csv('validation_with_predictions.csv')
    gt_records = []
    for i, row in df.iterrows():
        suffix = uuid.uuid1().hex
        inference_id = f'{i}-{suffix}'
        data = np.array([X_test[i]])
        payload = {'instances': data}
        args = {'InferenceId': inference_id}
        out = predictor.predict(data = payload, initial_args = args)
        gt_records.append(str({
            "groundTruthData": {
                "data": str(df['label'][i]),
                "encoding": 'CSV',
            },
            "eventMetadata": {
                "eventId": str(inference_id),
            },
            "eventVersion": "0",
        }))
    upload_ground_truth(gt_records, ground_truth_upload_path, datetime.utcnow())

def upload_ground_truth(records, path, upload_time):
    data_to_upload = ",".join(records)
    data_to_upload = data_to_upload
    target_s3_uri = f"{path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

When the model schedule ran it gave the error message

'MonitoringExecutionStatus': 'Failed',
'FailureReason': 'Algorithm Error: See Job Logs for more information.'
Looking into the cloud watch log I found the error to be:
'Cannot resolve column name "groundTruthMetadata.eventId" among (_corrupt_record);'

Then I used this pre-process function at my endpoint.

import json
def preprocess_handler(inference_record):
    input_dict = json.loads(inference_record.endpoint_input.data)
    output_dict = json.loads(inference_record.endpoint_output.data)
    input_data = str(input_dict['instances'].reshape(3072))[1:-1]
    output_data = str(np.argmax(output_dict['predictions'][0]))
    return_dict = {'prediction000':output_data, 'feature000':input_data}
    return return_dict

It gives the error:

'FailureReason': 'InternalServerError: We encountered an internal error. Please try again.'}

Shradaya avatar Jul 11 '22 04:07 Shradaya