sagemaker-python-sdk
sagemaker-python-sdk copied to clipboard
Model monitor mixes up features with target
Describe the bug When using a model monitor for my deployed decision tree model, the monitor mixes up the features and the target variable. That is, it interprets the collected model output (i.e. the target variable) as feature no. 1, the collected feature no. 1 as feature no. 2, feature no. 2 as feature no. 3, and so on: Target -> no. 1 -> no. 2 -> no. 3 -> no. 4 -> Target. I sent the baseline dataset as input in order to make a histogram in which this is very clear: the distribution of the collected data for each feature matches the baseline data of the previous feature.
To reproduce
# Create a model object from the pretrained model.
model_uri = string with the URI of the S3 location where the model is stored.
# The name is only printed below; it is not passed to the model object.
model_name = "my-decision-tree-model"

# Wrap the pretrained artifact at `model_uri`, served through Scripts/serve.py.
model = SKLearnModel(
    model_data=model_uri,
    role=role,
    entry_point="Scripts/serve.py",
    framework_version="0.23-1",
)
print(f"Model name: {model_name}")
The serve function we are using is defined as:
%%writefile Scripts/serve.py
# The methods below are for model serving, which will be called by the SageMaker serving endpoint automatically:
# model_fn()
# input_fn()
# predict_fn()
# output_fn()
import os
import joblib
import json
import pandas as pd
import numpy as np
from io import StringIO
"""
Deserialize fitted model
"""
def model_fn(model_dir):
    """Deserialize the fitted model stored in ``model_dir``.

    Args:
        model_dir: Directory that contains the ``model.joblib`` file.

    Returns:
        The object produced by ``joblib.load`` for that file.
    """
    model_path = os.path.join(model_dir, "model.joblib")
    return joblib.load(model_path)
"""
input_fn
request_body: The body of the request sent to the model.
request_content_type: (string) specifies the format/variable type of the request
"""
def input_fn(request_body, request_content_type):
    """Parse an inference request body into input for ``predict_fn``.

    Args:
        request_body: The body of the request sent to the model, either as
            ``str`` or as UTF-8 encoded ``bytes``.
        request_content_type: (string) MIME type of the request. Letter case
            and parameters such as ``; charset=utf-8`` are ignored.

    Returns:
        For ``application/json``: the value stored under the ``"Input"`` key.
        For ``text/csv``: the header-less CSV parsed into a list of rows.

    Raises:
        ValueError: If the content type is neither ``text/csv`` nor
            ``application/json``.
    """
    # Compare only the bare media type so "text/csv; charset=utf-8" is accepted.
    media_type = (request_content_type or "").split(";", 1)[0].strip().lower()

    if media_type == "application/json":
        return json.loads(request_body)["Input"]

    if media_type == "text/csv":
        # StringIO only accepts text, so raw bytes must be decoded first.
        if isinstance(request_body, (bytes, bytearray)):
            request_body = request_body.decode("utf-8")
        frame = pd.read_csv(StringIO(request_body), header=None)
        return frame.values.tolist()

    raise ValueError("This model only supports text/csv or application/json input")
"""
predict_fn
input_data: returned array from input_fn above
model (sklearn model) returned model loaded from model_fn above
"""
def predict_fn(input_data, model):
    """Run the model on parsed request data.

    Args:
        input_data: The value returned by ``input_fn``.
        model: The object returned by ``model_fn``; must expose ``predict``.

    Returns:
        Whatever ``model.predict(input_data)`` returns.
    """
    predictions = model.predict(input_data)
    return predictions
"""
output_fn
prediction: the returned value from predict_fn above
content_type: the content type the endpoint expects to be returned. Ex: JSON, string
"""
def output_fn(prediction, content_type):
    """Convert predictions into the response for the requested content type.

    Args:
        prediction: The value returned by ``predict_fn``. It must provide
            ``.tolist()`` for JSON and be accepted by ``np.savetxt`` for CSV.
        content_type: The content type the endpoint is expected to return.

    Returns:
        For ``application/json``: a dict ``{'Output': [...]}``.
        For ``text/csv``: a string with integer-formatted values, columns
        separated by commas and rows by newlines.

    Raises:
        ValueError: If the content type is neither ``text/csv`` nor
            ``application/json``.
    """
    if content_type == 'application/json':
        # NOTE(review): this returns a dict rather than a JSON string;
        # presumably the serving layer serializes it - confirm.
        return {'Output': prediction.tolist()}

    if content_type == 'text/csv':
        buffer = StringIO()
        # fmt='%d' writes every value as an integer.
        np.savetxt(buffer, prediction, fmt='%d', delimiter=',', newline='\n')
        return buffer.getvalue()

    # The message previously said "input", which misreported the failing side.
    raise ValueError("This model only supports text/csv or application/json output")
- Deploy the model to an endpoint with data capture enabled:
endpoint_name = "model-endpoint"
print(f"Endpoint name (endpoint_name) = {endpoint_name}")

# Data capture settings: sample 100 percent of the traffic, store it under
# `endpoint_data_uri`, and list both text/csv and text/html as CSV content.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=endpoint_data_uri,
    csv_content_types=["text/csv", "text/html"],
)

