[Question]: Bedrock LLMs async
Question Validation
- [X] I have searched both the documentation and discord for an answer.
Question
Hi, I've constructed a query engine and am testing the following query pipeline with p.run vs p.arun:
Regular:
response = p.run(input="What did the author do during his time in YC?")
Async:
response = await p.arun(input="What did the author do during his time in YC?")
When I use the default OpenAI LLM, the async operation is significantly faster. However, when I switch the LLM to a Bedrock LLM, the operations both take the same amount of time. Do Bedrock LLMs not yet support async?
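For what it's worth, here is roughly how the two paths can be compared (a sketch: it assumes the pipeline p from above and just measures wall-clock time for each call):

import asyncio
import time

def time_sync(query: str):
    start = time.perf_counter()
    p.run(input=query)
    print(f"p.run took {time.perf_counter() - start:.1f}s")

async def time_async(query: str):
    start = time.perf_counter()
    await p.arun(input=query)
    print(f"p.arun took {time.perf_counter() - start:.1f}s")

time_sync("What did the author do during his time in YC?")
asyncio.run(time_async("What did the author do during his time in YC?"))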
🤖
Hey there @austinmw! Great to see you back. 😊 I remember our last interaction. I'm diving into your query about the async operations with Bedrock LLMs now and will get back to you with an answer shortly!
Nvm, I see the following in the Bedrock class:
async def achat(
    self, messages: Sequence[ChatMessage], **kwargs: Any
) -> ChatResponse:
    """Chat asynchronously."""
    # TODO: do synchronous chat for now
    return self.chat(messages, **kwargs)

async def acomplete(
    self, prompt: str, formatted: bool = False, **kwargs: Any
) -> CompletionResponse:
    raise NotImplementedError
Curious if it's possible to support async yet?
🤖
Yes, it is indeed possible for Bedrock LLMs in LlamaIndex to support asynchronous operations. The current implementation of the achat method in the Bedrock class is meant to chat asynchronously, but it currently just calls the synchronous chat method, as indicated by the comment # TODO: do synchronous chat for now. As for the acomplete method, it is supposed to complete a prompt asynchronously, but it is not implemented yet and raises a NotImplementedError when called.
LlamaIndex provides several utility functions that facilitate asynchronous operations using Python's asyncio library. For instance, the run_async_tasks function accepts a list of coroutine tasks and runs them asynchronously; if its show_progress parameter is set to True, it uses the tqdm library to display a progress bar.
Here is an example of how you might use it to run asynchronous tasks (the import path below assumes a recent llama-index-core):

from llama_index.core.async_utils import run_async_tasks

tasks = [some_async_function(arg) for arg in args_list]
results = run_async_tasks(tasks)
In this example, some_async_function would be an asynchronous function you want to run for each argument in args_list, and run_async_tasks would run these tasks asynchronously and return a list of results.
The difference between the run and arun methods in the LlamaIndex codebase lies in their execution nature. The _run_component method is synchronous, while the _arun_component method is asynchronous. In _run_component, the query method of the query_engine object is called synchronously, so execution blocks until query returns. In _arun_component, the aquery method of the query_engine object is awaited instead: execution does not wait for aquery to finish, the event loop moves on to other parts of the program, and control returns once aquery has completed.

In summary, the main difference between run and arun is that run is synchronous (blocking) and arun is asynchronous (non-blocking).
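To make that concrete, the pattern looks roughly like this (a simplified sketch with invented names, not the actual llama_index component code):

class QueryEngineComponentSketch:
    """Simplified sketch of the sync/async split described above."""

    def __init__(self, query_engine):
        self.query_engine = query_engine

    def _run_component(self, **kwargs):
        # Blocks the caller until the query engine returns.
        return {"output": self.query_engine.query(kwargs["input"])}

    async def _arun_component(self, **kwargs):
        # Yields control to the event loop while the query is in flight,
        # so other coroutines (e.g. parallel LLM calls) can make progress.
        return {"output": await self.query_engine.aquery(kwargs["input"])}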
You can find more details in the LlamaIndex repository.
Looks like it could be implemented, but just hasn't been yet.
Would need to use an async boto3 session: https://pypi.org/project/aioboto3/
Ah okay. It looks like I can use aioboto3 with bedrock-runtime like this:
# For notebooks:
# import nest_asyncio
# nest_asyncio.apply()

import asyncio
import json

import aioboto3
from botocore.config import Config

session = aioboto3.Session()

async def fetch_completion(
    session: aioboto3.Session,
    body: str,
    modelId: str = "anthropic.claude-instant-v1",
    accept: str = "application/json",
    contentType: str = "application/json",
):
    retry_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})
    async with session.client('bedrock-runtime', config=retry_config) as brt:
        # Making the invoke_model call asynchronously
        response = await brt.invoke_model(
            body=body,
            modelId=modelId,
            accept=accept,
            contentType=contentType,
        )
        # Asynchronously reading the response body
        # Check if the 'body' key exists or inspect the response structure
        if 'body' in response:
            response_body_str = await response['body'].read()
        else:
            print("Response structure:", response)
            return  # Exit the function if the structure is not as expected
        response_body = json.loads(response_body_str)
        return response_body.get('completion')

body = json.dumps({
    "prompt": "\n\nHuman: concisely explain black holes to 8th graders\n\nAssistant:",
    "max_tokens_to_sample": 50,
    "temperature": 0.1,
    "top_p": 0.9,
})

# Running once
result = asyncio.run(fetch_completion(session, body))
print(result)

# Running multiple times in parallel (top-level await requires a running
# event loop, e.g. a notebook with nest_asyncio applied)
payloads = [body] * 10
results = await asyncio.gather(*(fetch_completion(session, payload) for payload in payloads))
assert len(results) == 10
print(results)
This seems to work for me and successfully improves my response time; not sure if there are any issues I might have missed:
import asyncio
import json
from typing import Any, Sequence

import aioboto3
from botocore.config import Config

from llama_index.llms.bedrock import Bedrock
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    CompletionResponse,
)
from llama_index.core.llms.generic_utils import (
    completion_response_to_chat_response,
)
from llama_index.core.llms.callbacks import (
    llm_chat_callback,
    llm_completion_callback,
)
from llama_index.core.bridge.pydantic import PrivateAttr

class AsyncBedrock(Bedrock):
    _session: Any = PrivateAttr()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize the aioboto3 session here for use in async methods
        self._session = aioboto3.Session(
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token,
            region_name=self.region_name,
            profile_name=self.profile_name,
        )

    @llm_completion_callback()
    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        # Ensure prompt formatting
        if not formatted:
            prompt = self.completion_to_prompt(prompt)
        all_kwargs = self._get_all_kwargs(**kwargs)
        request_body = self._provider.get_request_body(prompt, all_kwargs)
        request_body_str = json.dumps(request_body)
        retry_config = Config(
            retries={'max_attempts': self.max_retries, 'mode': 'adaptive'},
            connect_timeout=self.timeout,
            read_timeout=self.timeout,
        )
        # Use the aioboto3 client within an async with block
        async with self._session.client('bedrock-runtime', config=retry_config) as client:
            response = await client.invoke_model(
                body=request_body_str,
                modelId=self.model,
                accept='application/json',
                contentType='application/json',
            )
            if 'body' in response:
                response_body_str = await response['body'].read()
                response_body = json.loads(response_body_str)
                return CompletionResponse(
                    text=self._provider.get_text_from_response(response_body),
                    raw=response_body,
                )
            else:
                raise ValueError("Unexpected response format")

    @llm_chat_callback()
    async def achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        prompt = self.messages_to_prompt(messages)
        completion_response = await self.acomplete(prompt, formatted=True, **kwargs)
        return completion_response_to_chat_response(completion_response)

region = "us-east-1"

llm = AsyncBedrock(
    model="anthropic.claude-instant-v1",
    model_kwargs={"temperature": 0.2},
    region_name=region,
)
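A quick way to check that the override really runs concurrently is to fire several acomplete calls at once; this smoke test is a sketch that assumes the llm defined above:

import asyncio
import time

async def smoke_test():
    prompts = ["Concisely explain black holes to 8th graders."] * 5
    start = time.perf_counter()
    # All five requests are in flight at once, so total wall-clock time
    # should be close to a single call's latency rather than five times it.
    results = await asyncio.gather(*(llm.acomplete(prompt) for prompt in prompts))
    print(f"{len(results)} completions in {time.perf_counter() - start:.1f}s")

asyncio.run(smoke_test())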
@austinmw I am thinking about doing a pull request from your work. OK for you?
@billmetangmo Any updates on this? Also interested since synchronous can be painfully slow.
@logan-markewich LangChain has added this, so it would be good to have it here as well.
@billmetangmo go for it!
@billmetangmo Have you started this? If not, I'll take a stab.
Did not have the time yet. You can go for it @brycecf
@austinmw, I am using Bedrock provider too. How should I change the code to make run the LLM evaluation as in: https://docs.llamaindex.ai/en/stable/examples/retrievers/auto_merging_retriever/#evaluation ?
I have a workaround to use Bedrock embeddings with multi-region and async support, if anyone is interested:
import json
from random import choice
from typing import List, Literal, Union

import aioboto3
import boto3
from botocore.config import Config

from llama_index.core.base.embeddings.base import Embedding
from llama_index.embeddings.bedrock import BedrockEmbedding
from llama_index.embeddings.bedrock.base import PROVIDER_SPECIFIC_IDENTIFIERS

class MultiRegionSession:
    def __init__(self, region_names=None, use_async=False, **kwargs):
        session_manager = aioboto3.Session if use_async else boto3.Session
        self.sessions = {
            region: session_manager(region_name=region, **kwargs)
            for region in region_names
        }
        self.is_async = use_async

    def client(self, client_name, **kwargs):
        # Forward client kwargs (e.g. config) so they are not silently dropped
        client_cls = AsyncMultiregionClient if self.is_async else MultiregionClient
        return client_cls(
            client_name, available_sessions=self.sessions, client_kwargs=kwargs
        )

class AsyncMultiregionClient:
    def __init__(self, client_name, available_sessions=None, client_kwargs=None):
        self.sessions = available_sessions or {}
        self.client_name = client_name
        self.client_kwargs = client_kwargs or {}

    async def invoke_model(self, *args, **kwargs):
        # __getattr__ won't work with an async context manager,
        # so every method we use has to be implemented explicitly.
        chosen_region = choice(list(self.sessions.keys()))
        async with self.sessions[chosen_region].client(
            self.client_name, **self.client_kwargs
        ) as client:
            return await client.invoke_model(*args, **kwargs)

class MultiregionClient:
    def __init__(self, client_name, available_sessions=None, client_kwargs=None):
        self.sessions = available_sessions or {}
        self.client_name = client_name
        self.client_kwargs = client_kwargs or {}

    def __getattr__(self, method):
        chosen_region = choice(list(self.sessions.keys()))
        client = self.sessions[chosen_region].client(
            self.client_name, **self.client_kwargs
        )
        return getattr(client, method)

class MultiRegionBedrockEmbedding(BedrockEmbedding):
    def __init__(
        self,
        use_async=True,
        session_kwargs={},
        retries={"max_attempts": 1, "mode": "standard"},
        timeout=60,
        *args,
        **kwargs
    ):
        try:
            region_names = session_kwargs.pop("region_names")
        except KeyError:
            raise Exception("you must provide region_names in session_kwargs")
        session = MultiRegionSession(
            use_async=use_async, region_names=region_names, **session_kwargs
        )
        client = session.client(
            "bedrock-runtime",
            config=Config(
                retries=retries,
                connect_timeout=timeout,
                read_timeout=timeout,
            ),
        )
        super().__init__(client=client, *args, **kwargs)

    async def _aget_embedding(
        self, payload: Union[str, List[str]], type: Literal["text", "query"]
    ) -> Union[Embedding, List[Embedding]]:
        if self._client is None:
            self.set_credentials()
            if self._client is None:
                raise ValueError("Client not set")
        provider = self.model_name.split(".")[0]
        request_body = self._get_request_body(provider, payload, type)
        response = await self._client.invoke_model(
            body=request_body,
            modelId=self.model_name,
            accept="application/json",
            contentType="application/json",
        )
        resp = json.loads((await response.get("body").read()).decode("utf-8"))
        identifiers = PROVIDER_SPECIFIC_IDENTIFIERS.get(provider, None)
        if identifiers is None:
            raise ValueError("Provider not supported")
        return identifiers["get_embeddings_func"](resp, isinstance(payload, list))

    async def _aget_query_embedding(self, query: str) -> Embedding:
        if not isinstance(self._client, AsyncMultiregionClient):
            raise Exception(
                "Error: async method called but class was created with `use_async=False`"
            )
        return await self._aget_embedding(query, "query")

    async def _aget_text_embedding(self, text: str) -> Embedding:
        if not isinstance(self._client, AsyncMultiregionClient):
            raise Exception(
                "Error: async method called but class was created with `use_async=False`"
            )
        return await self._aget_embedding(text, "text")
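Usage might look something like this (a sketch; the model id and region names are placeholders):

import asyncio

embed_model = MultiRegionBedrockEmbedding(
    model_name="amazon.titan-embed-text-v1",
    session_kwargs={"region_names": ["us-east-1", "us-west-2"]},
)

async def main():
    # Each call picks a random region's client, spreading the load
    # across per-region throughput limits.
    embedding = await embed_model.aget_query_embedding("what are black holes?")
    print(len(embedding))

asyncio.run(main())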
PR #14326 addresses the need for async Bedrock, albeit using the new BedrockConverse LLM instead of the old Bedrock one.
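For anyone landing here later, a minimal async call with BedrockConverse might look like this (a sketch; the model id and region are placeholders):

import asyncio

from llama_index.llms.bedrock_converse import BedrockConverse

llm = BedrockConverse(
    model="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
)

async def main():
    # acomplete is natively async here, unlike the old Bedrock class.
    response = await llm.acomplete("Concisely explain black holes to 8th graders.")
    print(response.text)

asyncio.run(main())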