go-openai
go-openai copied to clipboard
Yet Another V2 Assistant Streaming Implemenetation
This PR implements Assistant Streaming for the OpenAI API V2 (BETA).
https://platform.openai.com/docs/api-reference/assistants-streaming
Key Features
- No additional goroutines are created.
- Provides a bufio.Scanner-inspired API for accessing stream events.
- Implements type-safe processing of stream events through type switching.
- Includes convenience unwrappers for common event types.
- The streamer is an io.Reader, simplifying text streaming and integration with Go's standard libraries.
This PR is built on the excellent work of https://github.com/sashabaranov/go-openai/pull/737. The main departure from @coolbaluk's work is to improve the streaming response type AssistantStreamEvent
to handle the polymorphic nature of the stream events better.
Work in progress. Would love to seek feedback & suggestions from the community.
Tagging:
@coolbaluk #737
@tanzyy96
@CallOrRet #731
@sashabaranov
Stream Events as Scanner
The StreamerV2 struct Scan
stream events in a loop as they are received from the server.
Unlike the current implemenetation of the CompletionStreaming
, he events in the stream are polymorphic and are handled naturally through type switching.
The Scan
API mimics the bufio.Scanner
class:
var oa *openai.Client
var req openai.CreateThreadAndRunRequest
stream, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
return err
}
defer stream.Close()
// read the stream events until there is no Next
for stream.Next() {
// get the current event
event := stream.Event()
// type switch on the event type
switch event := event.(type) {
case openai.StreamThreadMessageDelta:
// type safe access to the event
fmt.Println("Message:", event.Delta.Content)
for _, content := range event.Delta.Content {
if content.Text != nil {
fmt.Println("Text:", content.Text.Value)
}
if content.ImageFile != nil {
fmt.Println("ImageFile:", content.ImageFile)
}
}
case openai.StreamDone:
fmt.Println("Done")
}
}
// check for streaming error
if err := stream.Err(); err != nil {
return err
}
Unwrappers for Polymporphic Events
Type switching on stream events allows precise and type-safe access. However, it can be tedious to write user-level code for simpler cases. To address this, unwrapper helpers "cast" an event to a specific type and return the value if the cast is successful:
for stream.Next() {
// we are only interested in the text deltas
text, ok := stream.MessageDeltaText()
if !ok {
// skip this event if it is not a text delta
continue
}
fmt.Print(text)
}
This is similar to the Bytes and Text methods in bufio.Scanner, which provide access to the current item in different forms:
scanner.Bytes()
scanner.Text()
Libraries designed to handle polymorphic data often provide similar unwrapper helpers. For example, the [gjson]https://github.com/tidwall/gjson?tab=readme-ov-file#result-type) library, offer these unwrapper methods for the polymorphic Result
type:
result.Exists() bool
result.Value() interface{}
result.Int() int64
result.Uint() uint64
result.Float() float64
result.String() string
result.Bool() bool
result.Time() time.Time
Stream Events as io.Reader
The most common use case is to just stream the text, and it would be nice to have a familiar & obvious API for this.
The StreamerV2
struct implements the io.Reader
interface, which wraps the
Next/Scan
API, to provide a simple way read the text deltas from the
thread.message.delta
events.
var oa *openai.Client
var req openai.CreateThreadAndRunRequest
// The StreamerV2 is also an io.Reader
s, err := oa.CreateThreadAndStream(ctx, req)
if err != nil {
return err
}
defer s.Close()
_, err = io.Copy(os.Stdout, s)
return err
TODO Items
- Naming convention for the stream events? There are quite a few... should they be put in a v2 package?
- Fuzz the new SSE reader.
- Complete the type mappings for all the stream events.
- Integrate
Stream
into other methods likeCreateThreadAndStream
. - Make a common interface for stream events to return
Type()