
[Question]: Bedrock LLMs async

Open austinmw opened this issue 1 year ago • 11 comments

Question Validation

  • [X] I have searched both the documentation and discord for an answer.

Question

Hi, I've constructed a query engine and am testing the following query pipeline with p.run vs p.arun:

[screenshot: query pipeline setup]
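
For reference, the setup was roughly equivalent to the sketch below (the data path and pipeline composition are illustrative assumptions, not the exact code from the screenshot):

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.query_pipeline import QueryPipeline

# Index the Paul Graham essay and wrap the query engine in a
# single-module pipeline (assumed setup)
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()

p = QueryPipeline(chain=[query_engine], verbose=True)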

Regular:

response = p.run(input="What did the author do during his time in YC?")

Async:

response = await p.arun(input="What did the author do during his time in YC?")

When I use the default OpenAI LLM, the async operation is significantly faster. However, when I switch to a Bedrock LLM, both operations take the same amount of time. Do Bedrock LLMs not yet support async?

austinmw avatar Feb 14 '24 15:02 austinmw

🤖

Hey there @austinmw! Great to see you back. 😊 I remember our last interaction. I'm diving into your query about the async operations with Bedrock LLMs now and will get back to you with an answer shortly!

dosubot[bot] avatar Feb 14 '24 15:02 dosubot[bot]

Nvm, I see the following in the Bedrock class:

    async def achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        """Chat asynchronously."""
        # TODO: do synchronous chat for now
        return self.chat(messages, **kwargs)

    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        raise NotImplementedError

Curious if it's possible to support async yet?

austinmw avatar Feb 14 '24 15:02 austinmw

🤖

Yes, it is possible for Bedrock LLMs in LlamaIndex to support asynchronous operations. The achat method in the Bedrock class is meant to chat asynchronously, but it currently just calls the synchronous chat method, as indicated by the comment # TODO: do synchronous chat for now.

As for the acomplete method, it is supposed to complete a prompt asynchronously. However, it is not implemented yet and raises a NotImplementedError when called.

LlamaIndex provides several utility functions that facilitate asynchronous operations using Python's asyncio library. For instance, the run_async_tasks function accepts a list of coroutines and runs them concurrently. If the show_progress parameter is set to True, it uses the tqdm library to display a progress bar.

Here is an example of how you might use these functions to run asynchronous tasks:

from llama_index.core.async_utils import run_async_tasks

tasks = [some_async_function(arg) for arg in args_list]
results = run_async_tasks(tasks)

In this example, some_async_function would be an asynchronous function that you want to run for each argument in args_list, and run_async_tasks would run these tasks asynchronously and return a list of results.

The difference between the run and arun methods in the LlamaIndex codebase lies in their execution model. The _run_component method is synchronous: it calls the query method of the query_engine object, so execution blocks until the query completes. The _arun_component method is asynchronous: it awaits the aquery method of the query_engine object, so while the query is in flight the event loop is free to run other coroutines, and execution resumes once aquery completes.

In summary, run is synchronous (blocking) and arun is asynchronous (non-blocking).
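
As a rough illustration (hypothetical helper, not from the codebase), concurrent arun calls only overlap when the underlying LLM methods are truly asynchronous:

import asyncio
import time

async def time_concurrent_runs(pipeline, question, n=5):
    # With a truly async LLM the n calls overlap; with a synchronous
    # fallback they serialize and total time grows roughly linearly with n
    start = time.perf_counter()
    results = await asyncio.gather(*(pipeline.arun(input=question) for _ in range(n)))
    print(f"{n} concurrent runs took {time.perf_counter() - start:.2f}s")
    return results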

You can find more details in the LlamaIndex repository.


About Dosu

This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Feb 14 '24 16:02 dosubot[bot]

Looks like it could be implemented, but just hasn't been yet

Would need to use an async boto3 session: https://pypi.org/project/aioboto3/

logan-markewich avatar Feb 14 '24 16:02 logan-markewich

Ah okay. It looks like I can use aioboto3 with bedrock-runtime like this:

# For notebooks:
# import nest_asyncio
# nest_asyncio.apply()

import aioboto3
import json
from botocore.config import Config
import asyncio


session = aioboto3.Session()

async def fetch_completion(
    session: aioboto3.session.Session,
    body: str,
    modelId: str = "anthropic.claude-instant-v1",
    accept: str = "application/json",
    contentType: str = "application/json",
):

    retry_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})

    async with session.client('bedrock-runtime', config=retry_config) as brt:

        # Making the invoke_model call asynchronously
        response = await brt.invoke_model(
            body=body,
            modelId=modelId,
            accept=accept,
            contentType=contentType,
        )

        # Asynchronously read the response body, checking the response
        # structure first in case the 'body' key is missing
        if 'body' in response:
            response_body_str = await response['body'].read()
        else:
            print("Response structure:", response)
            return  # Exit if the structure is not as expected

        response_body = json.loads(response_body_str)
        return response_body.get('completion')


body = json.dumps({
    "prompt": "\n\nHuman: concisely explain black holes to 8th graders\n\nAssistant:",
    "max_tokens_to_sample": 50,
    "temperature": 0.1,
    "top_p": 0.9,
})

# Running once
result = asyncio.run(fetch_completion(session, body))
print(result)

# Running multiple requests concurrently (top-level await works in a notebook;
# in a plain script, wrap this in an async main() and pass it to asyncio.run)
payloads = [body] * 10
results = await asyncio.gather(*(fetch_completion(session, payload) for payload in payloads))
assert len(results) == 10
print(results)

austinmw avatar Feb 14 '24 17:02 austinmw

This seems to work for me and successfully improves my response time; not sure if there are any issues with it I might have missed:

import asyncio
import json
from typing import Any, Sequence

import aioboto3
from botocore.config import Config

from llama_index.llms.bedrock import Bedrock
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    CompletionResponse,
)
from llama_index.core.llms.generic_utils import (
    completion_response_to_chat_response,
)
from llama_index.core.llms.callbacks import (
    llm_chat_callback,
    llm_completion_callback,
)
from llama_index.core.bridge.pydantic import PrivateAttr


class AsyncBedrock(Bedrock):

    _session: Any = PrivateAttr()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize the aioboto3 session here for use in async methods
        self._session = aioboto3.Session(
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token,
            region_name=self.region_name,
            profile_name=self.profile_name,
        )

    @llm_completion_callback()
    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        # Ensure prompt formatting
        if not formatted:
            prompt = self.completion_to_prompt(prompt)
        all_kwargs = self._get_all_kwargs(**kwargs)
        request_body = self._provider.get_request_body(prompt, all_kwargs)
        request_body_str = json.dumps(request_body)

        retry_config = Config(
            retries={'max_attempts': self.max_retries, 'mode': 'adaptive'},
            connect_timeout=self.timeout,
            read_timeout=self.timeout,
        )

        # Use the aioboto3 client within an async with block
        async with self._session.client('bedrock-runtime', config=retry_config) as client:
            response = await client.invoke_model(
                body=request_body_str,
                modelId=self.model,
                accept='application/json',
                contentType='application/json',
            )

            if 'body' in response:
                response_body_str = await response['body'].read()
                response_body = json.loads(response_body_str)
                return CompletionResponse(
                    text=self._provider.get_text_from_response(response_body), raw=response_body
                )
            else:
                raise ValueError("Unexpected response format")

    @llm_chat_callback()
    async def achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        prompt = self.messages_to_prompt(messages)
        completion_response = await self.acomplete(prompt, formatted=True, **kwargs)
        return completion_response_to_chat_response(completion_response)


region = "us-east-1"

llm = AsyncBedrock(
    model="anthropic.claude-instant-v1",
    model_kwargs={
        "temperature": 0.2
    },
    region_name=region,
)
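
A quick smoke test for the subclass (hypothetical, reusing the llm instance above): fire several completions concurrently and check that total latency stays close to a single call rather than the sum:

async def smoke_test():
    prompts = ["Concisely explain black holes to 8th graders."] * 5
    responses = await asyncio.gather(*(llm.acomplete(p) for p in prompts))
    for r in responses:
        print(r.text[:80])

asyncio.run(smoke_test())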

austinmw avatar Feb 14 '24 17:02 austinmw

@austinmw I am thinking about opening a pull request based on your work. OK with you?

billmetangmo avatar Mar 06 '24 22:03 billmetangmo

@billmetangmo Any updates on this? Also interested, since the synchronous path can be painfully slow.

@logan-markewich LangChain has added this, so it would be good to have it here as well.

brycecf avatar Apr 26 '24 11:04 brycecf

@billmetangmo go for it!

austinmw avatar Apr 26 '24 21:04 austinmw

@billmetangmo Have you started this? If not, I'll take a stab.

brycecf avatar May 11 '24 01:05 brycecf

Did not have the time yet. You can go for it @brycecf

billmetangmo avatar May 21 '24 00:05 billmetangmo

@austinmw, I am using the Bedrock provider too. How should I change the code to run the LLM evaluation as in: https://docs.llamaindex.ai/en/stable/examples/retrievers/auto_merging_retriever/#evaluation ?

mdciri avatar Jun 17 '24 16:06 mdciri

I have a workaround to use Bedrock embeddings with multiple regions and async support, if anyone is interested:

import json
from random import choice
from typing import List, Literal, Union

import aioboto3
import boto3
from botocore.config import Config
from llama_index.core.base.embeddings.base import Embedding
from llama_index.embeddings.bedrock import BedrockEmbedding
from llama_index.embeddings.bedrock.base import PROVIDER_SPECIFIC_IDENTIFIERS


class MultiRegionSession:
    def __init__(self, region_names=None, use_async=False, **kwargs):
        session_manager = aioboto3.Session if use_async else boto3.Session
        self.sessions = {
            region: session_manager(region_name=region, **kwargs)
            for region in region_names
        }
        self.is_async = use_async

    def client(self, client_name, **kwargs):
        # Forward client kwargs (e.g. config) so retries/timeouts are applied
        return (
            AsyncMultiregionClient(
                client_name, available_sessions=self.sessions, client_kwargs=kwargs
            )
            if self.is_async
            else MultiregionClient(
                client_name, available_sessions=self.sessions, client_kwargs=kwargs
            )
        )


class AsyncMultiregionClient:
    def __init__(self, client_name, available_sessions={}, client_kwargs={}):
        self.sessions = available_sessions
        self.client_name = client_name
        self.client_kwargs = client_kwargs

    async def invoke_model(self, *args, **kwargs):
        # __getattr__ won't work with an async context manager,
        # so we implement every method we use explicitly
        chosen_region = choice(list(self.sessions.keys()))
        async with self.sessions[chosen_region].client(
            self.client_name, **self.client_kwargs
        ) as client:
            return await client.invoke_model(*args, **kwargs)


class MultiregionClient:
    def __init__(self, client_name, available_sessions={}, client_kwargs={}):
        self.sessions = available_sessions
        self.client_name = client_name
        self.client_kwargs = client_kwargs

    def __getattr__(self, method):
        chosen_region = choice(list(self.sessions.keys()))
        client = self.sessions[chosen_region].client(self.client_name, **self.client_kwargs)
        return getattr(client, method)


class MultiRegionBedrockEmbedding(BedrockEmbedding):
    def __init__(
        self,
        use_async=True,
        session_kwargs={},
        retries={"max_attempts": 1, "mode": "standard"},
        timeout=60,
        *args,
        **kwargs
    ):
        try:
            region_names = session_kwargs.pop("region_names")
        except KeyError:
            raise Exception("You must provide region_names in session_kwargs")
        session = MultiRegionSession(
            use_async=use_async, region_names=region_names, **session_kwargs
        )
        client = session.client(
            "bedrock-runtime",
            config=Config(
                retries=retries,
                connect_timeout=timeout,
                read_timeout=timeout,
            ),
        )
        super().__init__(client=client, *args, **kwargs)

    async def _aget_embedding(
        self, payload: Union[str, List[str]], type: Literal["text", "query"]
    ) -> Union[Embedding, List[Embedding]]:
        if self._client is None:
            self.set_credentials()

        if self._client is None:
            raise ValueError("Client not set")

        provider = self.model_name.split(".")[0]
        request_body = self._get_request_body(provider, payload, type)

        response = await self._client.invoke_model(
            body=request_body,
            modelId=self.model_name,
            accept="application/json",
            contentType="application/json",
        )
        resp = json.loads((await response.get("body").read()).decode("utf-8"))
        identifiers = PROVIDER_SPECIFIC_IDENTIFIERS.get(provider, None)
        if identifiers is None:
            raise ValueError("Provider not supported")
        return identifiers["get_embeddings_func"](resp, isinstance(payload, list))

    async def _aget_query_embedding(self, query: str) -> Embedding:
        if not isinstance(self._client, AsyncMultiregionClient):
            raise Exception(
                "Async method called but the class was created with `use_async=False`"
            )
        return await self._aget_embedding(query, "query")

    async def _aget_text_embedding(self, text: str) -> Embedding:
        if not isinstance(self._client, AsyncMultiregionClient):
            raise Exception(
                "Async method called but the class was created with `use_async=False`"
            )
        return await self._aget_embedding(text, "text")
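
Usage might look like this (model ID and regions are examples; in a notebook you can await directly):

embed_model = MultiRegionBedrockEmbedding(
    model_name="amazon.titan-embed-text-v1",
    use_async=True,
    session_kwargs={"region_names": ["us-east-1", "us-west-2"]},
)

embedding = await embed_model.aget_query_embedding("What is a black hole?")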

adgaultier avatar Jul 03 '24 17:07 adgaultier

PR #14326 addresses the need for async Bedrock, albeit using the new BedrockConverse LLM instead of the old Bedrock one.
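
For anyone finding this later, async usage with BedrockConverse looks roughly like this (model ID is an example):

from llama_index.llms.bedrock_converse import BedrockConverse

llm = BedrockConverse(
    model="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
)
response = await llm.acomplete("Concisely explain black holes to 8th graders.")
print(response.text)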

AndreCNF avatar Jul 10 '24 15:07 AndreCNF