Yet Another V2 Assistant Streaming Implementation

Open hayeah opened this issue 9 months ago • 1 comments

This PR implements Assistant Streaming for the OpenAI API V2 (BETA).

https://platform.openai.com/docs/api-reference/assistants-streaming

Key Features

  • No additional goroutines are created.
  • Provides a bufio.Scanner-inspired API for accessing stream events.
  • Implements type-safe processing of stream events through type switching.
  • Includes convenience unwrappers for common event types.
  • The streamer is an io.Reader, simplifying text streaming and integration with Go's standard libraries.

This PR is built on the excellent work of https://github.com/sashabaranov/go-openai/pull/737. The main departure from @coolbaluk's work is to improve the streaming response type AssistantStreamEvent to handle the polymorphic nature of the stream events better.

Work in progress. Would love to seek feedback & suggestions from the community.

Tagging:

@coolbaluk #737

@tanzyy96

@CallOrRet #731

@sashabaranov

Stream Events as Scanner

The StreamerV2 struct scans stream events in a loop as they are received from the server.

Unlike the current implementation of CompletionStreaming, the events in the stream are polymorphic and are handled naturally through type switching.

The Scan API mimics bufio.Scanner:

var oa *openai.Client
var req openai.CreateThreadAndRunRequest

stream, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
    return err
}
defer stream.Close()

// read the stream events until there is no Next
for stream.Next() {
    // get the current event
    event := stream.Event()

    // type switch on the event type
    switch event := event.(type) {
    case openai.StreamThreadMessageDelta:
        // type safe access to the event
        fmt.Println("Message:", event.Delta.Content)
        for _, content := range event.Delta.Content {
            if content.Text != nil {
                fmt.Println("Text:", content.Text.Value)
            }

            if content.ImageFile != nil {
                fmt.Println("ImageFile:", content.ImageFile)
            }
        }
    case openai.StreamDone:
        fmt.Println("Done")
    }
}

// check for streaming error
if err := stream.Err(); err != nil {
    return err
}
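
To make the "no additional goroutines" point concrete, here is a minimal, self-contained sketch of how a Next/Event/Err loop can be layered over an SSE body. It is not the code in this PR: the Stream, MessageDelta, Done and Unknown types are hypothetical, and the JSON payload is simplified to a single "text" field rather than the real thread.message.delta shape.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Hypothetical event types for this sketch (not the PR's types).
type MessageDelta struct {
	Text string `json:"text"` // simplified payload, an assumption
}
type Done struct{}
type Unknown struct{ Name, Data string }

// Stream decodes one SSE event per Next call, on the caller's goroutine.
type Stream struct {
	sc    *bufio.Scanner
	event any
	err   error
}

func NewStream(r io.Reader) *Stream { return &Stream{sc: bufio.NewScanner(r)} }

// Event returns the event decoded by the last successful Next.
func (s *Stream) Event() any { return s.event }

// Err returns the first read or decode error, if any.
func (s *Stream) Err() error { return s.err }

// Next reads lines up to the blank line that terminates an SSE event.
func (s *Stream) Next() bool {
	if s.err != nil {
		return false
	}
	var name, data string
	seen := false
	for s.sc.Scan() {
		line := s.sc.Text()
		switch {
		case line == "":
			if seen {
				return s.decode(name, data)
			}
		case strings.HasPrefix(line, "event:"):
			name, seen = strings.TrimSpace(strings.TrimPrefix(line, "event:")), true
		case strings.HasPrefix(line, "data:"):
			data, seen = strings.TrimSpace(strings.TrimPrefix(line, "data:")), true
		}
	}
	if err := s.sc.Err(); err != nil {
		s.err = err
		return false
	}
	// Input ended without a trailing blank line: flush the pending event.
	return seen && s.decode(name, data)
}

// decode maps the SSE event name to a concrete Go type.
func (s *Stream) decode(name, data string) bool {
	switch name {
	case "thread.message.delta":
		var d MessageDelta
		if err := json.Unmarshal([]byte(data), &d); err != nil {
			s.err = err
			return false
		}
		s.event = d
	case "done":
		s.event = Done{}
	default:
		s.event = Unknown{Name: name, Data: data}
	}
	return true
}

// collectText type-switches over the events and joins the text deltas.
func collectText(input string) (string, error) {
	s := NewStream(strings.NewReader(input))
	var sb strings.Builder
	for s.Next() {
		switch ev := s.Event().(type) {
		case MessageDelta:
			sb.WriteString(ev.Text)
		case Done:
			sb.WriteString("|done")
		}
	}
	return sb.String(), s.Err()
}

func main() {
	input := "event: thread.message.delta\ndata: {\"text\":\"Hello\"}\n\n" +
		"event: done\ndata: [DONE]\n\n"
	fmt.Println(collectText(input))
}
```

Because each event is decoded inside Next, the caller controls the pace of reading and errors surface through Err after the loop, as with bufio.Scanner.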

Unwrappers for Polymorphic Events

Type switching on stream events allows precise and type-safe access. However, it can be tedious to write user-level code for simpler cases. To address this, unwrapper helpers "cast" an event to a specific type and return the value if the cast is successful:

for stream.Next() {
    // we are only interested in the text deltas
    text, ok := stream.MessageDeltaText()
    if !ok {
        // skip this event if it is not a text delta
        continue
    }

    fmt.Print(text)
}

This is similar to the Bytes and Text methods in bufio.Scanner, which provide access to the current item in different forms:

scanner.Bytes()
scanner.Text()

Libraries designed to handle polymorphic data often provide similar unwrapper helpers. For example, the gjson library (https://github.com/tidwall/gjson?tab=readme-ov-file#result-type) offers these unwrapper methods for its polymorphic Result type:

result.Exists() bool
result.Value() interface{}
result.Int() int64
result.Uint() uint64
result.Float() float64
result.String() string
result.Bool() bool
result.Time() time.Time
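
For illustration, an unwrapper like MessageDeltaText can be little more than a type assertion on the current event. The sketch below is an assumption about the shape, not the PR's code: Stream is a hypothetical slice-backed stand-in, and TextDelta and RunCompleted are made-up event types.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical event types for this sketch.
type TextDelta struct{ Value string }
type RunCompleted struct{ ID string }

// Stream is a slice-backed stand-in for the real streamer.
type Stream struct {
	pending []any
	cur     any
}

// Next advances to the next event, returning false when none remain.
func (s *Stream) Next() bool {
	if len(s.pending) == 0 {
		return false
	}
	s.cur, s.pending = s.pending[0], s.pending[1:]
	return true
}

// Event returns the current event as an untyped value.
func (s *Stream) Event() any { return s.cur }

// MessageDeltaText "casts" the current event to a text delta.
// ok is false when the current event is of any other type.
func (s *Stream) MessageDeltaText() (text string, ok bool) {
	d, ok := s.Event().(TextDelta)
	if !ok {
		return "", false
	}
	return d.Value, true
}

// joinText keeps only the text deltas and concatenates them.
func joinText(events []any) string {
	s := &Stream{pending: events}
	var sb strings.Builder
	for s.Next() {
		if text, ok := s.MessageDeltaText(); ok {
			sb.WriteString(text)
		}
	}
	return sb.String()
}

func main() {
	events := []any{TextDelta{"Hel"}, RunCompleted{"run_1"}, TextDelta{"lo"}}
	fmt.Println(joinText(events))
}
```

The comma-ok return keeps the helper cheap to call on every event, so callers can skip unrelated events with a single continue.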

Stream Events as io.Reader

The most common use case is to just stream the text, and it would be nice to have a familiar & obvious API for this.

The StreamerV2 struct implements the io.Reader interface, which wraps the Next/Scan API, to provide a simple way to read the text deltas from the thread.message.delta events.

var oa *openai.Client
var req openai.CreateThreadAndRunRequest

// The StreamerV2 is also an io.Reader
s, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
    return err
}
defer s.Close()

_, err = io.Copy(os.Stdout, s)
return err
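
As a rough sketch of how Read can sit on top of the event loop, the reader below buffers one text delta at a time and hands it out in whatever chunk sizes the caller asks for. The textReader type and its next callback are hypothetical; in the real streamer the callback would advance with Next, skip non-text events, and report the stream error instead of always returning io.EOF.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// textReader adapts a pull-style source of text deltas to io.Reader.
// next is a hypothetical stand-in for "advance and unwrap a text delta".
type textReader struct {
	next func() (text string, ok bool)
	buf  []byte // unread remainder of the current delta
}

// Read copies buffered text into p, pulling a new delta when the buffer is empty.
func (r *textReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	for len(r.buf) == 0 {
		text, ok := r.next()
		if !ok {
			// A real implementation would return the stream's error here, if any.
			return 0, io.EOF
		}
		r.buf = []byte(text) // empty deltas are skipped by the loop
	}
	n := copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}

// fromSlice builds a textReader over a fixed list of deltas.
func fromSlice(deltas []string) *textReader {
	i := 0
	return &textReader{next: func() (string, bool) {
		if i >= len(deltas) {
			return "", false
		}
		d := deltas[i]
		i++
		return d, true
	}}
}

// readAll drains the reader and returns the concatenated text.
func readAll(deltas []string) string {
	b, _ := io.ReadAll(fromSlice(deltas))
	return string(b)
}

func main() {
	r := fromSlice([]string{"Hello", ", ", "world\n"})
	if _, err := io.Copy(os.Stdout, r); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```

Keeping the leftover bytes in buf means a delta longer than the caller's buffer is delivered across several Read calls without losing text.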

TODO Items

  • Naming convention for the stream events? There are quite a few... should they be put in a v2 package?
  • Fuzz the new SSE reader.
  • Complete the type mappings for all the stream events.
  • Integrate Stream into other methods like CreateThreadAndStream.
  • Make a common interface for stream events to return Type()

hayeah, May 19 '24 03:05