OpenOpenAI

Add streaming support for runs

transitive-bullshit opened this issue 2 years ago · 2 comments

This isn't supported in the official OpenAI API yet, but it was mentioned at the OpenAI dev day that it will be coming soon, possibly via websocket and/or webhook support.

See this related issue in the OpenAI developer community.

The toughest part of this is that the runner is completely separate from the HTTP server (as it should be), processing thread runs in an async task queue. The runner is responsible for making the chat completion calls, which are streamable, so we'd have to either:

  • do some plumbing to connect the runner's execution back to the result of the createRun or createThreadAndRun operations, and then pipe the chat completion calls into this stream (see the sketch after this list)
  • or we could move the run implementation to not be handled by an async task queue, but rather live within createRun / createThreadAndRun
    • this approach would be quite a bit simpler, but I have a feeling it's the wrong approach long-term, as runs conceptually lend themselves to being decoupled from the HTTP call. This also makes the most sense from a sandboxing perspective and helps keep the HTTP server lightweight, without long-running HTTP responses
  • or move to a websocket and/or webhook approach, which is fine in and of itself, but has the huge downside of being completely different from the SSE streaming that the chat completion API has already embraced; thinking about building apps that would potentially have to support both of these streaming approaches would make me a really sad panda
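
For the first option, a rough sketch of what that plumbing could look like, assuming a Redis pub/sub channel between the runner and the HTTP server (ioredis is just one possibility here; the channel naming and the [DONE] sentinel are made up for illustration, not part of OpenOpenAI today):

import Redis from 'ioredis'

// Hypothetical per-run channel name.
const channelForRun = (runId: string) => `run:${runId}:chunks`

// Runner side: publish each chat completion chunk as it arrives.
export async function publishChunk(redis: Redis, runId: string, chunk: string) {
  await redis.publish(channelForRun(runId), chunk)
}

// HTTP side: inside createRun, subscribe and forward chunks as SSE events.
export async function streamRunAsSSE(
  runId: string,
  res: { write: (s: string) => void; end: () => void }
) {
  const sub = new Redis()
  await sub.subscribe(channelForRun(runId))

  sub.on('message', (_channel, message) => {
    if (message === '[DONE]') {
      res.write('data: [DONE]\n\n')
      res.end()
      sub.disconnect()
    } else {
      res.write(`data: ${message}\n\n`)
    }
  })
}

This keeps the runner decoupled from the HTTP server while still letting createRun expose the same SSE shape as chat completions.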

transitive-bullshit · Nov 15 '23

Hello! We worked around the streaming problem. Our solution is probably not the best, but it will do as a temporary one.

What we did: we pass a callback to the runner's chat model and receive chunks, which we use to update the message in the Prisma database as they arrive.

// Callback passed to the chat model: accumulate chunks and persist the
// partial text to the message record as it grows.
const handleUpdate = async (chunk: string) => {
  messageText += chunk
  await prisma.message.update({
    where: { id: newMessageId },
    data: {
      content: [
        {
          type: 'text',
          text: {
            value: messageText,
            annotations: []
          }
        }
      ]
    }
  })
}

// Wire the callback into the runner's chat completion call.
const chatCompletionParams: Parameters<typeof chatModel.run>[0] = {
  messages: chatMessages,
  model: assistant.model,
  handleUpdate,
  tools: convertAssistantToolsToChatMessageTools(assistant.tools),
  tool_choice:
    runSteps.length >= config.runs.maxRunSteps ? 'none' : 'auto'
}

We also slightly changed the order in which the message record is added to the database, so the assistant's answer now appears immediately and is then updated as chunks arrive.
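
For reference, creating the (initially empty) assistant message up front looks roughly like this; the field names are illustrative and the exact Prisma schema may differ:

// Create an empty assistant message before streaming starts, so the client
// sees it immediately and handleUpdate can fill it in afterwards.
// Field names are illustrative; adapt them to the actual Prisma schema.
const newMessage = await prisma.message.create({
  data: {
    thread_id: run.thread_id,
    role: 'assistant',
    content: [{ type: 'text', text: { value: '', annotations: [] } }]
  }
})
const newMessageId = newMessage.id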

On the frontend service we use visual character polling to display new text. The frontend updates the message about once a second, but the user sees it typed out smoothly (as far as the current load allows).
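
Roughly, the frontend side looks something like this (the helper names are illustrative, not our exact code):

// Hypothetical helpers: fetchMessage GETs the message's current text from the
// API, renderMessage updates the DOM, messageId identifies the assistant message.
declare function fetchMessage(id: string): Promise<string>
declare function renderMessage(text: string): void
declare const messageId: string

let shownText = ''
let targetText = ''

// Poll the message about once a second.
setInterval(async () => {
  targetText = await fetchMessage(messageId)
}, 1000)

// Reveal a few characters per tick so the text appears to type smoothly,
// even though the polled updates arrive in bursts.
setInterval(() => {
  if (shownText.length < targetText.length) {
    shownText = targetText.slice(0, shownText.length + 3)
    renderMessage(shownText)
  }
}, 50)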

If our solution is satisfactory, we can prepare a pull request. Or it can wait until a more optimal solution emerges.

dacom-dark-sun · Jan 13 '24

It would be interesting to see the community come up with what this should look like from a purely end-user UX perspective.

To me, the end user should be able to pass stream: true to run creation and get an SSE stream, maybe directly, or maybe from the message once a new status is reached. Thoughts?
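
For example, something roughly like this on the client side (speculative; the endpoint path, event format, and the stream flag itself don't exist in OpenOpenAI yet):

declare const threadId: string
declare const assistantId: string

async function streamRun() {
  // Speculative: create the run with stream: true and read the response as SSE.
  const res = await fetch(`/v1/threads/${threadId}/runs`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ assistant_id: assistantId, stream: true })
  })

  const reader = res.body!.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // Naive parsing for illustration; a real client would buffer across chunks.
    for (const line of decoder.decode(value).split('\n')) {
      if (line.startsWith('data: ') && line !== 'data: [DONE]') {
        console.log(JSON.parse(line.slice('data: '.length)))
      }
    }
  }
}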

phact · Jan 16 '24