broken stream when handling raised exceptions

Open kajocina opened this issue 3 months ago • 0 comments

When an action in a streaming state machine raises an error, there is no way to shut the stream down gracefully from the "finally" clause of a try/except/finally block.

Steps to replicate behavior

This is a self-contained snippet to reproduce the behaviour. It's a FastAPI endpoint which you can run with python and then curl. The code deliberately raises an error when the counter equals 5; comment the raise out and the stream finishes normally, without this error: curl: (18) transfer closed with outstanding read data remaining

Python snippet:

import datetime
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import uvicorn

from burr.core import State, ApplicationBuilder, expr
from burr.core.action import streaming_action

app = FastAPI(title="Bug Report Demo")

@streaming_action(reads=["counter"], writes=["counter"])
async def increment(state: State):
    """Mock action for report - increment the counter by 1"""
    current_count = state["counter"]
    exception_occurred = False
    try:
        if current_count == 5:
            raise ValueError("Raising to show some exception from some function in the action!")
    except ValueError:
        yield {"response": "error! some helpful info for the frontend..."}, None
        exception_occurred = True
        raise
    finally:
        if exception_occurred:
            yield {"response": "gracefully stopping the stream..."}, None
            yield {}, state
    current_count += 1
    print("Count: ", current_count)
    yield {"response": "increment"}, None
    yield {}, state.update(counter=current_count)


@streaming_action(reads=["counter"], writes=[])
async def exit_counter(state: State):
    """Print the current count and the current time"""
    current_count = state["counter"]
    print(f"Finished counting to {current_count} at {datetime.datetime.now():%H:%M:%S %Y-%m-%d}")
    yield {"response": "exit_counter"}, None
    yield {}, state


async def build_app():
    """Build the state machine application"""
    return await (
        ApplicationBuilder()
        .with_actions(increment, exit_counter)
        .with_transitions(
            ("increment", "increment", expr("counter < 10")),
            ("increment", "exit_counter"),
        )
        .with_state(counter=0)
        .with_entrypoint("increment")
        .abuild()
    )

async def generate_stream() -> AsyncGenerator[str, None]:
    """Generate the stream of responses"""
    app = await build_app()
    async for action, streaming_container in app.astream_iterate(
        inputs={},
        halt_after=["exit_counter"],
    ):
        async for item in streaming_container:
            yield str(item) + "\n\n"

@app.get("/stream")
async def stream_endpoint():
    """Endpoint to demonstrate the bug report with streaming response"""
    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)


Stack Traces

Curl output:

`curl -N http://localhost:8000/stream

{'response': 'increment'}

{'response': 'increment'}

{'response': 'increment'}

{'response': 'increment'}

{'response': 'increment'}

{'response': 'error! some helpful info for the frontend...'}

{'response': 'gracefully stopping the stream...'}

curl: (18) transfer closed with outstanding read data remaining`
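
The truncated transfer is consistent with plain Python generator semantics, independent of Burr: a bare raise inside except is only deferred while finally runs, so the exception still propagates once the last yield in finally has been consumed. A minimal sketch, assuming nothing about Burr's internals (failing_stream and consume are illustrative names, not part of the repro):

```python
import asyncio
from typing import AsyncGenerator, List


async def failing_stream() -> AsyncGenerator[str, None]:
    """Plain async generator with the same try/except/finally shape as `increment`."""
    exception_occurred = False
    try:
        raise ValueError("boom")
    except ValueError:
        yield "error"
        exception_occurred = True
        raise  # deferred while finally runs, then propagates to the consumer
    finally:
        if exception_occurred:
            yield "stopping"


async def consume() -> List[str]:
    """Collect every item and record the exception that ends the iteration."""
    received: List[str] = []
    try:
        async for item in failing_stream():
            received.append(item)
    except ValueError as exc:
        received.append(f"raised: {exc}")
    return received


result = asyncio.run(consume())
print(result)  # ['error', 'stopping', 'raised: boom']
```

Both yields reach the consumer, but the iteration still ends with the ValueError rather than a normal stop, which probably explains why the HTTP response is cut off mid-transfer.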

Expected behavior

I would expect to be able to close the stream gracefully, i.e. stream the error contents back to the frontend (or any other consumer) after raising the exception, and then close the burr operation by yielding the State instead of None. That doesn't seem possible right now, because the "gracefully stopping the stream..." yield in finally is the last thing that ever runs.

It would perhaps be good to be able to yield a textual response while closing the stream with:

yield response_with_error_info, state
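
Until something like that exists, a possible consumer-side workaround is to catch the exception in the generator that feeds the HTTP response, so the body at least terminates cleanly. This is only a sketch: guarded is a hypothetical helper, flaky is a stand-in for the Burr-backed generator, and it does not give the action a way to yield a final State.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, List


async def guarded(source: AsyncIterator[str]) -> AsyncGenerator[str, None]:
    """Hypothetical helper: relay chunks and turn an exception into a final chunk."""
    try:
        async for item in source:
            yield item
    except Exception as exc:  # broad on purpose: any failure should end the body cleanly
        yield f"stream aborted: {exc}\n\n"


async def flaky() -> AsyncGenerator[str, None]:
    """Stand-in for the Burr-backed generator: two chunks, then a failure."""
    yield "increment\n\n"
    yield "increment\n\n"
    raise ValueError("counter hit 5")


async def collect() -> List[str]:
    """Drain the guarded stream the way an HTTP response would."""
    return [chunk async for chunk in guarded(flaky())]


chunks = asyncio.run(collect())
print(chunks)
```

In the snippet above this would presumably be wired in as StreamingResponse(guarded(generate_stream()), media_type="text/plain"). It hides the failure from the HTTP layer only; the state machine itself still stops on the exception.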

kajocina, Sep 28 '25 13:09