
Allow streaming actions to yield just state for all but the last one

Open • elijahbenizzy opened this issue 1 year ago • 3 comments

Is your feature request related to a problem? Please describe.

This should work:

@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
            }],
        temperature=0,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)

Instead, the yield currently needs to be {"response": delta}, None

Describe the solution you'd like

If it's not the last yield, we should just yield the result -- no need to specify None for the state update.

Tasks:

  1. Make the change here and in the function below: https://github.com/DAGWorks-Inc/burr/blob/f883885e98a0bdedc1502655ea5fb2ba81f20f49/burr/core/application.py#L253
  2. Add tests for this, copying from those that use [this class](https://github.com/DAGWorks-Inc/burr/blob/f883885e98a0bdedc1502655ea5fb2ba81f20f49/tests/core/test_application.py#L555), and add the corresponding async tests.

Describe alternatives you've considered

Not doing it.

elijahbenizzy • Jul 19 '24


Hi @elijahbenizzy, I'm going through this one, which is a bit more in my wheelhouse: Python.

If I understand correctly, what you want is for intermediate yields to return just the result and not the state. In my understanding, by the last yield we should be at the return, which provides the state update.

I also think the streaming_response function you've shared achieves this functionality, but would I need to incorporate it into the _run_single_step_streaming_action function in burr/burr/core/application.py?

I should also say I only have a couple of hours a week to contribute, so my progress may be a bit slow.

LMK.

AgnesNM • Oct 14 '24


Hey! Yes, I think this is a good first issue for a Python dev.

At a high level, what we currently have is this -- see the yield {'response': delta}, None line:

@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
            }],
        temperature=0,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}, None #### This is the line ####
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)

What this change would enable is this:

@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
            }],
        temperature=0,
    )
    buffer = []
    for chunk in response:
        delta = chunk.choices[0].delta.content
        buffer.append(delta)
        # yield partial results
        yield {'response': delta} #### No `None` here! ####
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)

We have the TODO statements here, and here, which currently check whether the yielded value is a tuple. Specifically, if it is just a dict, then we should set the state to None (e.g., as if it were left out) and continue. If it is a tuple, then we check for the state value and proceed as we do now.

Should be a few lines of code, then a few tests.
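
To make that concrete, here is a minimal sketch of the kind of normalization those TODOs could turn into. The helper name _normalize_streaming_yield and its placement are assumptions for illustration, not Burr's actual internals:

from typing import Any, Optional, Tuple, Union

# Hypothetical helper, named for illustration only -- not part of Burr's API.
def _normalize_streaming_yield(
    yielded: Union[dict, Tuple[dict, Optional[Any]]],
) -> Tuple[dict, Optional[Any]]:
    """Accept either `result` or `(result, state)` from a streaming action's yield.

    A bare dict is treated as `(result, None)`, i.e. as if the state were left out,
    so the rest of the streaming loop can keep assuming a two-tuple.
    """
    if isinstance(yielded, tuple):
        result, state = yielded
    else:
        result, state = yielded, None
    return result, state

The loop in _run_single_step_streaming_action (and its async counterpart) would then apply this to every intermediate yield; the final return, which carries the real state update, stays as it is today.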

The tests should take these existing ones:

  • https://github.com/DAGWorks-Inc/burr/blob/a1a0b3bcb0f64790615042527c0e173a6c436083/tests/core/test_application.py#L1098
  • https://github.com/DAGWorks-Inc/burr/blob/a1a0b3bcb0f64790615042527c0e173a6c436083/tests/core/test_application.py#L1151

And replace the action in them with one that yields just the result (no None) for all but the last yield.

That means copying/modifying this class, as well as this class, to create variants that have the single yield rather than yielding the tuple.
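
For example, a function-based variant might look like the sketch below (the action name is made up and the import paths are assumptions; the real tests use the classes linked above):

from typing import Generator, Tuple

from burr.core import State
from burr.core.action import streaming_action


@streaming_action(reads=[], writes=["response"])
def streaming_counter_single_yield(state: State) -> Generator[dict, None, Tuple[dict, State]]:
    buffer = []
    for i in range(10):
        chunk = str(i)
        buffer.append(chunk)
        # intermediate yields are bare dicts -- no `, None` -- which is what the test exercises
        yield {"response": chunk}
    full = "".join(buffer)
    # only the final return carries the state update
    return {"response": full}, state.update(response=full)

The test (and its async twin) would then run this the same way the existing streaming tests do, asserting that the intermediate results still stream through and that the final state contains the joined response.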

elijahbenizzy • Oct 14 '24

Hey @elijahbenizzy, my apologies, I never got to this. Unfortunately I will not be able to contribute at this time, but I hope to do so in the near future. Just freeing this up for any other contributors.

AgnesNM • Jan 27 '25