# Deploy the model on a single instance with the capture settings attached.
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.t2.medium",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)
- Create a model monitor object and set an hourly schedule.
# Monitor running on one instance with a 20 GB volume and a 1800 s runtime cap.
my_model_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.t3.medium",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

# NOTE(review): `my_monitor_schedule` is not defined in this snippet;
# presumably it holds the schedule name and is set elsewhere - confirm.
mon_schedule_name = my_monitor_schedule

# Hourly schedule against the endpoint, using the statistics and constraints
# files that were uploaded to S3 beforehand.
my_model_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=statistics_uri,  # we point at uploaded statistics in s3
    constraints=constraints_uri,  # we point at uploaded constraints in s3
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
- Send some data to the endpoint. I have made a loop that sends randomized input with features similar to the Iris dataset, mixed with input from the actual Iris dataset, to see whether the monitor catches any drift.
def invoke_csv_iris(output=False):
    """Send one randomly generated, Iris-like sample to the endpoint as CSV.

    Args:
        output: When true, print the type and the text of the decoded
            response body.
    """
    import numpy as np

    content_type = "text/csv"

    # The values are drawn in the same order as the columns of the payload.
    measurements = [
        3 + np.random.rand() * 5,  # sepal length
        2 + np.random.rand() * 2,  # sepal width
        1 + np.random.rand() * 5,  # petal length
        np.random.rand() * 3,  # petal width
    ]
    random_iris_string = ",".join(str(value) for value in measurements)

    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Accept=content_type,
        Body=random_iris_string,
    )
    decoded = response["Body"].read().decode()

    if not output:
        return
    print(type(decoded))
    print(decoded)
# Replay rows of the dataset against the endpoint, inserting one random
# Iris-like request whenever the running request count is a multiple of four.
content_type = "text/csv"
count = 0
stop_count = 7  # stop as soon as more than this many requests were sent

with open("dataset_for_monitor.csv", "r") as csv_file:
    for row in csv_file:
        # Drop the last column (the target). Unlike a fixed-width slice this
        # also works for "\r\n" endings, targets wider than one character and
        # a final line without a trailing newline.
        body = row.rstrip("\r\n").rpartition(",")[0]
        if not body:
            # Blank line or no comma at all: there are no features to send.
            continue

        if count % 4 == 0:  # send a random iris every fourth
            invoke_csv_iris(output=False)
            count += 1
            if count > stop_count:
                break

        response = client.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType=content_type,
            Accept=content_type,
            Body=body,
        )
        time.sleep(1)
        count += 1
        if count > stop_count:
            break

print(f'Processed {count} predictions.')
Examine executions:
from sagemaker.model_monitor import MonitoringExecution
!wget https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/main/sagemaker_model_monitor/visualization/utils.py
import utils as mu
# List every execution the monitor has run so far.
monitoring_executions = my_model_monitor.list_executions()
print("The ARN of the executions the monitor has made are: ")
for execution in monitoring_executions:
    print("-" * 20)
    print(f"Execution ARN: {execution.describe()['ProcessingJobArn']}")

# Re-open the last execution in the list through its processing job ARN.
latest_job_arn = my_model_monitor.list_executions()[-1].describe()["ProcessingJobArn"]
execution = MonitoringExecution.from_processing_arn(
    sagemaker_session=session,
    processing_job_arn=latest_job_arn,
)

# Index the processing inputs by name so the baseline can be looked up.
exec_inputs = {
    job_input["InputName"]: job_input
    for job_input in execution.describe()["ProcessingInputs"]
}
exec_results = execution.output.destination

# The baseline statistics are optional; the result files are always read.
baseline_statistics_filepath = None
if "baseline" in exec_inputs:
    baseline_statistics_filepath = exec_inputs["baseline"]["S3Input"]["S3Uri"]
execution_statistics_filepath = os.path.join(exec_results, "statistics.json")
violations_filepath = os.path.join(exec_results, "constraint_violations.json")

baseline_statistics = None
if baseline_statistics_filepath is not None:
    baseline_statistics = json.loads(S3Downloader.read_file(baseline_statistics_filepath))
execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
violations = json.loads(S3Downloader.read_file(violations_filepath))["violations"]

# Tabulate the violations, then plot collected vs. baseline distributions.
mu.show_violation_df(
    baseline_statistics=baseline_statistics,
    latest_statistics=execution_statistics,
    violations=violations,
)

features = mu.get_features(execution_statistics)
feature_baselines = mu.get_features(baseline_statistics)
mu.show_distributions(features, feature_baselines)
Expected behavior I expect the monitor to interpret the collected data for each feature as that very feature and not any other feature.
Screenshots or logs
System information A description of your system. Please provide:
- SageMaker Python SDK version: 2.98.0
- Framework name (eg. PyTorch) or algorithm (eg. KMeans): ScikitLearn decision tree
- Framework version:
- Python version 3.8.12.
- CPU or GPU: CPU
- Custom Docker image (Y/N): No.