
Streaming Async

elijahbenizzy opened this issue 1 year ago • 4 comments

This is tricky, as async generators have no return capability (Python does not allow return with a value inside an async generator). The way this will likely work is that the last yield is the final result:

from typing import Generator, Tuple, Union

import openai

@streaming_action(reads=["prompt"], writes=["response"])
def streaming_chat_call(state: State, **run_kwargs) -> Generator[Union[dict, Tuple[dict, State]], None, None]:
    client = openai.Client()
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta is None:  # the final chunk of an OpenAI stream carries no content
            continue
        buffer.append(delta)
        yield {'response': delta}  # intermediate result
    full_response = ''.join(buffer)
    # the last yield carries both the final result and the state update,
    # since a generator cannot return them
    yield {'response': full_response}, state.append(response=full_response)

We may want to add this support to the synchronous API as well, to keep things consistent.

elijahbenizzy avatar Mar 10 '24 18:03 elijahbenizzy

could just add an __end__ token?

Actually, we could just return a tuple in both cases: the second element is None while the stream hasn't finished, and non-None once it has.

That way, the subsequent control flow already has a value it can check to see whether the end of the stream has been reached.
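
For illustration, a hypothetical caller-side sketch of that convention (streaming_chat_call follows the tuple-yielding shape proposed below; print is just a stand-in for whatever the caller does with each chunk):

# Hypothetical consumer sketch, not Burr's actual API: the second tuple
# element is None for intermediate chunks and a State once the stream is done.
for result, updated_state in streaming_chat_call(state):
    if updated_state is None:
        print(result["response"], end="")  # intermediate chunk; keep streaming
    else:
        final_result, final_state = result, updated_state  # end of stream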

skrawcz avatar Mar 10 '24 22:03 skrawcz

What would that look like re: code? We still have to return the final result + state update.

elijahbenizzy avatar Mar 10 '24 23:03 elijahbenizzy

> What would that look like re: code? We still have to return the final result + state update.

from typing import AsyncGenerator, Optional, Tuple

import openai

@streaming_action(reads=["prompt"], writes=["response"])
async def streaming_chat_call(state: State, **run_kwargs) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
    client = openai.Client()
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta is None:  # the final chunk carries no content
            continue
        buffer.append(delta)
        yield {'response': delta}, None  # None: stream not finished yet
    full_response = ''.join(buffer)
    # a non-None state signals the end of the stream
    yield {'response': full_response}, state.append(response=full_response)

is what I was thinking ....

skrawcz avatar Mar 10 '24 23:03 skrawcz

OK, working on this shortly. @skrawcz your API makes a lot of sense IMO (although I think it needs an async for loop when iterating over the response).
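
For reference, a minimal sketch of what that correction might look like, assuming the AsyncOpenAI client from openai 1.x; this is just an illustration of the async for fix, not necessarily the final implementation:

from typing import AsyncGenerator, Optional, Tuple

import openai

# Sketch only: skrawcz's tuple convention, with an async client and async for.
@streaming_action(reads=["prompt"], writes=["response"])
async def streaming_chat_call(state: State, **run_kwargs) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{'role': 'user', 'content': state["prompt"]}],
        temperature=0,
        stream=True,
    )
    buffer = []
    async for chunk in response:  # async iteration over the streamed chunks
        delta = chunk.choices[0].delta.content
        if delta is None:  # the final chunk carries no content
            continue
        buffer.append(delta)
        yield {'response': delta}, None  # None: stream not finished yet
    full_response = ''.join(buffer)
    yield {'response': full_response}, state.append(response=full_response)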

elijahbenizzy avatar May 19 '24 01:05 elijahbenizzy

This is complete; see #190.

elijahbenizzy avatar Jun 17 '24 23:06 elijahbenizzy