
Streaming URL is not unique in multi-user environment

Open mieslep opened this issue 1 year ago • 8 comments

Bug Description

When stream=true is passed, the resulting stream_url is always the same, no matter what session_id is passed. Possibly related to #2563; the stream is completely open, so any session can consume tokens from it.

Reproduction

  1. Invoke any flow via API with stream=true
  2. Inspect init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"] in the response: no matter what browser or session ID is used, this value is always the same.

Expected behavior

1. A unique stream should be created for each API call.
2. Ideally, some form of "authentication" of the caller should be present (even if it is just requiring the same session_id on the request).

Who can help?

No response

Operating System

Windows 11

Langflow Version

1.0.14

Python Version

3.12

Screenshot

No response

Flow File

No response

mieslep avatar Aug 21 '24 09:08 mieslep

The stream_url is not unique in a multi-user environment when stream=true is passed because the initiateSession method constructs the endpoint URL using the flowId and the stream parameter, but it does not include any user-specific information or unique session identifiers. This can lead to the same stream_url being generated for different users or sessions.

To ensure a unique stream and caller authentication for each API call, you can modify the initiateSession method to include a unique session identifier (e.g., a UUID) and user-specific information in the endpoint URL or request body. Additionally, you should ensure that the server-side implementation can handle and differentiate these unique identifiers.

Here is an example of how you can modify the initiateSession method to include a unique session identifier:

import { v4 as uuidv4 } from 'uuid';

async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
    const sessionId = uuidv4(); // Generate a unique session ID
    const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
    return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
}

In this example, a unique session ID is generated using the uuid library and included as a query parameter in the endpoint URL. This ensures that each API call has a unique stream_url.

Additionally, you should ensure that the server-side implementation can handle the session_id parameter and generate a unique stream_url for each session [1].

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Aug 21 '24 09:08 dosubot[bot]

@dosu passing in different values for session_id does not change the stream_url in any way, nor is it required to pass in session_id on the GET to consume stream tokens.

mieslep avatar Aug 21 '24 09:08 mieslep

The issue arises because the backend does not currently use the session_id to generate a unique stream_url. To address this, you need to ensure that the backend generates a unique stream_url for each session.

Here's how you can modify the backend to handle unique session identifiers and generate unique stream_urls:

  1. Modify the build_vertex_stream function to use session_id: Ensure that the session_id is used to generate a unique stream URL.
# Imports added for completeness. StreamData, parse_exception, ChatService,
# SessionService, get_chat_service, get_session_service and the module-level
# `router` are Langflow internals assumed to be in scope.
from typing import Optional

from fastapi import Depends, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger

@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
    flow_id: str,
    vertex_id: str,
    session_id: Optional[str] = None,
    chat_service: "ChatService" = Depends(get_chat_service),
    session_service: "SessionService" = Depends(get_session_service),
):
    try:
        async def stream_vertex():
            try:
                if not session_id:
                    raise ValueError("Session ID is required for streaming.")
                
                session_data = await session_service.load_session(session_id, flow_id=flow_id)
                graph, artifacts = session_data if session_data else (None, None)
                if not graph:
                    raise ValueError(f"No graph found for {flow_id}.")

                vertex: "ChatVertex" = graph.get_vertex(vertex_id)
                if not hasattr(vertex, "stream"):
                    raise ValueError(f"Vertex {vertex_id} does not support streaming")
                
                if isinstance(vertex._built_result, str) and vertex._built_result:
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                elif not vertex.frozen or not vertex._built:
                    logger.debug(f"Streaming vertex {vertex_id}")
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    async for chunk in vertex.stream():
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": chunk},
                        )
                        yield str(stream_data)
                elif vertex.result is not None:
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                else:
                    raise ValueError(f"No result found for vertex {vertex_id}")

            except Exception as exc:
                logger.exception(f"Error building vertex: {exc}")
                exc_message = parse_exception(exc)
                if exc_message == "The message must be an iterator or an async iterator.":
                    exc_message = "This stream has already been closed."
                yield str(StreamData(event="error", data={"error": exc_message}))
            finally:
                logger.debug("Closing stream")
                yield str(StreamData(event="close", data={"message": "Stream closed"}))

        return StreamingResponse(stream_vertex(), media_type="text/event-stream")
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Error building vertex") from exc
  2. Ensure the frontend includes session_id in the request: Modify the initiateSession method to include a unique session_id.
import { v4 as uuidv4 } from 'uuid';

async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
    const sessionId = uuidv4(); // Generate a unique session ID
    const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
    return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
}
  3. Handle the session_id in the backend: Ensure that the backend uses the session_id to generate a unique stream_url.
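As a rough sketch of step 3, the artifact-building code could derive the stream_url from the session rather than from the flow alone. Note that make_stream_url is a hypothetical helper, not part of Langflow; the route shape mirrors the build_vertex_stream endpoint above:

```python
import uuid
from typing import Optional


def make_stream_url(flow_id: str, vertex_id: str, session_id: Optional[str] = None) -> str:
    """Build a per-session stream URL.

    If no session_id is supplied, a fresh UUID is generated so that two
    concurrent callers can never collide on the same stream endpoint.
    """
    session_id = session_id or str(uuid.uuid4())
    return f"/api/v1/build/{flow_id}/{vertex_id}/stream?session_id={session_id}"
```

The stream endpoint would then look up the stream by session_id instead of serving a single shared stream per flow.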

By making these changes, each session will have a unique stream_url, ensuring that streams are not shared across different sessions [1][2].

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Aug 21 '24 09:08 dosubot[bot]

Hello, is there any update on this issue? It would be nice if calling the flow with the stream=true parameter sent the response directly as a stream, avoiding the stream_url afterwards. That way you could directly use the session logic that is already implemented on the flow.

therockerline avatar Sep 02 '24 08:09 therockerline

I have the same problem. In web applications, it is very common for each request to have its own processing thread and scope to improve concurrency. When I use streaming output, the returned stream URL is always the same, so when I start two conversation requests at the same time, the stream URL always outputs the data of the last request instead of returning each session's answer separately. I don't know how to solve this; serializing the requests would be far too inefficient for a web application. If anyone knows how to solve this, please let me know. Thanks.

guojingfeng1994 avatar Sep 07 '24 11:09 guojingfeng1994

This is a blocking issue for us, and we can't use the framework without this feature. Can you tell us whether you plan to fix it or keep it as is?

egerszu avatar Sep 25 '24 09:09 egerszu

@egerszu we are updating the streaming logic; it should be available soon.

anovazzi1 avatar Sep 26 '24 18:09 anovazzi1

There also needs to be a way to authenticate the request. Just having a session ID is not enough.

devinbost avatar Oct 01 '24 21:10 devinbost
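One way to go beyond a bare session ID, as devinbost suggests, is to hand the caller a signed token derived from the session_id with a server-side secret, and require that token on the stream GET. This is only a sketch of the idea, not something Langflow implements; SECRET_KEY and both helper names are assumptions:

```python
import hashlib
import hmac

# Assumption: in practice this would be loaded from server configuration,
# never hard-coded.
SECRET_KEY = b"server-side-secret"


def stream_token(session_id: str) -> str:
    """Derive a caller-verifiable token for a session using HMAC-SHA256."""
    return hmac.new(SECRET_KEY, session_id.encode(), hashlib.sha256).hexdigest()


def verify_stream_token(session_id: str, token: str) -> bool:
    """Constant-time check that the presented token matches the session."""
    return hmac.compare_digest(stream_token(session_id), token)
```

The run response would return the token alongside the stream_url, and the stream endpoint would reject any GET whose token does not verify against the session_id it carries.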

Streaming no longer uses the stream_url as of 1.0.

anovazzi1 avatar Nov 29 '24 18:11 anovazzi1

@anovazzi1 I am on 1.1.1 and it still uses the stream_url. How to access the new API?

egerszu avatar Dec 09 '24 14:12 egerszu

Streaming no longer uses the stream_url as of 1.0.

@anovazzi1 Hi, how to use streaming API in multi-user environment now?

yangxikun avatar Dec 20 '24 03:12 yangxikun

Hey @yangxikun, to use streaming you can run Langflow using the same endpoint as the UI and handle the streaming directly in the build response. The build POST is now essentially an SSE endpoint that emits multiple events and tokens, so we can stream without a stream_url.

anovazzi1 avatar Dec 20 '24 06:12 anovazzi1

@anovazzi1 Do you mean call this api?

curl 'http://localhost:3000/api/v1/build/f99e2ba4-5550-4a30-9d88-39ad1134ac39/flow?start_component_id=ChatInput-QH6jh&log_builds=true' \
-X 'POST' \
-H 'Content-Type: application/json' \
-H 'Accept: */*' \
-H 'x-api-key: sk-xxxxxxxxxx' \
--data-binary $'{"inputs":{"input_value":"你是谁","session":"Session Dec 20 at 06:30:38"}}'

yangxikun avatar Dec 20 '24 07:12 yangxikun

Yes. The response body of this API is streamed asynchronously, so it can stream the model response.

anovazzi1 avatar Dec 20 '24 22:12 anovazzi1
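For reference, a text/event-stream body like the one this endpoint returns can be consumed by splitting on blank lines and decoding each data: payload. The following is a minimal, illustrative parser (parse_sse is not a Langflow API, and the assumption that the data field carries JSON matches the StreamData events shown earlier in this thread); with a streaming HTTP client the same logic can be applied to chunks as they arrive:

```python
import json
from typing import Dict, List, Tuple


def parse_sse(body: str) -> List[Tuple[str, Dict]]:
    """Parse a text/event-stream body into (event, data) pairs.

    Each SSE event is a block of "field: value" lines separated by a
    blank line; the data field is assumed to carry JSON.
    """
    events = []
    for block in body.split("\n\n"):
        event, data = "message", None
        for line in block.splitlines():
            if line.startswith("event:"):
                event = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data = json.loads(line[len("data:"):].strip())
        if data is not None:
            events.append((event, data))
    return events
```

Concatenating the "chunk" values from the message events reconstructs the model's streamed answer.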

@anovazzi1 How can I pass extra fields (metadata) in Chat Input through /api/v1/build/{flow_id}/flow?

The InputValueRequest only allows the fixed field input_value.


yangxikun avatar Dec 24 '24 07:12 yangxikun

Unfortunately you can't do that right now with this component, but you can update the component or create a custom component that generates a message as output and has the extra fields you need.

anovazzi1 avatar Dec 24 '24 12:12 anovazzi1