
Track costs for streaming with OpenAI

brenkao opened this issue 1 year ago • 5 comments

**Is your feature request related to a problem? Please describe.**
Prior versions of the `openai` SDK did not include usage stats when streaming.

**Describe the solution you'd like**
Add `stream_options: {"include_usage": true}`. Add `total_cost` as a property of `OpenAICallResponseChunk`.

**Additional context**
OpenAI Cookbook Reference

brenkao commented May 07 '24 16:05

We may also want to consider updating the generator to just return the total cost separately from the response chunk, so the generator would have type `Generator[BaseCallResponseChunkT, None, Optional[float]]` and then return the total cost at the end of the generator if available, otherwise return `None`.
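
For reference, a generator's return value in plain Python surfaces via `StopIteration.value`, which is what the `Generator[..., ..., Optional[float]]` annotation describes. A minimal sketch; the `stream_chunks` function and the cost figure are hypothetical:

```python
from typing import Generator, Optional


def stream_chunks() -> Generator[str, None, Optional[float]]:
    """Hypothetical stream that returns the total cost once exhausted."""
    yield "Hello"
    yield " world"
    return 0.0042  # total cost in dollars, or None if usage was unavailable


gen = stream_chunks()
try:
    while True:
        print(next(gen), end="")
except StopIteration as exc:
    total_cost = exc.value  # Optional[float]
```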

willbakst commented May 07 '24 16:05

`stream_options: {"include_usage": true}` has been implemented in https://github.com/Mirascope/mirascope/pull/239
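
For context, with the raw `openai` SDK this option streams one extra final chunk whose `choices` list is empty and whose `usage` field is populated, while all earlier chunks have `usage=None`. A minimal sketch against the raw client:

```python
from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "What is 1 + 2?"}],
    stream=True,
    stream_options={"include_usage": True},
)
for chunk in stream:
    if chunk.usage is not None:  # only the final chunk carries usage
        print(chunk.usage.prompt_tokens, chunk.usage.completion_tokens)
```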

brenkao commented May 14 '24 03:05

I imagine the other providers will follow how OpenAI and Cohere include usage when streaming. So rather than having the return type be the cost, we should add the properties `cost`, `usage`, `input_tokens`, and `output_tokens`, like we do for `BaseCallResponse`.

It would be as follows:

````python
class BaseCallResponseChunk(BaseModel, Generic[ChunkT, BaseToolT], ABC):
    """A base abstract interface for LLM streaming response chunks.

    Attributes:
        response: The original response chunk from whichever model response this wraps.
    """

    chunk: ChunkT
    tool_types: Optional[list[Type[BaseToolT]]] = None
    cost: Optional[float] = None  # The cost of the completion in dollars

    model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)

    ...

class OpenAICallResponseChunk(BaseCallResponseChunk[ChatCompletionChunk, OpenAITool]):
    """Convenience wrapper around chat completion streaming chunks.

    When using Mirascope's convenience wrappers to interact with OpenAI models via
    `OpenAICall.stream`, responses will return an `OpenAICallResponseChunk`, whereby
    the implemented properties allow for simpler syntax and a convenient developer
    experience.

    Example:

    ```python
    from mirascope.openai import OpenAICall


    class Math(OpenAICall):
        prompt_template = "What is 1 + 2?"


    for chunk in Math().stream():
        print(chunk.content)

    #> 1
    #  +
    #  2
    #   equals
    #
    #  3
    #  .
    """

    response_format: Optional[ResponseFormat] = None

    @property
    def choices(self) -> list[ChunkChoice]:
        """Returns the array of chat completion choices."""
        return self.chunk.choices

    @property
    def choice(self) -> ChunkChoice:
        """Returns the 0th choice."""
        return self.chunk.choices[0]

    @property
    def delta(self) -> Optional[ChoiceDelta]:
        """Returns the delta for the 0th choice."""
        if self.chunk.choices:
            return self.chunk.choices[0].delta
        return None

    @property
    def content(self) -> str:
        """Returns the content for the 0th choice delta."""
        return (
            self.delta.content if self.delta is not None and self.delta.content else ""
        )

    @property
    def tool_calls(self) -> Optional[list[ChoiceDeltaToolCall]]:
        """Returns the partial tool calls for the 0th choice message.

        The first `list[ChoiceDeltaToolCall]` will contain the name of the tool and
        its index, and subsequent `list[ChoiceDeltaToolCall]`s will contain the
        argument strings, which need to be concatenated across chunks to form the
        complete JSON tool calls. The final chunk's `tool_calls` will be `None`,
        indicating the end of the stream.
        """
        if self.delta:
            return self.delta.tool_calls
        return None

    @property
    def usage(self) -> Optional[CompletionUsage]:
        """Returns the usage of the chat completion."""
        if self.chunk.usage:
            return self.chunk.usage
        return None

    @property
    def input_tokens(self) -> Optional[int]:
        """Returns the number of input tokens."""
        if self.usage:
            return self.usage.prompt_tokens
        return None

    @property
    def output_tokens(self) -> Optional[int]:
        """Returns the number of output tokens."""
        if self.usage:
            return self.usage.completion_tokens
        return None
````

Our `stream` and `stream_async` functions will also need to be updated: we can check whether usage exists and call `openai_api_calculate_cost` when we detect it.
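
A minimal sketch of what that check could look like on the sync side; the `_generate_stream` helper and the exact `openai_api_calculate_cost` signature are assumptions for illustration:

```python
def stream(self, **kwargs):
    """Yields response chunks, attaching cost once usage data arrives."""
    for chunk in self._generate_stream(**kwargs):  # hypothetical helper
        response_chunk = OpenAICallResponseChunk(chunk=chunk)
        if response_chunk.usage is not None:
            # With include_usage, only the final chunk carries usage stats.
            response_chunk.cost = openai_api_calculate_cost(
                response_chunk.usage, model=self.call_params.model
            )
        yield response_chunk
```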

Finally, when iterating through the stream, the user can check whether `cost` exists:

```python
from mirascope.openai import OpenAICall


class BookRecommender(OpenAICall):
    prompt_template = "Please recommend a {genre} book."

    genre: str


stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="")
    if chunk.cost is not None:
        print(chunk.cost)
```

brenkao commented May 22 '24 22:05

I wonder if we could take advantage of the generator return value to push the cost check inside of the generator if desired.

For instance:

```python
stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="", flush=True)
cost = stream.value  # Optional[float]
```

Internally we would check for the chunk cost (i.e. do everything the same as above) but return it so the user doesn't have to manually check `if cost is not None`.
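
For a `stream.value` attribute to work, the stream would need to be a small wrapper object rather than a bare generator. A hypothetical sketch (the `StreamWithValue` name is illustrative):

```python
from typing import Generator, Iterator, Optional


class StreamWithValue:
    """Wraps a generator and captures its return value as `.value`."""

    def __init__(self, generator: Generator) -> None:
        self._generator = generator
        self.value: Optional[float] = None  # set once the stream is exhausted

    def __iter__(self) -> Iterator:
        # `yield from` delegates chunks to the wrapped generator and
        # captures its return value when iteration completes.
        self.value = yield from self._generator
```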

willbakst commented May 22 '24 23:05

The goal is to enhance streaming with OpenAI by incorporating cost tracking directly within the streaming process. This involves modifying the `BaseCallResponseChunk` class to include properties for cost, usage, input tokens, and output tokens, and adjusting the streaming functions (`stream` and `stream_async`) to calculate and handle these costs dynamically.

These are the steps to follow:

Step 1: Update the BaseCallResponseChunk Class

First, ensure that the `BaseCallResponseChunk` class includes properties for cost, usage, input tokens, and output tokens. This class acts as a base for all streaming response chunks, providing a consistent way to access these properties across providers.

```python
from abc import ABC
from typing import Generic, Optional, TypeVar

from pydantic import BaseModel, ConfigDict, Field

ChunkT = TypeVar("ChunkT")
BaseToolT = TypeVar("BaseToolT")


class BaseCallResponseChunk(BaseModel, Generic[ChunkT, BaseToolT], ABC):
    chunk: ChunkT
    tool_types: Optional[list[type[BaseToolT]]] = None
    cost: Optional[float] = Field(None, description="The cost of the completion in dollars")
    usage: Optional[dict[str, int]] = Field(None, description="Usage statistics")
    input_tokens: Optional[int] = Field(None, description="Number of input tokens")
    output_tokens: Optional[int] = Field(None, description="Number of output tokens")

    model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)
```

Step 2: Modify Streaming Functions

Next, adjust the `stream` and `stream_async` functions to calculate costs dynamically based on the usage data returned by the API. You'll need to integrate the `openai_api_calculate_cost` function (or its equivalent for other APIs) to perform this calculation.

```python
async def stream_async(self, *args, **kwargs):
    async for chunk in await self._api_call(*args, **kwargs):
        # Attach the cost before yielding so the consumer sees it on the chunk.
        if chunk.usage:
            chunk.cost = openai_api_calculate_cost(  # assumed to be synchronous
                chunk.model_name, chunk.input_tokens, chunk.output_tokens
            )
        yield chunk
```

Step 3: User Interaction

Users interacting with the stream can now easily access the cost information along with the content of each chunk. Here's an example of how they might use this enhanced functionality:

```python
from mirascope.openai import OpenAICall


class BookRecommender(OpenAICall):
    prompt_template = "Please recommend a {genre} book."
    genre: str


stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="")
    if chunk.cost is not None:
        print(f"\nCost: {chunk.cost}")
```

Additional Considerations

  • Ensure that the openai_api_calculate_cost function (or its equivalent for other APIs) accurately reflects the pricing model of the API you're using.
  • Test thoroughly to confirm that cost calculations are accurate and that the streaming process behaves as expected under various conditions.
  • Consider implementing error handling for cases where cost or usage data cannot be retrieved from the API.

By following these steps, you'll enhance the Mirascope project's ability to track costs during streaming sessions, providing users with valuable insights into their usage and expenses.

ashishpatel26 commented May 23 '24 12:05

Hi @willbakst, I am working on this issue and was able to add the feature for OpenAI. Now, while working on the Cohere API, the `usage` property for `CohereCallResponse` returns the type `Optional[ApiMetaBilledUnits]`, but according to Cohere's Stream API, `CohereCallResponseChunk` does not receive a response with the same type (it gives a `token_count` property instead). Any ideas on how I should tackle that? I thought of creating a variable of type `Optional[ApiMetaBilledUnits]` based on the data available from `token_count`.

Example:

 "token_count": {
            "prompt_tokens": 2821,
            "response_tokens": 29,
            "total_tokens": 2850,
            "billed_tokens": 37
        }

can be converted to:

 "billed_units": {
            "input_tokens": 8,
            "output_tokens": 29
        }

tvj15 commented May 25 '24 20:05

Please submit the PR for OpenAI first without the cohere stuff so we can review in smaller chunks.

Please also move the discussion on Cohere to its own issue so we can continue tracking it even if we close this one. I'll need to take a deeper look into the Cohere API to give the best answer. My quick answer is that massaging the data into the desired format could work, but if you think there's a better option we can always review it in the PR, where we can better see how it all works together.

Thanks!

willbakst commented May 25 '24 20:05

I was going to raise the PR for OpenAI, but as I made changes to the `BaseCallResponseChunk` abstract class, it required changes in all the classes implementing it and was failing some test cases. Should I raise the PR anyway?

tvj15 commented May 25 '24 20:05

I would only add the abstract methods if we're going to require these methods on all response chunk types, but given that not all of them currently support streaming cost tracking, we should just make the methods specific (for now) to the providers that support it.

willbakst commented May 25 '24 20:05

This is released in v0.16 🎉

willbakst commented Jun 04 '24 17:06