
Model monitor mixes up features with target

arvgram opened this issue on Aug 04 '22 · 0 comments

Describe the bug

When using a model monitor for my deployed decision tree model, the monitor mixes up the features and the target variable. That is, it interprets the collected model output (i.e. the target variable) as feature no. 1, the collected feature no. 1 as feature no. 2, feature no. 2 as feature no. 3, and so on: target -> no. 1 -> no. 2 -> no. 3 -> no. 4 -> target. I sent the baseline dataset as input in order to make a histogram in which this is very clear: the distribution of the collected data matches the previous feature's baseline data.

[screenshot: collected feature distributions overlaid on the baselines, each matching the previous feature's baseline]
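For concreteness, the mismatch looks like this (a sketch with Iris-style column names; the names are my assumption, the one-column cyclic shift is what I observe):

# Illustration of the reported cyclic shift; not SDK code.
baseline_order = ["sepal_length", "sepal_width", "petal_length", "petal_width", "target"]
collected_order = ["target", "sepal_length", "sepal_width", "petal_length", "petal_width"]

# Positional matching pairs each baseline column with the wrong collected
# column, producing the reported target -> no. 1 -> ... -> no. 4 -> target cycle:
for b, c in zip(baseline_order, collected_order):
    print(f"baseline '{b}'  <->  collected '{c}'")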

To reproduce

# Create a model object from the pretrained model.
from sagemaker.sklearn.model import SKLearnModel

model_uri = "s3://..."  # S3 URI of the bucket where the model is stored
model_name = "my-decision-tree-model"

model = SKLearnModel(
    model_data=model_uri,
    role=role,
    entry_point="Scripts/serve.py",
    framework_version="0.23-1",
)
print("Model name: " + model_name)

The serving script we are using, Scripts/serve.py, is defined as:

%%writefile Scripts/serve.py

# The methods below are for model serving, which will be called by the SageMaker serving endpoint automatically:
#  model_fn()
#  input_fn()
#  predict_fn()
#  output_fn()

import os
import joblib
import json
import pandas as pd
import numpy as np
from io import StringIO 

"""
Deserialize fitted model
"""
def model_fn(model_dir):
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model

"""
input_fn
    request_body: The body of the request sent to the model.
    request_content_type: (string) specifies the format/variable type of the request
"""
def input_fn(request_body, request_content_type):
    if request_content_type == 'application/json':
        request_body = json.loads(request_body)
        inpVar = request_body['Input']
        return inpVar
    elif request_content_type == 'text/csv':
        df = pd.read_csv(StringIO(request_body),header=None)
        return df.values.tolist()
    else:
        raise ValueError("This model only supports text/csv or application/json input")
        
"""
predict_fn
    input_data: returned array from input_fn above
    model (sklearn model) returned model loaded from model_fn above
"""
def predict_fn(input_data, model):
    return model.predict(input_data)

"""
output_fn
    prediction: the returned value from predict_fn above
    content_type: the content type the endpoint expects to be returned. Ex: JSON, string
"""

def output_fn(prediction, content_type):
    if content_type == 'application/json':
        res = prediction.tolist()
        respJSON = {'Output': res}
        return respJSON
    elif content_type == 'text/csv':
        s = StringIO()
        np.savetxt(s, prediction, fmt='%d', delimiter=',', newline='\n')
        return s.getvalue()
    else:
        raise ValueError("This model only supports text/csv or application/json input") 
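
As a sanity check that the script itself keeps columns in order, the handlers can be exercised in a quick local round trip (a sketch; the local "model" directory containing a model.joblib trained on the four Iris features is an assumption):

# Local smoke test of the serving handlers above.
from Scripts.serve import model_fn, input_fn, predict_fn, output_fn

model = model_fn("model")  # directory holding model.joblib (assumed)
data = input_fn("5.1,3.5,1.4,0.2", "text/csv")  # one Iris row, no target
prediction = predict_fn(data, model)
print(output_fn(prediction, "text/csv"))  # expected: a single class label, e.g. "0"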

  • Deploy the model to an endpoint with data capture enabled:
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = "model-endpoint"
print("Endpoint name:", endpoint_name)

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=endpoint_data_uri,  # S3 URI where captured requests/responses are written
    csv_content_types=['text/csv', 'text/html'],
)

predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.t2.medium",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)
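
To see exactly what the monitor ingests, one captured record can be read back from S3 (a sketch; capture files under endpoint_data_uri are JSON Lines, and they may take a few minutes to appear after invocations):

# Inspect one captured record to verify how input and output are stored.
import json
from sagemaker.s3 import S3Downloader

capture_files = S3Downloader.list(endpoint_data_uri)
record = json.loads(S3Downloader.read_file(capture_files[-1]).splitlines()[0])
print("endpointInput: ", record["captureData"]["endpointInput"]["data"])
print("endpointOutput:", record["captureData"]["endpointOutput"]["data"])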
  • Create a model monitor object and set an hourly schedule.
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

my_model_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.t3.medium",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

mon_schedule_name = "my-monitor-schedule"  # the schedule name is a string

my_model_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=statistics_uri,    # we point at uploaded statistics in S3
    constraints=constraints_uri,  # we point at uploaded constraints in S3
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
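
A quick check that the schedule was created and points at the right endpoint (describe_schedule() wraps the DescribeMonitoringSchedule API):

# Verify the schedule status and target endpoint.
desc = my_model_monitor.describe_schedule()
print(desc["MonitoringScheduleStatus"])  # e.g. "Scheduled"
print(desc["EndpointName"])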

  • Send some data to the endpoint. I have made a loop that sends randomized input with features similar to the Iris dataset, mixed with rows from the actual Iris dataset, to see if the monitor catches any drift:
import time

import boto3
import numpy as np

client = boto3.client("sagemaker-runtime")  # runtime client for invoking the endpoint

def invoke_csv_iris(output=False):
    content_type = "text/csv"

    # Draw one random Iris-like row.
    sepal_length = 3 + np.random.rand() * 5
    sepal_width = 2 + np.random.rand() * 2
    petal_length = 1 + np.random.rand() * 5
    petal_width = np.random.rand() * 3

    random_iris_string = ",".join(
        str(v) for v in (sepal_length, sepal_width, petal_length, petal_width)
    )

    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Accept=content_type,
        Body=random_iris_string)
    r = response['Body'].read().decode()
    if output:
        print(type(r))
        print(r)

content_type = "text/csv"
count = 0
stop_count = 7
with open("dataset_for_monitor.csv", "r") as csv_file:
    for row in csv_file:
        body = row[:-3]  # strip the trailing ",<target>\n" so only the features are sent
        if count % 4 == 0:  # mix in a random iris every fourth invocation
            invoke_csv_iris(output=False)
            count += 1
            if count > stop_count:
                break

        response = client.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType=content_type,
            Accept=content_type,
            Body=body)
        time.sleep(1)
        count += 1
        if count > stop_count:
            break
    print(f'Processed {count} predictions.')

Examine executions:

import json
import os

from sagemaker.model_monitor import MonitoringExecution
from sagemaker.s3 import S3Downloader

!wget https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/main/sagemaker_model_monitor/visualization/utils.py

import utils as mu

monitoring_executions = my_model_monitor.list_executions()
print("The ARN of the executions the monitor has made are: ")
for execution in monitoring_executions:
    print(20*"-")
    print("Execution ARN: {}".format(execution.describe()['ProcessingJobArn']))
latest_job_arn = monitoring_executions[-1].describe()['ProcessingJobArn']

execution = MonitoringExecution.from_processing_arn(
    sagemaker_session=session, processing_job_arn=latest_job_arn)
exec_inputs = {inp["InputName"]: inp for inp in execution.describe()["ProcessingInputs"]}
exec_results = execution.output.destination

baseline_statistics_filepath = (
    exec_inputs["baseline"]["S3Input"]["S3Uri"] if "baseline" in exec_inputs else None
)
execution_statistics_filepath = os.path.join(exec_results, "statistics.json")
violations_filepath = os.path.join(exec_results, "constraint_violations.json")
                 
baseline_statistics = (
    json.loads(S3Downloader.read_file(baseline_statistics_filepath))
    if baseline_statistics_filepath is not None
    else None
)
execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
violations = json.loads(S3Downloader.read_file(violations_filepath))["violations"]

mu.show_violation_df(
    baseline_statistics=baseline_statistics,
    latest_statistics=execution_statistics,
    violations=violations,
)
features = mu.get_features(execution_statistics)
feature_baselines = mu.get_features(baseline_statistics)
mu.show_distributions(features, feature_baselines)

Expected behavior

I expect the monitor to interpret the collected data for each feature as that very feature, not as a neighbouring one.

Screenshots or logs

[screenshot: histogram comparing each collected feature's distribution with the baselines, showing the one-column shift]

System information

  • SageMaker Python SDK version: 2.98.0
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): ScikitLearn decision tree
  • Framework version:
  • Python version: 3.8.12
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): No.
