Option to enable structured outputs with OpenAI Generators
Is your feature request related to a problem? Please describe.
OpenAI and many other LLM providers are introducing structured outputs. See this doc.
- It will only be enabled for some models
- It uses `.parse` and not `.create`, the way we use in our generators right now (see the sketch below)
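For reference, here is a minimal sketch of what the structured-output call looks like on the OpenAI Python SDK side (model name and schema are just examples):

```python
from openai import OpenAI
from pydantic import BaseModel

class DecomposedQuestions(BaseModel):
    questions: list[str]

client = OpenAI()
# the beta endpoint accepts a Pydantic model as response_format and returns a parsed object
completion = client.beta.chat.completions.parse(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Did Microsoft or Google make more money last year?"}],
    response_format=DecomposedQuestions,
)
print(completion.choices[0].message.parsed)  # DecomposedQuestions instance
```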
Describe the solution you'd like
It would be great if we could add support for this, either here or as an experimental feature while this is still in beta. For example, here's what I've tried to build:
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from pydantic import BaseModel
class DecomposedQuestions(BaseModel):
    questions: list[str]
splitter_prompt = """
You are a query engine.
You prepare queries that will be sent to a web search component.
Sometimes, these queries are very complex.
You split up complex queries into multiple queries so that you can run multiple searches to find an answer.
When you split a query, you separate the sub-queries with '//'.
If the query is simple, then keep it as it is.
###
Example 1:
Query: Did Microsoft or Google make more money last year?
Decomposed Questions: DecomposedQuestions(questions=['How much profit did Microsoft make?', 'How much profit did Google make?'])
###
Example 2:
Query: What is the capital of Germany?
Decomposed Questions: DecomposedQuestions(questions=['What is the capital of Germany?'])
###
Example 3:
Query: {{question}}
Decomposed Questions:
"""
builder = PromptBuilder(splitter_prompt)
llm = OpenAIGenerator(model="gpt-4o-mini", generation_kwargs={"response_format": DecomposedQuestions})
pipeline = Pipeline()
pipeline.add_component("prompt", builder)
pipeline.add_component("llm", llm)
pipeline.connect("prompt", "llm")
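If this were supported, the intent would be to run the pipeline roughly like this (a sketch; the `response_format` kwarg above is the part that is not yet supported):

```python
# hypothetical usage once response_format is supported by OpenAIGenerator
result = pipeline.run({"prompt": {"question": "Did Microsoft or Google make more money last year?"}})
print(result["llm"]["replies"][0])
```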
Or something similar. Currently, the attempt above results in the following error:
/usr/local/lib/python3.10/dist-packages/openai/resources/chat/completions.py in validate_response_format(response_format)
1414 def validate_response_format(response_format: object) -> None:
1415 if inspect.isclass(response_format) and issubclass(response_format, pydantic.BaseModel):
-> 1416 raise TypeError(
1417 "You tried to pass a `BaseModel` class to `chat.completions.create()`; You must use `beta.chat.completions.parse()` instead"
1418 )
TypeError: You tried to pass a `BaseModel` class to `chat.completions.create()`; You must use `beta.chat.completions.parse()` instead
Yes, please implement this : )
@TuanaCelik, thank you for proposing this. Did you implement it as a custom component in the near term?
Hey @dashinja and @thompsondt
Check out the query decomposition article/recipe. It's not officially an integration/component, but I sneaked in an implementation there to help out in the meantime:
https://haystack.deepset.ai/blog/query-decomposition
I'm going to try implementing this. The decomposition example maps almost 1:1 to what I'm anticipating in multiple queries.
Thanks @TuanaCelik ! I found this cookbook post of yours (which actually implements the extended OpenAIGenerator) much more helpful than the blog, which skipped the implementation!
https://haystack.deepset.ai/cookbook/query_decomposition
Yep! It's linked from the article too! You can open the Colab from there as well :)
I created a custom OpenAIChatGenerator subclass (very much inspired by what @TuanaCelik has done):
# imports (exact module paths may vary slightly across Haystack 2.x versions)
from typing import Any, Callable, Dict, List, Optional, Union

from openai import Stream
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from pydantic import BaseModel

from haystack import component
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators.openai_utils import _convert_message_to_openai_format
from haystack.dataclasses import ChatMessage, StreamingChunk


class CustomOpenAIGenerator(OpenAIChatGenerator):
    """Same as OpenAIChatGenerator but with response_format support."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @component.output_types(replies=List[str], meta=List[Dict[str, Any]], structured_reply=BaseModel)
    def run(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
    ):
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
        # check if streaming_callback is passed
        if "response_format" in generation_kwargs:
            streaming_callback = streaming_callback or self.streaming_callback
            openai_formatted_messages = [_convert_message_to_openai_format(message) for message in messages]
            chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.beta.chat.completions.parse(
                model=self.model, messages=openai_formatted_messages, **generation_kwargs
            )
            completions: List[ChatMessage] = []
            # since streaming is disabled in beta.chat.completions.parse, check if the completion is a ChatCompletion
            if isinstance(chat_completion, ChatCompletion):
                completions = [
                    self._build_structured_message(chat_completion, choice) for choice in chat_completion.choices
                ]
            # before returning, do post-processing of the completions
            for message in completions:
                self._check_finish_reason(message)
            return {
                "replies": [message.content for message in completions],
                "meta": [message.meta for message in completions],
                "structured_reply": completions[0].content,
            }
        else:
            return super().run(messages, streaming_callback, generation_kwargs)

    def _build_structured_message(self, completion: Any, choice: Any) -> ChatMessage:
        chat_message = ChatMessage.from_assistant(choice.message.parsed or "")
        chat_message.meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": dict(completion.usage),
            }
        )
        return chat_message
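A minimal usage sketch for the class above, reusing the `DecomposedQuestions` Pydantic model from earlier (names and model choice are just examples):

```python
llm = CustomOpenAIGenerator(model="gpt-4o-mini",
                            generation_kwargs={"response_format": DecomposedQuestions})
result = llm.run(messages=[ChatMessage.from_user("Did Microsoft or Google make more money last year?")])
print(result["structured_reply"])  # parsed DecomposedQuestions instance
```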
I get `ComponentError: Parameters of 'run' and 'run_async' methods must be the same` when I try Tuana's solution. This seems like a pretty simple update, while the workaround involves copying and pasting big pieces of code that easily get out of sync with new versions.
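For context, the end state being asked for is to pass a Pydantic model straight to `OpenAIChatGenerator`, roughly like this (a sketch, not the final API):

```python
from pydantic import BaseModel
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

class DecomposedQuestions(BaseModel):
    questions: list[str]

llm = OpenAIChatGenerator(model="gpt-4o-mini",
                          generation_kwargs={"response_format": DecomposedQuestions})
result = llm.run(messages=[ChatMessage.from_user("Did Microsoft or Google make more money last year?")])
print(result["replies"][0])
```

The patch below routes such calls through `beta.chat.completions.parse`: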
diff --git a/haystack/components/generators/chat/openai.py b/haystack/components/generators/chat/openai.py
index baab37f8..a5771c1a 100644
--- a/haystack/components/generators/chat/openai.py
+++ b/haystack/components/generators/chat/openai.py
@@ -270,9 +270,13 @@ class OpenAIChatGenerator:
tools=tools,
tools_strict=tools_strict,
)
- chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.chat.completions.create(
- **api_args
- )
+ if "response_format" in api_args:
+ if api_args.pop('stream', None):
+ raise RuntimeError("`stream` not supported in constrained generation")
+ chat_call = self.client.beta.chat.completions.parse
+ else:
+ chat_call = self.client.chat.completions.create
+ chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = chat_call(**api_args)
if streaming_callback is not None:
completions = self._handle_stream_response(