Allow streaming actions to yield just state for all but the last one
Is your feature request related to a problem? Please describe.
This should work:
@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,  # needed so the response can be iterated chunk by chunk
    )
    buffer = []
    for chunk in response:
        # content can be None (e.g. on the final chunk), so fall back to ''
        delta = chunk.choices[0].delta.content or ''
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)
Instead, each intermediate yield currently needs to be {"response": delta}, None
Describe the solution you'd like
If it's not the last yield, we should be able to yield just the result, with no need to specify None for the state update.
Tasks:
- Make the change here and in the function below: https://github.com/DAGWorks-Inc/burr/blob/f883885e98a0bdedc1502655ea5fb2ba81f20f49/burr/core/application.py#L253
- Add tests for this, copied from those that use [this class](https://github.com/DAGWorks-Inc/burr/blob/f883885e98a0bdedc1502655ea5fb2ba81f20f49/tests/core/test_application.py#L555), along with the corresponding async versions.
Describe alternatives you've considered
Not doing it
Hi @elijahbenizzy I'm going through this one, which is a bit more in my wheelhouse, Python.
If I understand correctly, what you want is for the intermediate yields to return just the result and not the state. In my understanding, once we are past the last yield we are at the return, which gives the state update.
I also think that the streaming_response function you've shared shows this functionality. So would I need to incorporate it into the _run_single_step_streaming_action function in burr/burr/core/application.py?
I should also say I only have a couple of hours a week to contribute so my progress may be a bit slow.
LMK.
Hey! Yes, I think this is a good first issue for a Python dev.
At a high level, what we currently have is this (see the yield {'response': delta}, None line):
@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,  # needed so the response can be iterated chunk by chunk
    )
    buffer = []
    for chunk in response:
        # content can be None (e.g. on the final chunk), so fall back to ''
        delta = chunk.choices[0].delta.content or ''
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}, None  #### This is the line ####
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)
What this enables is this:
@streaming_action(reads=["prompt"], writes=['response'])
def streaming_response(state: State) -> Generator[dict, None, tuple[dict, State]]:
    response = client.chat.completions.create(
        model='gpt-3.5-turbo',
        messages=[{
            'role': 'user',
            'content': state["prompt"]
        }],
        temperature=0,
        stream=True,  # needed so the response can be iterated chunk by chunk
    )
    buffer = []
    for chunk in response:
        # content can be None (e.g. on the final chunk), so fall back to ''
        delta = chunk.choices[0].delta.content or ''
        buffer.append(delta)
        # yield partial results
        yield {'response': delta}  #### No `None` here! ####
    full_response = ''.join(buffer)
    # return the final result
    return {'response': full_response}, state.update(response=full_response)
We have TODO statements here and here, at the spots that currently check whether the yielded value is a tuple. Specifically, if it is a dict, then we should just set state to None (i.e., as if it were left out) and continue. If it is a tuple, then we check the state value and proceed.
Should be a few lines of code, then a few tests.
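To make the dict-versus-tuple check described above concrete, here is a minimal standalone sketch. It is not Burr's actual implementation: `split_yielded_item`, `drain`, and `fake_action` are hypothetical names made up for illustration, and a plain dict stands in for `State`.

```python
from typing import Any, Generator, Optional, Tuple


def split_yielded_item(item: Any) -> Tuple[dict, Optional[Any]]:
    """Hypothetical helper: normalize one yielded item into (result, state_update)."""
    if isinstance(item, tuple):
        # Existing behavior: the action yielded (result, state_update).
        result, state_update = item
        return result, state_update
    # Proposed behavior: a bare dict means the state update was left out.
    return item, None


def drain(gen: Generator) -> Tuple[list, Any]:
    """Collect the normalized intermediate items plus the generator's return value."""
    items = []
    while True:
        try:
            items.append(split_yielded_item(next(gen)))
        except StopIteration as stop:
            # A generator's `return` value is carried on StopIteration.value.
            return items, stop.value


def fake_action(state: dict):
    """Stand-in for a streaming action; a plain dict plays the role of State."""
    buffer = []
    for delta in ["Hel", "lo"]:
        buffer.append(delta)
        yield {"response": delta}  # bare dict, no trailing None
    full = "".join(buffer)
    return {"response": full}, {**state, "response": full}


items, final = drain(fake_action({"prompt": "hi"}))
```

Because the helper still accepts tuples, existing actions that yield `result, None` would keep working unchanged under this approach.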
The tests should take these:
- https://github.com/DAGWorks-Inc/burr/blob/a1a0b3bcb0f64790615042527c0e173a6c436083/tests/core/test_application.py#L1098
- https://github.com/DAGWorks-Inc/burr/blob/a1a0b3bcb0f64790615042527c0e173a6c436083/tests/core/test_application.py#L1151
And replace the action in each with one that yields just the result (no trailing None) for all but the last.
That means copying/modifying this class, as well as this class, to create variants that have the single yield rather than yielding the tuple.
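For the async variants, one Python detail matters: an async generator cannot `return` a value, so the final result has to be yielded. The sketch below assumes the final item is a `(result, state_update)` tuple with a non-None state update and that intermediate items may be bare dicts. `fake_async_action` and `collect` are hypothetical stand-ins, not Burr test classes, and a plain dict stands in for `State`.

```python
import asyncio
from typing import Any, AsyncGenerator


async def fake_async_action(state: dict) -> AsyncGenerator[Any, None]:
    """Stand-in async streaming action that yields bare dicts until the end."""
    buffer = []
    for delta in ["Hel", "lo"]:
        await asyncio.sleep(0)  # simulate waiting on a streamed chunk
        buffer.append(delta)
        yield {"response": delta}  # bare dict, no trailing None
    full = "".join(buffer)
    # Async generators cannot return a value, so the final item is yielded.
    yield {"response": full}, {**state, "response": full}


async def collect(gen):
    """Separate the intermediate results from the final (result, state) pair."""
    intermediate = []
    final = None
    async for item in gen:
        result, state_update = item if isinstance(item, tuple) else (item, None)
        if state_update is None:
            intermediate.append(result)
        else:
            # A non-None state update marks the final item (assumption).
            final = (result, state_update)
            break
    return intermediate, final


intermediate, final = asyncio.run(collect(fake_async_action({"prompt": "hi"})))
```

A test built along these lines would assert that every intermediate item arrives as a plain result and that only the last one carries a state update.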
Hey @elijahbenizzy, my apologies, I never got to this. Unfortunately I will not be able to contribute at this time, but I hope to do so in the near future. Just freeing this up for any other contributors.