Streaming URL is not unique in multi-user environment
### Bug Description
When `stream=true` is passed, the resulting `stream_url` is always the same, no matter what `session_id` is passed. Possibly related to #2563: the stream is completely open, so any session can consume tokens from it.
### Reproduction
- Invoke any flow via API with `stream=true`
- Inspect the response: `init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"]` is always the same value, no matter which browser or session ID is used.
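The reproduction boils down to comparing that artifact across calls made with different session IDs. A minimal sketch of the check (the response shape is taken from the report above; the mocked bodies standing in for two API responses are hypothetical):

```python
def extract_stream_url(init_response: dict) -> str:
    """Pull the stream_url artifact out of a /api/v1/run response body."""
    return init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"]

# Mocked responses standing in for two calls made with different session_id
# values; in the reported bug both come back with the identical stream_url.
resp_a = {"outputs": [{"outputs": [{"artifacts": {"stream_url": "/api/v1/build/abc/stream"}}]}]}
resp_b = {"outputs": [{"outputs": [{"artifacts": {"stream_url": "/api/v1/build/abc/stream"}}]}]}

# If this prints True, the bug is reproduced: two sessions share one stream URL.
print(extract_stream_url(resp_a) == extract_stream_url(resp_b))
```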
### Expected behavior
1. A unique stream should be created for each API call.
2. Ideally, some form of caller authentication (even just requiring the same `session_id` on the request) should be present.
### Who can help?

No response

### Operating System

Windows 11

### Langflow Version

1.0.14

### Python Version

3.12

### Screenshot

No response

### Flow File

No response
The `stream_url` is not unique in a multi-user environment when `stream=true` is passed because the `initiateSession` method constructs the endpoint URL from the `flowId` and the `stream` parameter alone, without any user-specific information or unique session identifier. This can lead to the same `stream_url` being generated for different users or sessions.

To ensure a unique stream and caller authentication for each API call, you can modify the `initiateSession` method to include a unique session identifier (e.g., a UUID) and user-specific information in the endpoint URL or request body. Additionally, the server-side implementation must be able to handle and differentiate these unique identifiers.
Here is an example of how you can modify the initiateSession method to include a unique session identifier:
```javascript
import { v4 as uuidv4 } from 'uuid';

async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
  const sessionId = uuidv4(); // Generate a unique session ID
  const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
  return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
}
```
In this example, a unique session ID is generated with the `uuid` library and included as a query parameter in the endpoint URL, so each API call carries its own identifier.

Additionally, you should ensure that the server-side implementation can handle the `session_id` parameter and generate a unique `stream_url` for each session [1].
To continue talking to Dosu, mention @dosu.
@dosu passing in different values for session_id does not change the stream_url in any way, nor is it required to pass in session_id on the GET to consume stream tokens.
The issue arises because the backend does not currently use the `session_id` to generate a unique `stream_url`. To address this, you need to ensure that the backend generates a unique `stream_url` for each session.

Here's how you can modify the backend to handle unique session identifiers and generate unique `stream_url`s:
- Modify the `build_vertex_stream` function to use `session_id`: Ensure that the `session_id` is used to generate a unique stream URL.
```python
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
    flow_id: str,
    vertex_id: str,
    session_id: Optional[str] = None,
    chat_service: "ChatService" = Depends(get_chat_service),
    session_service: "SessionService" = Depends(get_session_service),
):
    try:
        async def stream_vertex():
            try:
                if not session_id:
                    raise ValueError("Session ID is required for streaming.")
                session_data = await session_service.load_session(session_id, flow_id=flow_id)
                graph, artifacts = session_data if session_data else (None, None)
                if not graph:
                    raise ValueError(f"No graph found for {flow_id}.")
                vertex: "ChatVertex" = graph.get_vertex(vertex_id)
                if not hasattr(vertex, "stream"):
                    raise ValueError(f"Vertex {vertex_id} does not support streaming")
                if isinstance(vertex._built_result, str) and vertex._built_result:
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                elif not vertex.frozen or not vertex._built:
                    logger.debug(f"Streaming vertex {vertex_id}")
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    async for chunk in vertex.stream():
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": chunk},
                        )
                        yield str(stream_data)
                elif vertex.result is not None:
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                else:
                    raise ValueError(f"No result found for vertex {vertex_id}")
            except Exception as exc:
                logger.exception(f"Error building vertex: {exc}")
                exc_message = parse_exception(exc)
                if exc_message == "The message must be an iterator or an async iterator.":
                    exc_message = "This stream has already been closed."
                yield str(StreamData(event="error", data={"error": exc_message}))
            finally:
                logger.debug("Closing stream")
                yield str(StreamData(event="close", data={"message": "Stream closed"}))

        return StreamingResponse(stream_vertex(), media_type="text/event-stream")
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Error building vertex") from exc
```
- Ensure the frontend includes `session_id` in the request: Modify the `initiateSession` method to include a unique `session_id`.
```javascript
import { v4 as uuidv4 } from 'uuid';

async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
  const sessionId = uuidv4(); // Generate a unique session ID
  const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
  return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
}
```
- Handle the `session_id` in the backend: Ensure that the backend uses the `session_id` to generate a unique `stream_url`.

By making these changes, each session will have a unique `stream_url`, ensuring that streams are not shared across different sessions [1][2].
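On the client side, the per-session identifier would then be carried on the GET that consumes the stream. A sketch using only the standard library; `with_session` is a hypothetical helper, not part of any Langflow client:

```python
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

def with_session(stream_url: str, session_id: str) -> str:
    # Hypothetical client helper: append (or override) session_id so the GET
    # that consumes tokens is tied to the same session that started the run.
    parts = urlsplit(stream_url)
    query = parse_qs(parts.query)
    query["session_id"] = [session_id]
    return urlunsplit(parts._replace(query=urlencode(query, doseq=True)))

url = with_session("/api/v1/build/my-flow/vertex-1/stream", "abc-123")
print(url)  # /api/v1/build/my-flow/vertex-1/stream?session_id=abc-123
```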
Hello, is there any update on this issue? It would be nice if, when calling the flow with the `stream=true` parameter, the response could be sent directly as a stream, avoiding the use of `stream_url` afterwards.
That way you could directly use the session logic that has already been implemented in the flow.
I have the same problem. In web applications it is very common for each request to get its own independent processing thread and scope to improve concurrency. When I use streaming output, the returned stream URL is always the same, so when I start two session requests at the same time, the stream URL always outputs the data of the last session request instead of returning each session's answer separately. I don't know how to solve this; serializing the requests would be far too inefficient for a web application. If anyone knows how to solve this, please let me know. Thanks.
This is a blocking issue for us, and we can't use the framework without this feature. Do you have any information on whether you plan to fix it, or keep it as is?
@egerszu we are updating the streaming logic, it should be available soon
There also needs to be a way to authenticate the request. Just having a session ID is not enough.
> the stream right now does not use the stream url anymore since 1.0
@anovazzi1 I am on 1.1.1 and it still uses the stream_url. How to access the new API?
> the stream right now does not use the stream url anymore since 1.0
@anovazzi1 Hi, how can the streaming API be used in a multi-user environment now?
hey @yangxikun to use streaming you can call Langflow through the same endpoint the UI uses and handle the streaming directly in the build response. Our build POST is now basically an SSE endpoint that emits multiple events and tokens; that way we can stream without a stream URL.
@anovazzi1 Do you mean calling this API?

```shell
curl 'http://localhost:3000/api/v1/build/f99e2ba4-5550-4a30-9d88-39ad1134ac39/flow?&start_component_id=ChatInput-QH6jh&log_builds=true' \
  -X 'POST' \
  -H 'Content-Type: application/json' \
  -H 'Accept: */*' \
  -H 'x-api-key: sk-xxxxxxxxxx' \
  --data-binary $'{"inputs":{"input_value":"who are you","session":"Session Dec 20 at 06:30:38"}}'
```
Yes, the response body of this API is asynchronous, so it can stream the model response.
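Since the build endpoint speaks server-sent events, a client only needs to split the response body into events. A minimal parser sketch: the `event:`/`data:` framing is standard SSE, but the event names and payloads in the sample below are made up for illustration:

```python
def parse_sse(raw: str) -> list[tuple[str, str]]:
    # Split an SSE payload into (event, data) pairs; a blank line ends an event.
    events, event, data = [], "message", []
    for line in raw.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif not line and data:
            events.append((event, "\n".join(data)))
            event, data = "message", []
    if data:  # flush a trailing event with no final blank line
        events.append((event, "\n".join(data)))
    return events

sample = 'event: token\ndata: {"chunk": "Hel"}\n\nevent: token\ndata: {"chunk": "lo"}\n\n'
print(parse_sse(sample))  # [('token', '{"chunk": "Hel"}'), ('token', '{"chunk": "lo"}')]
```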
@anovazzi1 How can I pass extra fields (metadata) in Chat Input through `/api/v1/build/{flow_id}/flow`?
The `InputValueRequest` only allows a fixed field, `input_value`.
Unfortunately, you can't do that with this component right now. But you can update the component, or create a custom component that generates a message as output and has the extra fields you need.