Request/Response streaming for Finch adapter, SSE middleware
- Request streaming support #506
- First take on response streaming support #271
The implementation requires spawning a process. I don't think there's a way to implement streaming without one, given that Finch.stream/5 only returns after all data has been streamed, while the idea for Tesla response streaming is to return a Tesla.Env with the status and headers set and the body as a stream.
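To illustrate the idea (a hypothetical sketch, not the adapter's actual code): a spawned task runs Finch.stream/5 and forwards status, headers, and data chunks as messages, while the caller exposes the data messages as a lazy stream via Stream.resource/3. StreamingSketch and stream_request/2 are made-up names, and the Finch instance is assumed to be already started:
defmodule StreamingSketch do
  # Hypothetical helper, not part of Tesla or Finch.
  # Runs Finch.stream/5 in a separate task and returns
  # {:ok, status, headers, body_stream}, where body_stream is lazy.
  def stream_request(request, finch_name) do
    owner = self()
    ref = make_ref()

    task =
      Task.async(fn ->
        # Finch.stream/5 only returns after the whole response has been
        # received, which is why it has to run in its own process.
        Finch.stream(request, finch_name, :ok, fn
          {:status, status}, acc ->
            send(owner, {ref, :status, status})
            acc

          {:headers, headers}, acc ->
            send(owner, {ref, :headers, headers})
            acc

          {:data, data}, acc ->
            send(owner, {ref, :data, data})
            acc
        end)

        send(owner, {ref, :done})
      end)

    # status and headers arrive before any data chunks,
    # so they can be read eagerly and put into the Tesla.Env
    status =
      receive do
        {^ref, :status, status} -> status
      end

    headers =
      receive do
        {^ref, :headers, headers} -> headers
      end

    # the body is consumed lazily, one message per chunk
    body =
      Stream.resource(
        fn -> task end,
        fn task ->
          receive do
            {^ref, :data, data} -> {[data], task}
            {^ref, :done} -> {:halt, task}
          end
        end,
        fn task -> Task.shutdown(task) end
      )

    {:ok, status, headers, body}
  end
end
This only shows why a separate process is needed; the real adapter also has to deal with errors, timeouts, and cleanup when the stream is abandoned.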
Example streaming OpenAI client:
defmodule OpenAI do
  def new(token) do
    middleware = [
      {Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"},
      {Tesla.Middleware.BearerAuth, token: token},
      # decode text/event-stream response
      {Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]},
      # decode SSE and return stream of data only (to be decoded by JSON)
      {Tesla.Middleware.SSE, only: :data}
    ]

    # use Finch as adapter
    Tesla.client(middleware, {Tesla.Adapter.Finch, name: MyFinch})
  end

  def completion(client, prompt) do
    data = %{
      model: "gpt-3.5-turbo",
      messages: [
        %{role: "user", content: prompt}
      ],
      # tell OpenAI to stream the response
      stream: true
    }

    # use the new [response: :stream] adapter option
    Tesla.post(client, "/chat/completions", data, opts: [adapter: [response: :stream]])
  end
end
{:ok, pid} = Finch.start_link(name: MyFinch)
client = OpenAI.new("<token>")
{:ok, env} = OpenAI.completion(client, "Count to 5, with a comma between each number and no newlines.")
env.body
|> Stream.each(&IO.inspect(&1, label: "chunk"))
|> Stream.run()
Output:
chunk: %{
"choices" => [
%{"delta" => %{"role" => "assistant"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => "1"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => ","}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => " "}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => "2"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => ","}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => " "}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => "3"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => ","}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => " "}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => "4"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => ","}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => " "}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [
%{"delta" => %{"content" => "5"}, "finish_reason" => nil, "index" => 0}
],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: %{
"choices" => [%{"delta" => %{}, "finish_reason" => "stop", "index" => 0}],
"created" => 1681734469,
"id" => "chatcmpl-76IFRG54YaoTcpb7bPEw5SvJZT5N6",
"model" => "gpt-3.5-turbo-0301",
"object" => "chat.completion.chunk"
}
chunk: "[DONE]"
Love it and would like to use it. Here's a PR with an error-handling fix: https://github.com/elixir-tesla/tesla/pull/573
Thanks for this. I'd love to be able to use streaming here, as it would allow me to move away from HTTPoison for streaming responses. Are there plans to merge this soon?
Please merge!! 😄