haystack icon indicating copy to clipboard operation
haystack copied to clipboard

Option to enable structured outputs with OpenAI Generators

Open TuanaCelik opened this issue 1 year ago • 9 comments

Is your feature request related to a problem? Please describe. OpenAI and many other llm providers are introducing structured outputs. See this doc

  • It will only be enabled for some models
  • It uses .parse and not .create the way we use in our generators right now

Describe the solution you'd like It would be great if we can add support for this either here, or as an experimental feature while this is still in beta. For example, here's what I've tried to build:

from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from pydantic import BaseModel

class DecomposedQuestions(BaseModel):
    questions: list[str]

splitter_prompt = """
You are a query engine.
You prepare queries that will be sent to a web search component.
Sometimes, these queries are very complex.
You split up complex queries into multiple queries so that you can run multiple searches to find an answer.
When you split a query, you separate the sub-queries with '//'.
If the query is simple, then keep it as it is.
###
Example 1:
Query: Did Microsoft or Google make more money last year?
Decomposed Questions: DecomposedQuestions(questions=['How much profit did Microsoft make?', 'How much profit did Google make?'])
###
Example 2:
Query: What is the capital of Germany?
Decomposed Questions: DecomposedQuestions(questions=['What is the capital of Germany?'])
###
Example 3:
Query: {{question}}
Decomposed Questions:
"""

builder = PromptBuilder(splitter_prompt)
llm = OpenAIGenerator(model="gpt-4o-mini", generation_kwargs={"response_format": DecomposedQuestions})

pipeline = Pipeline()

pipeline.add_component("prompt", builder)
pipeline.add_component("llm", llm)

pipeline.connect("prompt", "llm")

Or something similar..

currently this will result in the following error:

[/usr/local/lib/python3.10/dist-packages/openai/resources/chat/completions.py](https://localhost:8080/#) in validate_response_format(response_format)
   1414 def validate_response_format(response_format: object) -> None:
   1415     if inspect.isclass(response_format) and issubclass(response_format, pydantic.BaseModel):
-> 1416         raise TypeError(
   1417             "You tried to pass a `BaseModel` class to `chat.completions.create()`; You must use `beta.chat.completions.parse()` instead"
   1418         )

TypeError: You tried to pass a `BaseModel` class to `chat.completions.create()`; You must use `beta.chat.completions.parse()` instead

TuanaCelik avatar Aug 23 '24 10:08 TuanaCelik

Yes, please implement this : )

dashinja avatar Sep 11 '24 01:09 dashinja

@TuanaCelik, thank you for proposing this. Did you implement it as a custom component in the near-term?

thompsondt avatar Oct 02 '24 17:10 thompsondt

Hey @dashinja and @thompsondt

Check out the query decomposition article/recipe. It's not an integration/component officially but I sneaked in an implementation there to help out in the meantime:

https://haystack.deepset.ai/blog/query-decomposition

TuanaCelik avatar Oct 02 '24 17:10 TuanaCelik

I'm going to try implementing this. The decomposition example maps almost 1:1 to what I'm anticipating in multiple queries.

thompsondt avatar Oct 02 '24 18:10 thompsondt

Thanks @TuanaCelik ! I found this cookbook post of yours (which actually implements the extended OpenAIGenerator) much more helpful than the blog, which skipped the implementation!

https://haystack.deepset.ai/cookbook/query_decomposition

arubisov avatar Oct 09 '24 17:10 arubisov

Thanks @TuanaCelik ! I found this cookbook post of yours (which actually implements the extended OpenAIGenerator) much more helpful than the blog, which skipped the implementation!

https://haystack.deepset.ai/cookbook/query_decomposition

Yep! It's linked from the article too! You can open the colab from there as well :)

TuanaCelik avatar Oct 09 '24 17:10 TuanaCelik

I created the OpenAIChatGenerator (very much inspired from what @TuanaCelik has done)

class CustomOpenAIGenerator(OpenAIChatGenerator):
    """Same as OpenAIGenerator but with response_format"""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @component.output_types(replies=List[str], meta=List[Dict[str, Any]], structured_reply=BaseModel)
    def run(self,
        messages: List[ChatMessage],
        streaming_callback: Optional[Callable[[StreamingChunk], None]] = None, 
        generation_kwargs: Optional[Dict[str, Any]] = None,):
            
      generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
      # check if streaming_callback is passed
      if "response_format" in generation_kwargs.keys():
        streaming_callback = streaming_callback or self.streaming_callback
        openai_formatted_messages = [_convert_message_to_openai_format(message) for message in messages]
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=openai_formatted_messages,

            **generation_kwargs)

            
        completions: List[ChatMessage] = []
        # since streaming is disabled in beta.chat.completions.parse, check if the completion is a ChatCompletion
        if isinstance(chat_completion, ChatCompletion):
            completions = [self._build_structured_message(chat_completion, choice) for choice in chat_completion.choices]

        # before returning, do post-processing of the completions
        for message in completions:
            self._check_finish_reason(message)
        
        
        return {
                "replies": [message.content for message in completions],
                "meta": [message.meta for message in completions],
                "structured_reply": completions[0].content
                }

      else:
          return super().run(prompt, streaming_callback, generation_kwargs)
        

    def _build_structured_message(self, completion: Any, choice: Any) -> ChatMessage:
        chat_message = ChatMessage.from_assistant(choice.message.parsed or "")
        chat_message.meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": dict(completion.usage),
            }
        )
        return chat_message

rhajou avatar Jan 22 '25 08:01 rhajou

I get ComponentError: Parameters of 'run' and 'run_async' methods must be the same when I try Tuana's solution. This seems like a pretty simple update while the work around involves copying and pasting big pieces of code that easily get out of sync with new versions

Permafacture avatar Apr 24 '25 16:04 Permafacture

diff --git a/haystack/components/generators/chat/openai.py b/haystack/components/generators/chat/openai.py
index baab37f8..a5771c1a 100644
--- a/haystack/components/generators/chat/openai.py
+++ b/haystack/components/generators/chat/openai.py
@@ -270,9 +270,13 @@ class OpenAIChatGenerator:
             tools=tools,
             tools_strict=tools_strict,
         )
-        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.chat.completions.create(
-            **api_args
-        )
+        if "response_format" in api_args:
+            if api_args.pop('stream', None):
+                raise RuntimeError("`stream` not supported in constrained generation")
+            chat_call = self.client.beta.chat.completions.parse
+        else:
+            chat_call = self.client.chat.completions.create
+        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = chat_call(**api_args)

         if streaming_callback is not None:
             completions = self._handle_stream_response(

Permafacture avatar Apr 24 '25 17:04 Permafacture