ml-commons
[BUG] custom pre-processing function in ML connectors is returning `Invalid JSON in payload`
What is the bug? I am trying to deploy an ML model that connects to an external resource.
I am trying to write a pre-process function that will pass the following request body to my SageMaker endpoint:
{
"inputs": [
{
"name": "query",
"shape": [${parameters.input.length}, 1],
"datatype": "BYTES",
"data": ${parameters.input}
}
]
}
I have created the ML connector and deployed the model following the steps below. When I do this, I get an `Invalid JSON in payload` error. Could someone help me understand why the pre-process function is not working as expected?
How can one reproduce the bug? Steps to reproduce the behavior:
- create connector
POST /_plugins/_ml/connectors/_create
{
"name": "my_connector",
"description": "My Connector",
"version": 1,
"protocol": "aws_sigv4",
"credential": {
"roleArn": "my_role_arn"
},
"parameters": {
"region": "us-west-2",
"service_name": "sagemaker"
},
"actions": [
{
"action_type": "predict",
"method": "POST",
"headers": {
"content-type": "application/json"
},
"url": "my_sagemaker_endpoint",
"request_body": "{ \"inputs\": [{ \"name\": \"query\", \"shape\": [${parameters.length}, 1], \"datatype\": \"BYTES\", \"data\": ${parameters.input} }] }",
"pre_process_function": "return '{\"length\": ' + params.text_docs.length + ', \"input\": ' + params.text_docs + ' }'",
"post_process_function": "def data = params.outputs[0].data; def n = Math.ceil(data.length / 512);def embeddings = [];for (int i = 0; i < n; i++) {embeddings.add(Arrays.copyOfRange(data, i * 512, Math.min(data.length, 512 * (i + 1))));}return '{\"name\": \"sentence_embedding\", \"data_type\": \"FLOAT32\", \"shape\": [' + n + '], \"data\": ' + embeddings + '}';"
}
]
}
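For context on how the `request_body` above is used: ml-commons fills the `${parameters.*}` placeholders in the template with the parameters emitted by the pre-process function. A rough sketch of that substitution in Python (a simplified illustration of the idea, not ml-commons's actual implementation):

```python
import re

def fill_template(request_body, parameters):
    # Replace each ${parameters.key} placeholder with its value from the
    # parameters map produced by the pre-process function.
    return re.sub(
        r"\$\{parameters\.(\w+)\}",
        lambda m: str(parameters[m.group(1)]),
        request_body,
    )

body = fill_template(
    '{ "inputs": [{ "name": "query", "shape": [${parameters.length}, 1], '
    '"datatype": "BYTES", "data": ${parameters.input} }] }',
    # "input" must already be a JSON-encoded array for the result to parse.
    {"length": 2, "input": '["brand 1", "category 1"]'},
)
```

This is also why a malformed parameter value (e.g. an unescaped string, or a number rendered as `2.0`) produces `Invalid JSON in payload`: the substituted template no longer parses as JSON.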
- register and deploy model
POST /_plugins/_ml/models/_register
{
"name": "my_model",
"function_name": "remote",
"model_group_id": "my_model_group_id",
"description": "My Model",
"connector_id": "my_connector_id"
}
POST /_plugins/_ml/models/my_model_id/_deploy
- create ingest pipeline
PUT /_ingest/pipeline/my-ingest-pipeline
{
"description": "My ingest pipeline",
"processors": [
{
"convert": {
"field": "brand.id",
"type": "string"
}
},
{
"convert": {
"field": "category.id",
"type": "string"
}
},
{
"text_embedding": {
"model_id": "my_model_id",
"field_map": {
"name": {
"en": "name_vector"
},
"brand": {
"name": {
"en": "brand_name_vector"
}
},
"category": {
"name": {
"en": "category_name_vector"
}
}
}
}
}
]
}
- simulate pipeline and see error
POST _ingest/pipeline/my-ingest-pipeline/_simulate
{
"docs": [
{
"_index": "my-index",
"_id": "1",
"_source": {
"brand": {
"id": 1,
"name": {
"en": "brand 1"
}
},
"category": {
"id": 1,
"name": {
"en": "category 1"
}
}
}
}
]
}
What is the expected behavior?
Expected the vector embeddings `brand_name_vector` and `category_name_vector` to be generated.
What is your host/environment?
- Environment: AWS OpenSearch Service (managed cluster)
- OS: OpenSearch
- Version: 2.11
Do you have any screenshots?
{
"docs": [
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Invalid JSON in payload"
}
],
"type": "illegal_argument_exception",
"reason": "Invalid JSON in payload"
}
}
]
}
Do you have any additional context? I am aware of signing the request using AWS SigV4 and providing the correct keys. The issue is not with creating the connector; I am able to create the connector fine.
My issue occurs when I deploy my model using the connector and simulate the ingest pipeline; that is when I get the error described above.
Your pre-process function here should translate the text docs into your model input, but it doesn't look correct. Can you try this?
"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n \n for (int i=0; i<params.text_docs.length; i ++) {\n builder.append('\"');\n builder.append(escape(params.text_docs[i]));\n builder.append('\"');\n if (i<params.text_docs.length - 1) {\n builder.append(',');\n }\n }\n builder.append(']');\n \n def parameters = '{\"length\": ' + params.text_docs.length + ', \"input\": ' + builder + ' }';\n return '{\"parameters\": ' + parameters + '}';\n "
For the post-process function, I need to know your model's output. Can you share the raw model output?
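To make the intent of the suggested pre-process function concrete, here is a minimal Python sketch of the output it builds (illustrative only; the real function is the Painless script above, and `json.dumps` plays the role of the hand-written `escape()` helper):

```python
import json

def pre_process(text_docs):
    # Build {"parameters": {"length": N, "input": [docs...]}}.
    # json.dumps quotes and escapes each document, which is what the
    # Painless escape() helper and StringBuilder loop do by hand.
    parameters = {"length": len(text_docs), "input": text_docs}
    return json.dumps({"parameters": parameters})
```

The returned string is what ml-commons parses to fill the `${parameters.length}` and `${parameters.input}` placeholders in the connector's `request_body`.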
hi @ylwu-amzn, thank you for looking into this!
the raw output would look like [embeddings1, embeddings2, ..., embeddingsN]
where embeddings is float[]
Also, I noticed an escape method that you added to the script. Is that something that exists in the Painless scripting language, or something I will need to create?
Or perhaps I could use this if the Apache Commons package is installed in OpenSearch?
{
"docs": [
{
"error": {
"root_cause": [
{
"type": "script_exception",
"reason": "compile error",
"script_stack": [
"""... ');
builder.append(escape(params.text_docs[i ...""",
" ^---- HERE"
],
"script": """ StringBuilder builder = new StringBuilder('[');
for (int i=0; i<params.text_docs.length; i ++) {
builder.append('"');
builder.append(escape(params.text_docs[i]));
builder.append('"');
if (i<params.text_docs.length - 1) {
builder.append(',');
}
}
builder.append(']');
def parameters = '{"length": ' + params.text_docs.length + ', "input": ' + builder + ' }';
return '{"parameters": ' + parameters + '}';
""",
"lang": "painless",
"position": {
"offset": 155,
"start": 130,
"end": 180
}
}
],
"type": "script_exception",
"reason": "compile error",
"script_stack": [
"""... ');
builder.append(escape(params.text_docs[i ...""",
" ^---- HERE"
],
"script": """ StringBuilder builder = new StringBuilder('[');
for (int i=0; i<params.text_docs.length; i ++) {
builder.append('"');
builder.append(escape(params.text_docs[i]));
builder.append('"');
if (i<params.text_docs.length - 1) {
builder.append(',');
}
}
builder.append(']');
def parameters = '{"length": ' + params.text_docs.length + ', "input": ' + builder + ' }';
return '{"parameters": ' + parameters + '}';
""",
"lang": "painless",
"position": {
"offset": 155,
"start": 130,
"end": 180
},
"caused_by": {
"type": "illegal_argument_exception",
"reason": "Unknown call [escape] with [[org.opensearch.painless.node.EBrace@449daf]] arguments."
}
}
}
]
}
also I noticed an escape method that you added to the script. Is that something that exists in the Painless scripting language or something I will need to create?
No, that's a function we added in ml-commons (code link), in the 2.12 release. If you are using an older version, you can manually copy the escape function into your pre/post process function.
You can't use the Apache Commons package.
the raw output would look like [embeddings1, embeddings2, ..., embeddingsN] where embeddings is float[]
Just to confirm: the whole raw output doesn't contain any key? For example:
{
"embeddings": [
float[], float[]
]
}
If the raw output is just `[ float[], float[] ]`, I think you can try the default post-process function.
Thank you @ylwu-amzn! The pre-process function almost worked. However, the request body sent to my SageMaker endpoint now looks like:
{
"inputs": [
{
"name": "query",
"shape": [
2.0, // error is because this is not an integer
1
],
"datatype": "BYTES",
"data": [
"brand 1",
"category 1"
]
}
]
}
I have tried:
- a type cast: `int paramsLength = (int) params.text_docs.length` (returning the integer value)
- `int paramsLength = params.text_docs.length.intValue()` (rounding to the nearest integer)
- `int paramsLength = Math.round(params.text_docs.length)`
All three approaches still returned a non-integer in the request body sent to SageMaker.
See the error below when simulating the ingest pipeline:
{
"docs": [
{
"error": {
"root_cause": [
{
"type": "status_exception",
"reason": """Error from remote service: {"ErrorCode":"CLIENT_ERROR_FROM_MODEL","LogStreamArn":"log_arn","Message":"Received client error (400) from primary with message \"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}\". See log_url in account account_number for more information.","OriginalMessage":"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}","OriginalStatusCode":400}"""
}
],
"type": "status_exception",
"reason": """Error from remote service: {"ErrorCode":"CLIENT_ERROR_FROM_MODEL","LogStreamArn":"my_log_arn","Message":"Received client error (400) from primary with message \"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}\". See log_url in account account_number for more information.","OriginalMessage":"{\"error\":\"Unable to parse 'shape': attempt to access JSON non-unsigned-integer as unsigned-integer\"}","OriginalStatusCode":400}"""
}
}
]
}
Can you try this? Wrap the `params.text_docs.length` with `\"`:
"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n \n for (int i=0; i<params.text_docs.length; i ++) {\n builder.append('\"');\n builder.append(escape(params.text_docs[i]));\n builder.append('\"');\n if (i<params.text_docs.length - 1) {\n builder.append(',');\n }\n }\n builder.append(']');\n \n def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n return '{\"parameters\": ' + parameters + '}';\n "
I tried your suggestion. The request body is now correct ✅
{
"inputs": [
{
"name": "query",
"shape": [
2, // correct, it is now integer
1
],
"datatype": "BYTES",
"data": [
"brand 1",
"category 1"
]
}
]
}
but I am getting this error now in the simulator
{
"docs": [
{
"error": {
"root_cause": [
{
"type": "class_cast_exception",
"reason": "class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"
}
],
"type": "class_cast_exception",
"reason": "class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"
}
}
]
}
"class java.lang.String cannot be cast to class java.util.List (java.lang.String and java.util.List are in module java.base of loader 'bootstrap')"
It's hard to figure out why from this error alone. Can you share the exception trace from the logs?
I do not have a log trace because the request did not reach the SageMaker endpoint. I was expecting the error to contain `Error from remote service`, but it looks like the error occurs in the ML Commons framework before the request is sent.
It will be hard to guess what's wrong from here. If possible, you can reach out to me on the OpenSearch Slack and we can jump on a call. You can join the public ml channel.
thank you @ylwu-amzn for joining a call to debug the issue!
here is the raw output of the sagemaker endpoint
{
"model_name": "search_ensemble",
"model_version": "1",
"parameters": {
"sequence_id": 0,
"sequence_start": false,
"sequence_end": false
},
"outputs": [
{
"name": "outputs",
"datatype": "FP32",
"shape": [
2,
512
],
"data": [ // (1024 x 1) array
-0.3834260106086731,
-0.36356380581855776,
-0.25114601850509646,
-0.12556827068328858,
-0.0514649897813797,
...
]
}
]
}
It turns out the raw output is not a 2D array but a JSON object.
The data array is one-dimensional, so it will need to be sliced into the correct shape.
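A minimal Python sketch of that slicing (illustrative only; the actual logic has to live in the Painless post-process function):

```python
def reshape_embeddings(data, shape):
    # shape is [num_embeddings, dimension]; take one dimension-sized
    # slice of the flat data array per embedding.
    n, dim = shape
    return [data[i * dim:(i + 1) * dim] for i in range(n)]
```

For the response above, `shape` is `[2, 512]`, so the 1024-element `data` array splits into two 512-dimensional embeddings.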
We should be able to rewrite the custom post-process function now based on this information. I will continue working on that while you investigate what the function should look like on your side.
Thanks again 🙏🏿
@toyaokeke Can you try these pre/post process functions?
"pre_process_function": " StringBuilder builder = new StringBuilder('[');\n \n for (int i=0; i<params.text_docs.length; i ++) {\n builder.append('\"');\n builder.append(escape(params.text_docs[i]));\n builder.append('\"');\n if (i<params.text_docs.length - 1) {\n builder.append(',');\n }\n }\n builder.append(']');\n \n def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n return '{\"parameters\": ' + parameters + '}';\n ",
"post_process_function": "\n \n def dataType = \"FLOAT32\";\n \n \n if (params.outputs == null || params.outputs.length == 0)\n {\n return 'no embedding generated';\n }\n def outputs = params.outputs;\n def embedding_output = outputs[0];\n def embedding_num = embedding_output.shape[0].intValue();\n def embedding_dimension = embedding_output.shape[1].intValue();\n def embedding_data = embedding_output.data;\n \n def resultBuilder = new StringBuilder(\"[\");\n for (int i=0; i<embedding_num; i++) {\n resultBuilder.append('{\"name\": \"sentence_embedding\", \"data_type\": \"FLOAT32\", \"shape\": [');\n resultBuilder.append(embedding_dimension).append('],');\n \n resultBuilder.append('\"data\": [');\n for (int j=i*embedding_dimension; j<(i+1)*embedding_dimension; j++) {\n resultBuilder.append(embedding_data[j]);\n if (j<(i+1)*embedding_dimension - 1) {\n resultBuilder.append(',');\n }\n }\n resultBuilder.append(']}');\n if (i<embedding_num-1) {\n resultBuilder.append(',');\n }\n }\n resultBuilder.append(']');\n \n return resultBuilder.toString();\n "
Edit: Since you are using OpenSearch 2.11, you should add the escape method to the pre-process function. The whole pre-process function, including the escape helper, will be:
"pre_process_function": "\n String escape(def input) { \n if (input.contains(\"\\\\\")) {\n input \u003d input.replace(\"\\\\\", \"\\\\\\\\\");\n }\n if (input.contains(\"\\\"\")) {\n input \u003d input.replace(\"\\\"\", \"\\\\\\\"\");\n }\n if (input.contains(\u0027\r\u0027)) {\n input \u003d input \u003d input.replace(\u0027\r\u0027, \u0027\\\\r\u0027);\n }\n if (input.contains(\"\\\\t\")) {\n input \u003d input.replace(\"\\\\t\", \"\\\\\\\\\\\\t\");\n }\n if (input.contains(\u0027\n\u0027)) {\n input \u003d input.replace(\u0027\n\u0027, \u0027\\\\n\u0027);\n }\n if (input.contains(\u0027\b\u0027)) {\n input \u003d input.replace(\u0027\b\u0027, \u0027\\\\b\u0027);\n }\n if (input.contains(\u0027\f\u0027)) {\n input \u003d input.replace(\u0027\f\u0027, \u0027\\\\f\u0027);\n }\n return input;\n }\n StringBuilder builder = new StringBuilder('[');\n \n for (int i=0; i<params.text_docs.length; i ++) {\n builder.append('\"');\n builder.append(escape(params.text_docs[i]));\n builder.append('\"');\n if (i<params.text_docs.length - 1) {\n builder.append(',');\n }\n }\n builder.append(']');\n \n def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n return '{\"parameters\": ' + parameters + '}';\n ",
The above pre_process_function contains Unicode escape sequences; you can also use this:
"pre_process_function": "\n String escape(def input) { \n if (input.contains(\"\\\\\")) {\n input = input.replace(\"\\\\\", \"\\\\\\\\\");\n }\n if (input.contains(\"\\\"\")) {\n input = input.replace(\"\\\"\", \"\\\\\\\"\");\n }\n if (input.contains('\r')) {\n input = input = input.replace('\r', '\\\\r');\n }\n if (input.contains(\"\\\\t\")) {\n input = input.replace(\"\\\\t\", \"\\\\\\\\\\\\t\");\n }\n if (input.contains('\n')) {\n input = input.replace('\n', '\\\\n');\n }\n if (input.contains('\b')) {\n input = input.replace('\b', '\\\\b');\n }\n if (input.contains('\f')) {\n input = input.replace('\f', '\\\\f');\n }\n return input;\n }\n StringBuilder builder = new StringBuilder('[');\n \n for (int i=0; i<params.text_docs.length; i ++) {\n builder.append('\"');\n builder.append(escape(params.text_docs[i]));\n builder.append('\"');\n if (i<params.text_docs.length - 1) {\n builder.append(',');\n }\n }\n builder.append(']');\n \n def parameters = '{\"length\": \"' + params.text_docs.length + '\", \"input\": ' + builder + ' }';\n return '{\"parameters\": ' + parameters + '}';\n "
@ylwu-amzn this is working as expected!! Thank you very much 🙏🏿
Closing this issue as the problem is solved.