Unable to run embeddings model from sagemaker
Context
Hi everyone,
What I'm trying to achieve is to run privateGPT in a production-grade environment. To do so, I've set it up roughly like this:
- Create a Qdrant database in Qdrant cloud
- Run LLM model and embedding model through Sagemaker
For now I'm getting stuck when running the embedding model from SageMaker.
How to reproduce
I've tried to use the simplest setup to reproduce this; if you want me to test anything else, do not hesitate to ask.
- Create a new sagemaker profile with settings-sagemaker.yaml:
server:
  env_name: ${APP_ENV:prod}
  port: ${PORT:8001}

ui:
  enabled: true
  path: /

llm:
  mode: sagemaker

embedding:
  # Should be matching the value above in most cases
  mode: sagemaker
  ingest_mode: simple

sagemaker:
  llm_endpoint_name: TheBloke-Mistral-7B-Instruct-v0-1-GPTQ # Should have been deployed first to sagemaker
  embedding_endpoint_name: BAAI-bge-large-en-v1-5 # Should have been deployed first to sagemaker
- Run PGPT_PROFILES=sagemaker make run
- Ingest a file through the UI
Actual behavior
- The UI is sending a weird output
- In the application logs we have something like:
15:32:24.360 [INFO ] private_gpt.server.ingest.ingest_service - Ingesting file_names=['en_withColumnsRenamed.md']
Parsing nodes: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 571.20it/s]
Generating embeddings: 0%| | 0/5 [00:00<?, ?it/s]Traceback (most recent call last):
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/queueing.py", line 456, in call_prediction
output = await route_utils.call_process_api(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/route_utils.py", line 232, in call_process_api
output = await app.get_blocks().process_api(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/blocks.py", line 1522, in process_api
result = await self.call_function(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/blocks.py", line 1144, in call_function
prediction = await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/utils.py", line 674, in wrapper
response = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/ui/ui.py", line 172, in _upload_file
self._ingest_service.bulk_ingest([(str(path.name), path) for path in paths])
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/server/ingest/ingest_service.py", line 84, in bulk_ingest
documents = self.ingest_component.bulk_ingest(files)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/ingest/ingest_component.py", line 130, in bulk_ingest
saved_documents.extend(self._save_docs(documents))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/ingest/ingest_component.py", line 137, in _save_docs
self._index.insert(document, show_progress=True)
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/indices/base.py", line 191, in insert
nodes = run_transformations(
^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/ingestion/pipeline.py", line 70, in run_transformations
nodes = transform(nodes, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/embeddings/base.py", line 334, in __call__
embeddings = self.get_text_embedding_batch(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/embeddings/base.py", line 255, in get_text_embedding_batch
embeddings = self._get_text_embeddings(cur_batch)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/embedding/custom/sagemaker.py", line 82, in _get_text_embeddings
return self._embed(texts)
^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/embedding/custom/sagemaker.py", line 60, in _embed
return response_json["vectors"]
~~~~~~~~~~~~~^^^^^^^^^^^
TypeError: list indices must be integers or slices, not str
Traceback (most recent call last):
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/queueing.py", line 456, in call_prediction
output = await route_utils.call_process_api(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/route_utils.py", line 232, in call_process_api
output = await app.get_blocks().process_api(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/blocks.py", line 1522, in process_api
result = await self.call_function(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/blocks.py", line 1144, in call_function
prediction = await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/utils.py", line 674, in wrapper
response = f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/ui/ui.py", line 172, in _upload_file
self._ingest_service.bulk_ingest([(str(path.name), path) for path in paths])
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/server/ingest/ingest_service.py", line 84, in bulk_ingest
documents = self.ingest_component.bulk_ingest(files)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/ingest/ingest_component.py", line 130, in bulk_ingest
saved_documents.extend(self._save_docs(documents))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/ingest/ingest_component.py", line 137, in _save_docs
self._index.insert(document, show_progress=True)
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/indices/base.py", line 191, in insert
nodes = run_transformations(
^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/ingestion/pipeline.py", line 70, in run_transformations
nodes = transform(nodes, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/embeddings/base.py", line 334, in __call__
embeddings = self.get_text_embedding_batch(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/llama_index/embeddings/base.py", line 255, in get_text_embedding_batch
embeddings = self._get_text_embeddings(cur_batch)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/embedding/custom/sagemaker.py", line 82, in _get_text_embeddings
return self._embed(texts)
^^^^^^^^^^^^^^^^^^
File "/mnt/c/Users/aberges/IdeaProjects/SSG/TRVS/privateGPT/private_gpt/components/embedding/custom/sagemaker.py", line 60, in _embed
return response_json["vectors"]
~~~~~~~~~~~~~^^^^^^^^^^^
TypeError: list indices must be integers or slices, not str
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/queueing.py", line 501, in process_events
response = await self.call_prediction(awake_events, batch)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/aberges/venv/privateGPT/lib/python3.11/site-packages/gradio/queueing.py", line 465, in call_prediction
raise Exception(str(error) if show_error else None) from error
Exception: list indices must be integers or slices, not str
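Looking at the last frames, the failure happens in private_gpt/components/embedding/custom/sagemaker.py, where _embed reads response_json["vectors"]: the endpoint apparently answers with a plain JSON list instead of an object containing a "vectors" key, hence the TypeError. A minimal boto3 check of the raw endpoint output looks like this (a sketch, assuming AWS credentials are configured and the endpoint name from settings-sagemaker.yaml above):

import json

import boto3

# Call the deployed embedding endpoint directly and inspect its raw answer.
runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="BAAI-bge-large-en-v1-5",
    ContentType="application/json",
    Body=json.dumps({"inputs": ["hello world"]}),
)
payload = json.loads(response["Body"].read())
# privateGPT expects something like {"vectors": [[...]]}; a default
# feature-extraction deployment seems to return a bare (nested) list instead.
print(type(payload))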
Expected behavior
To be able to store sentences embedded through SageMaker in our vector database.
I think I've found a solution to my own problem.
Actually, I think that the current embedding code for sagemaker has some undocumented prerequisites (like a special inference script).
I was able to make it work with the following setup (mostly extracted from here):
- Create a custom inference script for your embedding model under scripts/sagemaker/embeddings/inference.py
"""File to use for inference of embedding model.
Mostly extracted from https://medium.com/@domemue/deploy-bge-embedding-models-via-aws-sagemaker-8e8bbe08b558
"""
import torch
from transformers import AutoModel, AutoTokenizer
def model_fn(model_dir):
# load model and processor from model_dir
model = AutoModel.from_pretrained(model_dir)
tokenizer = AutoTokenizer.from_pretrained(model_dir)
return model, tokenizer
def predict_fn(data, model_and_tokenizer):
# unpack model and tokenizer
model, tokenizer = model_and_tokenizer
# process input
inputs = data.pop("inputs", data)
# Tokenize input sentences
encoded_input = tokenizer(
inputs, padding=True, truncation=True, return_tensors="pt"
)
# Compute token sagemaker.embeddings
with torch.no_grad():
model_output = model(**encoded_input)
# Perform pooling. In this case, cls pooling.
sentence_embeddings = model_output[0][:, 0]
# normalize sagemaker.embeddings
sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
return {"vectors": sentence_embeddings} ## This is mostly this line that "solves" my issue
- Create a script to deploy my model under scripts/sagemaker/upload_embeddings.py
import argparse
import logging
import os
import shutil
import tarfile
from pathlib import Path
from tempfile import TemporaryDirectory

import sagemaker
from huggingface_hub import snapshot_download
from sagemaker.huggingface import HuggingFaceModel
from sagemaker.s3 import S3Uploader

from common import get_execution_role, get_sagemaker_compliant_name

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def compress(tar_dir=None, output_file="model.tar.gz") -> str:
    """Package embedding model as tar gz.

    :param tar_dir: Directory
    :param output_file: Output file name
    :return:
    """
    parent_dir = os.getcwd()
    try:
        os.chdir(tar_dir)
        with tarfile.open(os.path.join(parent_dir, output_file), "w:gz") as tar:
            for item in os.listdir("."):
                logger.info("Adding %s in %s", item, output_file)
                tar.add(item, arcname=item)
    finally:
        os.chdir(parent_dir)
    return output_file


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="Upload Embedding on sagemaker",
        description="Helper script to deploy embedding on sagemaker",
    )
    parser.add_argument(
        "--aws-role",
        dest="aws_role",
        default="exec-role",
        help="Sagemaker role to use to deploy model",
        type=str,
    )
    parser.add_argument(
        "--model-name",
        dest="model_name",
        default="BAAI/bge-large-en-v1.5",
        help="Hugging face model name to deploy to sagemaker",
        type=str,
    )
    parser.add_argument(
        "--vm-size",
        dest="vm_size",
        default="ml.m5.xlarge",
        help="Sagemaker VM size to use to deploy model.",
        type=str,
    )
    parser.add_argument(
        "--transformers-version",
        dest="transformers_version",
        default="4.26.0",
        help="Hugging face transformers version to use to deploy the model.",
        type=str,
    )
    parser.add_argument(
        "--pytorch-version",
        dest="pytorch_version",
        default="1.13.1",
        help="Pytorch version to use to deploy the model.",
        type=str,
    )
    parser.add_argument(
        "--python-version",
        dest="python_version",
        default="py39",
        help="Python version to use to deploy the model.",
        type=str,
    )
    parser.add_argument(
        "--download-model",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Enable/Disable downloading of the embedding model.",
    )
    args = parser.parse_args()

    role = get_execution_role(default_role_name=args.aws_role)

    # Hub Model configuration. https://huggingface.co/models
    hub = {
        "HF_MODEL_ID": args.model_name,
        "HF_TASK": "feature-extraction",
    }
    compliant_name = get_sagemaker_compliant_name(model_name=hub["HF_MODEL_ID"])

    sess = sagemaker.Session()
    # sagemaker session bucket -> used for uploading data, models and logs
    # we need the bucket since we will package the model with our own inference script
    # sagemaker will automatically create this bucket if it does not exist
    sagemaker_session_bucket = None
    if sagemaker_session_bucket is None and sess is not None:
        # set to default bucket if a bucket name is not given
        sagemaker_session_bucket = sess.default_bucket()
    sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

    # Custom model packaging
    model_tar_dir = Path(hub["HF_MODEL_ID"].split("/")[-1])
    if args.download_model:
        logger.info("Downloading model %s", hub["HF_MODEL_ID"])
        # delete model dir if exist
        if os.path.exists(model_tar_dir) and os.path.isdir(model_tar_dir):
            shutil.rmtree(model_tar_dir)
        # setup temporary directory
        with TemporaryDirectory() as tmpdir:
            # download snapshot
            snapshot_dir = snapshot_download(
                repo_id=hub["HF_MODEL_ID"], cache_dir=tmpdir
            )
            logger.info(os.listdir(snapshot_dir))
            # copy snapshot to model dir
            logger.info("%s -> %s", snapshot_dir, str(model_tar_dir))
            shutil.copytree(snapshot_dir, str(model_tar_dir))

    # Copy custom inference script into the model directory
    code_in_model_dir = str(model_tar_dir.joinpath("code"))
    # delete code dir in model dir if exist
    if os.path.exists(code_in_model_dir) and os.path.isdir(code_in_model_dir):
        shutil.rmtree(code_in_model_dir)
    # copy embeddings/ to model dir
    current_path = Path(__file__).parent.resolve()
    shutil.copytree(f"{current_path}/embeddings", code_in_model_dir)

    # Create model archive
    model_tar = compress(str(model_tar_dir))

    # Upload model
    s3_model_uri = S3Uploader.upload(
        local_path=model_tar,
        desired_s3_uri=f"s3://{sess.default_bucket()}/{compliant_name}",
    )
    logger.info(f"model uploaded to: {s3_model_uri}")

    logger.info("Starting deployment of model on Sagemaker")
    # create Hugging Face Model Class
    huggingface_model = HuggingFaceModel(
        transformers_version=args.transformers_version,
        pytorch_version=args.pytorch_version,
        py_version=args.python_version,
        env=hub,
        role=role,
        name=compliant_name,
        model_data=s3_model_uri,
    )
    # deploy model to SageMaker Inference
    predictor = huggingface_model.deploy(
        initial_instance_count=1,  # number of instances
        instance_type=args.vm_size,  # ec2 instance type
        endpoint_name=compliant_name,
    )
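Note that common.py (imported at the top of the script) is not shown here. The two helpers it provides could look roughly like this; this is only an indicative sketch, not the exact code I used:

"""Indicative sketch of the helpers imported by upload_embeddings.py."""
import re

import boto3


def get_execution_role(default_role_name: str) -> str:
    """Resolve an IAM role name into the ARN expected by SageMaker."""
    iam = boto3.client("iam")
    return iam.get_role(RoleName=default_role_name)["Role"]["Arn"]


def get_sagemaker_compliant_name(model_name: str) -> str:
    """Turn a Hugging Face model id into a valid SageMaker resource name.

    SageMaker names only allow alphanumerics and hyphens (max 63 chars),
    e.g. "BAAI/bge-large-en-v1.5" becomes "BAAI-bge-large-en-v1-5".
    """
    return re.sub(r"[^a-zA-Z0-9-]", "-", model_name)[:63].strip("-")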
Once my embedding model is deployed through this script, everything works fine :)
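For reference, with the defaults above the deployment boils down to something like (the role name is just an example):

python scripts/sagemaker/upload_embeddings.py \
  --aws-role my-sagemaker-execution-role \
  --model-name BAAI/bge-large-en-v1.5 \
  --vm-size ml.m5.xlarge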
You are right, deploying an embeddings model to Sagemaker is not well documented anywhere on the internet. We had to go through the same process.
I don't think the PGPT documentation is the right place though. I'd encourage you to write a blog post! I'd definitely re-share it.
Closed by #1437
@imartinez sorry, I closed the wrong issue... Could you maybe re-open it? (So that people can easily find the code to deploy the model while I (or anyone else) have not yet written the associated blog post.)
Reopened. We could maybe start by documenting it in the docs.
Hi, I am on the same endeavour, and this post could boost our project. @LvffY did you finally write a blog post on how to do this? I will repost it, of course.
@egraells no, I didn't write the blog post yet, but my answer above should help (and be quite complete).