openai-python icon indicating copy to clipboard operation
openai-python copied to clipboard

New Feature Proposal: Assistant API - Chaining streams for function execution

Open pi-infected opened this issue 1 year ago • 1 comments

Confirm this is a feature request for the Python library and not the underlying OpenAI API.

  • [X] This is a feature request for the Python library

Describe the feature or improvement you're requesting

Hi,

I have developed a monkey patch to add the capacity for chaining streams which is very beneficial for the Assistant API function execution workflow. I think it could be integrated into the openai library. So, I guess you want to know the use case, right?

Imagine you are processing the assistant events in a loop (in my case I use the Async stream client but it's basically the almost same for the non-async streaming one):

async for chunk in assistant_stream_response:
    # Process chunk here
    
    
    # Process function calls
    if isinstance(chunk, ThreadRunRequiresAction):
        tool_outputs = # Execute the function and gather the outputs in this var     
    
        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id, # stored along the way
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True
        )
        # we can chain the new_stream at the end of the current one to avoid writing another chunk processing loop
        assistant_stream_response.chain_stream(new_stream)


    yield result

With this, we can chain the tool submit stream response to the current one to avoid writing another chunk processing loop. Tested & working.

It very beneficial, especially when you integrate the assistant API inside a project to avoid changing the existing workflow. Here is the monkey patch:

#--------------------------------------MONKEY-PATCH-OPENAI--------------------------------------------------------------
import openai
from typing import Any, TypeVar, AsyncIterator, cast
from openai._utils import is_mapping
from openai._exceptions import APIError
from openai import AsyncOpenAI
import httpx

_T = TypeVar("_T")


def monkey_patch__init__(self, *, cast_to: type[_T], response: httpx.Response, client: AsyncOpenAI) -> None:
  self.response = response
  self._cast_to = cast_to
  self._client = client
  self._decoder = client._make_sse_decoder()
  self._iterator = self.__stream__()
  self._chained_stream = None # MOD HERE 

def chain_stream(self, stream): # NEW FUNCT HERE
  if self._chained_stream:
    self._chained_stream.chain_stream(stream)
  else:
    self._chained_stream = stream

async def monkey_patch__stream__(self) -> AsyncIterator[_T]:
  cast_to = cast(Any, self._cast_to)
  response = self.response
  process_data = self._client._process_response_data
  iterator = self._iter_events()

  async for sse in iterator:
    if sse.data.startswith("[DONE]"):
      break

    if sse.event is None:
      data = sse.json()
      if is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data=data, cast_to=cast_to, response=response)

    else:
      data = sse.json()

      if sse.event == "error" and is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

  async for _sse in iterator:
    ...

  if self._chained_stream: # MOD HERE
    async for chunk in self._chained_stream:
      yield chunk


openai.AsyncStream.__init__ = monkey_patch__init__
openai.AsyncStream.__stream__ = monkey_patch__stream__
openai.AsyncStream.chain_stream = chain_stream
#-----------------------------------------------------------------------------------------------------------------------

Best regards, Paul Irolla

Additional context

I have implemented this inside my personal fork of LiteLLM for integrating the assistant API into the existing workflow without changing a thousand of code lines.

pi-infected avatar Mar 22 '24 09:03 pi-infected

great workaround! Something like this is necessary since there could be an arbitrary number of tool_calls.

hayescode avatar Apr 24 '24 16:04 hayescode

Thanks for the suggestion! We have another, similar proposal in the works – stay tuned :)

rattrayalex avatar May 13 '24 01:05 rattrayalex

Hey @rattrayalex, could you share this proposal you mentioned?

mpereira avatar Jun 15 '24 01:06 mpereira