[BUG] Error using Bedrock connector for Claude V2
What is the bug? This query:
response = os_client.search(index='converse', body={
    "query": {
        "simple_query_string": {
            "query": "old faded blue jeans",
            "fields": ["question_text"]
        }
    },
    "ext": {
        "generative_qa_parameters": {
            "llm_model": "anthropic.claude-v2",
            "llm_question": "faded old blue jeans",
            "memory_id": f"{memory_id}",
            "context_size": 5,
            "message_size": 5,
            "timeout": 15
        }
    }
})
gives the following error:
Traceback (most recent call last):
  File "/Users/handler/code/gdata/converse.py", line 180, in <module>
    response = os_client.search(index='converse', body=
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/utils.py", line 176, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/client/__init__.py", line 2364, in search
    return self.transport.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/transport.py", line 455, in perform_request
    raise e
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/transport.py", line 416, in perform_request
    status, headers_response, data = connection.perform_request(
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/http_urllib3.py", line 308, in perform_request
    self._raise_error(
  File "/Users/handler/code/gdata/.venv/lib/python3.11/site-packages/opensearchpy/connection/base.py", line 315, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
opensearchpy.exceptions.RequestError: RequestError(400, 'status_exception', 'Error validating input schema: Validation failed: [$.parameters: required property \'inputs\' not found] for instance: {"algorithm":"REMOTE","parameters":{"messages":"[{\\"role\\":\\"system\\",\\"content\\":\\"You are a helpful assistant\\"},{\\"role\\":\\"user\\",\\"content\\":\\"Generate a concise and informative answer in less than 100 words for the given question\\"},{\\"role\\":\\"user\\",\\"content\\":\\"QUESTION: faded old blue jeans\\"},{\\"role\\":\\"user\\",\\"content\\":\\"ANSWER:\\"}]","model":"anthropic.claude-v2"},"action_type":null} with schema: {\n "type": "object",\n "properties": {\n "parameters": {\n "type": "object",\n "properties": {\n "inputs": {\n "type": "string"\n }\n },\n "required": [\n "inputs"\n ]\n }\n },\n "required": [\n "parameters"\n ]\n}')
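Reading the validation error, the model's input schema (quoted at the end of the error) requires parameters.inputs, while the RAG processor sent messages and model. Side by side (a sketch pulled from the error text above, not from any docs):
# What the schema in the error accepts:
valid_body = {"parameters": {"inputs": "faded old blue jeans"}}
# What the RAG search processor actually sent (per the error instance):
sent_body = {"parameters": {"messages": "[...]", "model": "anthropic.claude-v2"}}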
I followed the documentation here: https://opensearch.org/docs/latest/search-plugins/conversational-search/ and created the following code:
import boto3
import json
import os
import time
import uuid

from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

OPENSEARCH_HOST = os.environ.get('OPENSEARCH_HOST', 'localhost')
OPENSEARCH_PORT = os.environ.get('OPENSEARCH_PORT', 9200)
OPENSEARCH_AUTH = (os.environ.get('OPENSEARCH_ADMIN_USER', 'admin'),
                   os.environ.get('OPENSEARCH_ADMIN_PASSWORD', ''))
REGION = os.environ.get('AWS_REGION', 'us-west-2')

# Step 0: set up the client and enable the conversational search features
os_client = OpenSearch(
    hosts=[{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
    http_auth=OPENSEARCH_AUTH,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
os_client.cluster.put_settings(body={
    "persistent": {
        "plugins.ml_commons.memory_feature_enabled": True,
        "plugins.ml_commons.rag_pipeline_feature_enabled": True,
        "plugins.ml_commons.trusted_connector_endpoints_regex": [
            "^https://bedrock-runtime\\..*[a-z0-9-]\\.amazonaws\\.com/.*$"
        ]
    }
})
session = boto3.client('sts', REGION).get_session_token()
## Step 1: Create a connector to a model
response = os_client.transport.perform_request(
    'POST', '/_plugins/_ml/connectors/_create',
    body={
        "name": "Amazon Bedrock",
        "description": "Test connector for Amazon Bedrock",
        "version": 1,
        "protocol": "aws_sigv4",
        "credential": {
            "access_key": session['Credentials']['AccessKeyId'],
            "secret_key": session['Credentials']['SecretAccessKey'],
            "session_token": session['Credentials']['SessionToken']
        },
        "parameters": {
            "region": f"{REGION}",
            "service_name": "bedrock",
            "model": "anthropic.claude-v2"
        },
        "actions": [
            {
                "action_type": "predict",
                "method": "POST",
                "headers": {
                    "content-type": "application/json"
                },
                "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
                "request_body": "{\"prompt\":\"\\n\\nHuman: ${parameters.inputs}\\n\\nAssistant:\",\"max_tokens_to_sample\":300,\"temperature\":0.5,\"top_k\":250,\"top_p\":1,\"stop_sequences\":[\"\\\\n\\\\nHuman:\"]}"
            }
        ]
    })
connector_id = response['connector_id']
print(f"Connector ID: {connector_id}")
## Step 2: Register and deploy the model
response = os_client.transport.perform_request(
    'POST', '/_plugins/_ml/models/_register',
    body={
        "name": "Amazon Bedrock",
        "function_name": "remote",
        "description": "bedrock",
        "connector_id": f"{connector_id}"
    })
task_id = response['task_id']
status = response['status']
while status != 'COMPLETED':
    time.sleep(1)  # avoid hammering the tasks API while registration runs
    response = os_client.transport.perform_request('GET', f'/_plugins/_ml/tasks/{task_id}')
    status = response['state']
    print(f"Task status: {status}")
model_id = response['model_id']
print(f'Model ID: {model_id}')
response = os_client.transport.perform_request('POST', f'/_plugins/_ml/models/{model_id}/_deploy', body='')
print(f"Model deploy response: {response}")
## Step 3: Create a search pipeline
response = os_client.transport.perform_request(
    'PUT', '/_search/pipeline/rag_pipeline',
    body={
        "response_processors": [
            {
                "retrieval_augmented_generation": {
                    "tag": "conversation demo",
                    "description": "Demo pipeline Using Bedrock Connector",
                    "model_id": f"{model_id}",
                    "context_field_list": ["text"],
                    "system_prompt": "You are a helpful assistant",
                    "user_instructions": "Generate a concise and informative answer in less than 100 words for the given question"
                }
            }
        ]
    })
## Step 4: Ingest RAG data into an index
os_client.indices.delete(index='converse', ignore=[400, 404])
os_client.indices.create(index='converse', body={
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "index.search.default_pipeline": "rag_pipeline"
    },
    "mappings": {
        "properties": {
            "question_id": {"type": "keyword"},
            "title": {"type": "text"},
            "question_text": {"type": "text"},
            "asin": {"type": "keyword"},
            "bullet_point1": {"type": "text"},
            "bullet_point2": {"type": "text"},
            "bullet_point3": {"type": "text"},
            "bullet_point4": {"type": "text"},
            "bullet_point5": {"type": "text"},
            "product_description": {"type": "text"},
            "brand_name": {"type": "keyword"},
            "item_name": {"type": "text"},
            "question_type": {"type": "keyword"},
            "answer_aggregated": {"type": "keyword"},
            "answers": {
                "properties": {
                    "answer_text": {"type": "text"}
                }
            }
        }
    }
})
rag_source = './datasets/amazon_pqa/amazon_pqa_jeans.json'
with open(rag_source, 'r') as f:
    nline = 0
    buffer = []
    for line in f:
        data = json.loads(line)
        buffer.append({
            "_op_type": "create",
            "_index": 'converse',
            "_source": data
        })
        nline += 1
        if nline >= 5000:
            print(nline, ' lines processed')
            bulk(os_client, buffer)
            break
        if nline % 5000 == 0:
            print(nline, ' lines processed')
            bulk(os_client, buffer)
            buffer = []
## RAG pipeline
## Step 5: Create a conversation memory
conversation_name = f'conversation-{str(uuid.uuid1())[:8]}'
response = os_client.transport.perform_request(
    'POST', '/_plugins/_ml/memory/',
    body={"name": conversation_name})
memory_id = response['memory_id']
print(f'Memory ID: {memory_id}')
## Step 6: Use the pipeline for RAG
response = os_client.search(index='converse', body={
    "query": {
        "simple_query_string": {
            "query": "old faded blue jeans",
            "fields": ["question_text"]
        }
    },
    "ext": {
        "generative_qa_parameters": {
            "llm_model": "anthropic.claude-v2",
            "llm_question": "faded old blue jeans",
            "memory_id": f"{memory_id}",
            "context_size": 5,
            "message_size": 5,
            "timeout": 15
        }
    }
})
print(response)
How can one reproduce the bug? Running the code above reproduces the error. The data came from https://registry.opendata.aws/amazon-pqa/
What is the expected behavior? I should get a text response from the model
What is your host/environment?
- OS: macOS Ventura 13.6.9
- Docker, 2 data nodes, 1 ml node
- Plugins
opensearch-ml-node opensearch-alerting 2.16.0.0
opensearch-ml-node opensearch-anomaly-detection 2.16.0.0
opensearch-ml-node opensearch-asynchronous-search 2.16.0.0
opensearch-ml-node opensearch-cross-cluster-replication 2.16.0.0
opensearch-ml-node opensearch-custom-codecs 2.16.0.0
opensearch-ml-node opensearch-flow-framework 2.16.0.0
opensearch-ml-node opensearch-geospatial 2.16.0.0
opensearch-ml-node opensearch-index-management 2.16.0.0
opensearch-ml-node opensearch-job-scheduler 2.16.0.0
opensearch-ml-node opensearch-knn 2.16.0.0
opensearch-ml-node opensearch-ml 2.16.0.0
opensearch-ml-node opensearch-neural-search 2.16.0.0
opensearch-ml-node opensearch-notifications 2.16.0.0
opensearch-ml-node opensearch-notifications-core 2.16.0.0
opensearch-ml-node opensearch-observability 2.16.0.0
opensearch-ml-node opensearch-performance-analyzer 2.16.0.0
opensearch-ml-node opensearch-reports-scheduler 2.16.0.0
opensearch-ml-node opensearch-security 2.16.0.0
opensearch-ml-node opensearch-security-analytics 2.16.0.0
opensearch-ml-node opensearch-skills 2.16.0.0
opensearch-ml-node opensearch-sql 2.16.0.0
opensearch-ml-node query-insights 2.16.0.0
Do you have any screenshots? N/A
Additional context: If I run the following in Dev Tools, I get a different error:
GET converse/_search
{
  "query": {
    "simple_query_string": {
      "query": "old faded blue jeans",
      "fields": ["question_text"]
    }
  },
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "anthropic.claude-v2",
      "llm_question": "faded old blue jeans",
      "memory_id": "v21hA5IBrpFbJSuYur7o",
      "context_size": 5,
      "message_size": 5,
      "timeout": 15
    }
  }
}
{
  "error": {
    "root_cause": [
      {
        "type": "runtime_exception",
        "reason": "Context text not found in search hit {\n  \"_index\" : \"converse\",\n  \"_id\" : \"o21qA5IBrpFbJSuY4uy6\",\n  \"_score\" : 13.451588,\n  \"_source\" : {\n    \"question_id\" : \"Tx1S3K9MJ9OVLJI\",\n    \"question_text\" : \"Is andi faded blue\",\n    \"asin\" : \"B004G9Q83E\",\n    \"bullet_point1\" : \"Sits below waist\",\n    \"bullet_point2\" : \"Slim through seat and thigh\",\n    \"bullet_point3\" : \"Boot cut leg\",\n    \"bullet_point4\" : \"\",\n    \"bullet_point5\" : \"\",\n    \"product_description\" : \"\",\n    \"brand_name\" : \"Levi's\",\n    \"item_name\" : \"Levi's Mens 527 Bootcut Jean, Andi, 30-32\",\n    \"question_type\" : \"yes-no\",\n    \"answer_aggregated\" : \"no\",\n    \"answers\" : [\n      {\n        \"answer_text\" : \"It is more of a navy. It looks a little faded, but is not light.\"\n      },\n      {\n        \"answer_text\" : \"no\"\n      },\n      {\n        \"answer_text\" : \"No, these are a dark wash\"\n      }\n    ]\n  }\n}"
      }
    ],
    "type": "runtime_exception",
    "reason": "Context text not found in search hit {\n  \"_index\" : \"converse\",\n  \"_id\" : \"o21qA5IBrpFbJSuY4uy6\",\n  \"_score\" : 13.451588,\n  \"_source\" : {\n    \"question_id\" : \"Tx1S3K9MJ9OVLJI\",\n    \"question_text\" : \"Is andi faded blue\",\n    \"asin\" : \"B004G9Q83E\",\n    \"bullet_point1\" : \"Sits below waist\",\n    \"bullet_point2\" : \"Slim through seat and thigh\",\n    \"bullet_point3\" : \"Boot cut leg\",\n    \"bullet_point4\" : \"\",\n    \"bullet_point5\" : \"\",\n    \"product_description\" : \"\",\n    \"brand_name\" : \"Levi's\",\n    \"item_name\" : \"Levi's Mens 527 Bootcut Jean, Andi, 30-32\",\n    \"question_type\" : \"yes-no\",\n    \"answer_aggregated\" : \"no\",\n    \"answers\" : [\n      {\n        \"answer_text\" : \"It is more of a navy. It looks a little faded, but is not light.\"\n      },\n      {\n        \"answer_text\" : \"no\"\n      },\n      {\n        \"answer_text\" : \"No, these are a dark wash\"\n      }\n    ]\n  }\n}"
  },
  "status": 500
}
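A possible lead on this second error (a guess from the mapping above, not verified): the pipeline's context_field_list is ["text"], but the converse index has no text field; the hit quoted in the error only carries question_text, item_name, and so on. Recreating the pipeline against a field that does exist, for example question_text (my assumption about the intended context field), should at least get past the "Context text not found" exception:
os_client.transport.perform_request(
    'PUT', '/_search/pipeline/rag_pipeline',
    body={
        "response_processors": [
            {
                "retrieval_augmented_generation": {
                    "tag": "conversation demo",
                    "description": "Demo pipeline Using Bedrock Connector",
                    "model_id": f"{model_id}",
                    # assumption: question_text is the intended context field
                    "context_field_list": ["question_text"],
                    "system_prompt": "You are a helpful assistant",
                    "user_instructions": "Generate a concise and informative answer in less than 100 words for the given question"
                }
            }
        ]
    })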