Track costs for streaming with OpenAI
Is your feature request related to a problem? Please describe.
Prior versions of `openai` did not provide usage stats when streaming.
Describe the solution you'd like
Add `stream_options: {"include_usage": true}`. Add `total_cost` as a property of `OpenAICallResponseChunk`.
Additional context
OpenAI Cookbook Reference
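For illustration, here is a minimal sketch of how `include_usage` surfaces usage stats in the OpenAI Python SDK (the model name is just an example):

```python
from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",  # example model
    messages=[{"role": "user", "content": "What is 1 + 2?"}],
    stream=True,
    stream_options={"include_usage": True},
)
for chunk in stream:
    if chunk.usage is not None:
        # Only the final chunk (which has no choices) carries usage stats.
        print(chunk.usage.prompt_tokens, chunk.usage.completion_tokens)
```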
We may also want to consider updating the generator to return the total cost separately from the response chunks, so the generator would have type `Generator[BaseCallResponseChunkT, None, Optional[float]]` and return the total cost at the end of the generator if available, otherwise `None`.
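For illustration, a generator with that shape might look like this (chunk type simplified to `str`, cost value made up):

```python
from typing import Generator, Optional


def stream() -> Generator[str, None, Optional[float]]:
    for content in ["1", " + ", "2", " equals ", "3"]:  # stand-in chunks
        yield content
    # If the final chunk included usage stats, we could compute and return the
    # total cost here; otherwise we return None.
    return 0.000042  # illustrative value
```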
`stream_options: {"include_usage": true}` has been implemented in https://github.com/Mirascope/mirascope/pull/239
I imagine the other providers will follow how OpenAI and Cohere include usage when streaming. So rather than having the return type be the cost, we should add the properties `cost`, `usage`, `input_tokens`, and `output_tokens`, like we do for `BaseCallResponse`.
It would be as follows:
```python
from abc import ABC
from typing import Generic, Optional, Type, TypeVar

from pydantic import BaseModel, ConfigDict

ChunkT = TypeVar("ChunkT")
BaseToolT = TypeVar("BaseToolT")  # bound to BaseTool in Mirascope


class BaseCallResponseChunk(BaseModel, Generic[ChunkT, BaseToolT], ABC):
    """A base abstract interface for LLM streaming response chunks.

    Attributes:
        chunk: The original response chunk from whichever model response this wraps.
    """

    chunk: ChunkT
    tool_types: Optional[list[Type[BaseToolT]]] = None
    cost: Optional[float] = None  # The cost of the completion in dollars

    model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)

    ...
```
````python
from typing import Optional

from openai.types import CompletionUsage
from openai.types.chat import ChatCompletionChunk
from openai.types.chat.chat_completion_chunk import (
    Choice as ChunkChoice,
    ChoiceDelta,
    ChoiceDeltaToolCall,
)
from openai.types.chat.completion_create_params import ResponseFormat

from mirascope.openai import OpenAITool


class OpenAICallResponseChunk(BaseCallResponseChunk[ChatCompletionChunk, OpenAITool]):
    """Convenience wrapper around chat completion streaming chunks.

    When using Mirascope's convenience wrappers to interact with OpenAI models via
    `OpenAICall.stream`, responses will return an `OpenAICallResponseChunk`, whereby
    the implemented properties allow for simpler syntax and a convenient developer
    experience.

    Example:

    ```python
    from mirascope.openai import OpenAICall


    class Math(OpenAICall):
        prompt_template = "What is 1 + 2?"


    for chunk in Math().stream():
        print(chunk.content)
    #> 1
    #  +
    #  2
    #  equals
    #
    #  3
    #  .
    ```
    """

    response_format: Optional[ResponseFormat] = None

    @property
    def choices(self) -> list[ChunkChoice]:
        """Returns the array of chat completion choices."""
        return self.chunk.choices

    @property
    def choice(self) -> ChunkChoice:
        """Returns the 0th choice."""
        return self.chunk.choices[0]

    @property
    def delta(self) -> Optional[ChoiceDelta]:
        """Returns the delta for the 0th choice."""
        if self.chunk.choices:
            return self.chunk.choices[0].delta
        return None

    @property
    def content(self) -> str:
        """Returns the content for the 0th choice delta."""
        return (
            self.delta.content if self.delta is not None and self.delta.content else ""
        )

    @property
    def tool_calls(self) -> Optional[list[ChoiceDeltaToolCall]]:
        """Returns the partial tool calls for the 0th choice message.

        The first `list[ChoiceDeltaToolCall]` will contain the name of the tool and
        its index, and subsequent `list[ChoiceDeltaToolCall]`s will contain the
        arguments as strings that need to be concatenated across chunks to form a
        complete JSON tool call. The final chunk's `tool_calls` will be `None`,
        indicating the end of the stream.
        """
        if self.delta:
            return self.delta.tool_calls
        return None

    @property
    def usage(self) -> Optional[CompletionUsage]:
        """Returns the usage of the chat completion."""
        if self.chunk.usage:
            return self.chunk.usage
        return None

    @property
    def input_tokens(self) -> Optional[int]:
        """Returns the number of input tokens."""
        if self.usage:
            return self.usage.prompt_tokens
        return None

    @property
    def output_tokens(self) -> Optional[int]:
        """Returns the number of output tokens."""
        if self.usage:
            return self.usage.completion_tokens
        return None
````
We will also need to update our `stream` and `stream_async` functions: we can check if usage exists and call `openai_api_calculate_cost` when we detect it.
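As a sketch of that check (the wrapper function and the `openai_api_calculate_cost` signature here are illustrative assumptions, not the merged implementation):

```python
from typing import Generator, Iterator

from openai.types.chat import ChatCompletionChunk


def _stream_with_cost(
    chunks: Iterator[ChatCompletionChunk], model: str
) -> Generator["OpenAICallResponseChunk", None, None]:
    """Wraps raw chunks and attaches the cost once usage stats arrive."""
    for chunk in chunks:
        response_chunk = OpenAICallResponseChunk(chunk=chunk)
        if response_chunk.usage is not None:
            # With include_usage, only the final chunk carries usage stats.
            response_chunk.cost = openai_api_calculate_cost(
                response_chunk.usage, model
            )
        yield response_chunk
```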
Finally, when iterating through the stream, the user can check whether `cost` exists:
```python
from mirascope.openai import OpenAICall


class BookRecommender(OpenAICall):
    prompt_template = "Please recommend a {genre} book."

    genre: str


stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="")
    if chunk.cost is not None:
        print(chunk.cost)
```
I wonder if we could take advantage of the generator return value to push the cost check inside the generator, if desired.
For instance:
```python
stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="", flush=True)
cost = stream.value  # Optional[float]
```
Internally we would check for the chunk cost (i.e. do everything the same as above) but return it so the user doesn't have to manually check if `cost is not None`.
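For example, a wrapper along these lines (a hypothetical `Stream` class, with content simplified to `str`) would let `yield from` capture the generator's return value:

```python
from typing import Generator, Iterator, Optional


def _chunks() -> Generator[str, None, Optional[float]]:
    yield "Here's a fantasy book: "
    yield "The Name of the Wind."
    return 0.000123  # total cost from the final usage chunk, or None


class Stream:
    """Iterates a generator and stores its return value in `value`."""

    def __init__(self, generator: Generator[str, None, Optional[float]]) -> None:
        self._generator = generator
        self.value: Optional[float] = None

    def __iter__(self) -> Iterator[str]:
        # `yield from` evaluates to the wrapped generator's return value.
        self.value = yield from self._generator


stream = Stream(_chunks())
for chunk in stream:
    print(chunk, end="", flush=True)
print(f"\ncost: {stream.value}")
```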
This enhances streaming with OpenAI by incorporating cost tracking directly within the streaming process. It involves modifying the `BaseCallResponseChunk` class to include properties for cost, usage, input tokens, and output tokens, and adjusting the streaming functions (`stream` and `stream_async`) to calculate and handle these costs dynamically.
The following steps are needed:
Step 1: Update the BaseCallResponseChunk Class
First, ensure that the `BaseCallResponseChunk` class includes properties for cost, usage, input tokens, and output tokens. This class acts as a base for all streaming response chunks, providing a consistent way to access these properties across providers.
```python
from abc import ABC
from typing import Dict, Generic, List, Optional, Type, TypeVar

from pydantic import BaseModel, ConfigDict, Field

ChunkT = TypeVar("ChunkT")
BaseToolT = TypeVar("BaseToolT")


class BaseCallResponseChunk(BaseModel, Generic[ChunkT, BaseToolT], ABC):
    chunk: ChunkT
    tool_types: Optional[List[Type[BaseToolT]]] = None
    cost: Optional[float] = Field(None, description="The cost of the completion in dollars")
    usage: Optional[Dict[str, int]] = Field(None, description="Usage statistics")
    input_tokens: Optional[int] = Field(None, description="Number of input tokens")
    output_tokens: Optional[int] = Field(None, description="Number of output tokens")

    model_config = ConfigDict(arbitrary_types_allowed=True)
```
Step 2: Modify Streaming Functions
Next, adjust the `stream` and `stream_async` functions to calculate costs dynamically based on the usage data returned by the API. You'll need to integrate the `openai_api_calculate_cost` function (or its equivalent for other APIs) to perform this calculation.
```python
async def stream_async(self, *args, **kwargs):
    async for chunk in await self._api_call(*args, **kwargs):
        if chunk.usage:
            # Attach the cost before yielding so consumers see it on this chunk.
            chunk.cost = openai_api_calculate_cost(
                chunk.model_name, chunk.input_tokens, chunk.output_tokens
            )
        yield chunk
```
Step 3: User Interaction
Users interacting with the stream can now easily access the cost information along with the content of each chunk. Here's an example of how they might use this enhanced functionality:
```python
from mirascope.openai import OpenAICall


class BookRecommender(OpenAICall):
    prompt_template = "Please recommend a {genre} book."

    genre: str


stream = BookRecommender(genre="fantasy").stream()
for chunk in stream:
    print(chunk.content, end="")
    if chunk.cost is not None:
        print(f"\nCost: {chunk.cost}")
```
Additional Considerations
- Ensure that the `openai_api_calculate_cost` function (or its equivalent for other APIs) accurately reflects the pricing model of the API you're using.
- Test thoroughly to confirm that cost calculations are accurate and that the streaming process behaves as expected under various conditions.
- Consider implementing error handling for cases where cost or usage data cannot be retrieved from the API.
By following these steps, you'll enhance the Mirascope project's ability to track costs during streaming sessions, providing users with valuable insights into their usage and expenses.
Hi @willbakst, I am working on this issue and I was able to add the feature for OpenAI. Now, while working on the Cohere API, the `usage` property for `CohereCallResponse` returns the type `Optional[ApiMetaBilledUnits]`.
But for the `usage` property on `CohereCallResponseChunk`, according to Cohere's Stream API, the response does not have the same type (it gives a `token_count` property instead). Any ideas on how I should tackle that? I thought of creating a variable of type `Optional[ApiMetaBilledUnits]` based on the data available from `token_count`.
Example:
"token_count": {
"prompt_tokens": 2821,
"response_tokens": 29,
"total_tokens": 2850,
"billed_tokens": 37
}
can be converted to:
"billed_units": {
"input_tokens": 8,
"output_tokens": 29
}
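For what it's worth, a sketch of that massaging, assuming the `cohere` SDK exposes `ApiMetaBilledUnits` with `input_tokens`/`output_tokens` fields (the exact field mapping, e.g. billed vs. raw tokens, may need adjusting):

```python
from typing import Any, Dict

from cohere.types import ApiMetaBilledUnits


def billed_units_from_token_count(token_count: Dict[str, Any]) -> ApiMetaBilledUnits:
    """Builds an ApiMetaBilledUnits from a streamed token_count payload."""
    return ApiMetaBilledUnits(
        input_tokens=token_count["prompt_tokens"],
        output_tokens=token_count["response_tokens"],
    )
```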
Please submit the PR for OpenAI first without the cohere stuff so we can review in smaller chunks.
Please also move the discussion on Cohere to its own issue so we can continue tracking it even if we close this issue. I'll need to take a deeper look into the Cohere API to give the best answer. My quick answer would be that massaging the data into the desired format could work, but if you think there's a better option we can always review it in the PR, where we can better see how it all works together.
Thanks!
I was going to open the PR for OpenAI, but the issue was that my changes to the `BaseCallResponseChunk` abstract class required changes in all of the classes implementing it and were failing some test cases. Should I open the PR anyway?
I would only add the abstract methods if we’re going to require these methods on all response chunk types, but given that not all of them currently support streaming cost tracking we should just make the methods specific (for now) to the providers that support it.
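Concretely, that could mean defining the property only on the provider-specific chunk and leaving the base class untouched (a sketch, not the merged code):

```python
from typing import Optional

from openai.types import CompletionUsage
from openai.types.chat import ChatCompletionChunk

from mirascope.openai import OpenAITool


class OpenAICallResponseChunk(BaseCallResponseChunk[ChatCompletionChunk, OpenAITool]):
    @property
    def usage(self) -> Optional[CompletionUsage]:
        """Defined only here, since not every provider supports usage when streaming."""
        return self.chunk.usage
```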
This is released in v0.16 🎉