langflow icon indicating copy to clipboard operation
langflow copied to clipboard

how to get stream response from langflow flow api?

Open WestLi opened this issue 1 year ago • 1 comments

It seem like impossible to fetch stream response from running the langflow flow API, so far. Is there any solution to figure it out? We found run ui page input could get the stream effect, but ui interaction was not a common way for our team. thanks.

WestLi avatar Apr 18 '24 13:04 WestLi

Langflow does not yet support streaming through their API, but after reviewing the recent configuration of API v1, I noticed that a stream flag has been added to the API endpoint. It seems likely that support will be available soon.

YamonBot avatar Apr 22 '24 01:04 YamonBot

Hi, Do we have any update about this feature ?

nhha1602 avatar Jun 10 '24 02:06 nhha1602

Hey all.

Streaming in Langflow currently works in two steps:

  1. You call the api/v1/run with ?stream=true which will return a stream_url
  2. You call the stream_url which will stream the result.

I have this script that I use to test this:

import argparse
import logging
import time
from collections import namedtuple

import httpx


def is_healthy():
    try:
        url = "http://localhost:3000/health"
        response = httpx.get(url)
        return response.status_code == 200
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        return False


def initiate_session(flow_id, input_value, stream: bool = False):
    url = f"http://localhost:3000/api/v1/run/{flow_id}?stream={stream}"
    headers = {"Content-Type": "application/json"}
    data = {
        "input_value": input_value,
    }
    logging.info(f"Initiating session with data: {data}")
    healthy = is_healthy()
    while not healthy:
        time.sleep(1)
        healthy = is_healthy()

    logging.info(f"Health check passed: {healthy}")

    response = httpx.post(url, json=data, headers=headers)
    response.raise_for_status()
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception("Failed to initiate session")


def stream_response(stream_url, session_id):
    stream_url = f"http://localhost:3000{stream_url}"
    params = {"session_id": session_id}

    with httpx.stream("GET", stream_url, params=params, timeout=None) as response:
        for line in response.iter_lines():
            # lines are data: {"chunk":","} and are strings
            logging.info(f"Line: {line}")


def main():
    parser = argparse.ArgumentParser(description="Test Streaming Endpoint")
    parser.add_argument(
        "--flow_id",
        default="1d5f0ac4-2312-41ee-9956-1061b5863328",
        type=str,
        help="Flow ID for the endpoint",
    )
    parser.add_argument(
        "--input_value",
        type=str,
        default="bacon related phrase",
        help="Input value for the request",
    )
    parser.add_argument(
        "--stream",
        action="store_true",
        default=True,
        help="Stream the response from the model",
    )
    try:
        args = parser.parse_args()
    except Exception as e:
        logging.exception(f"Error: {str(e)}")
        args = namedtuple("args", ["flow_id", "input_value"])(None, None)

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    try:
        init_response = initiate_session(
            flow_id=args.flow_id,
            input_value=args.input_value,
            stream=args.stream,
        )
        logging.info(f"Init Response: {init_response}")
        session_id = init_response["session_id"]
        has_stream_url = "stream_url" in init_response["outputs"][0]["outputs"][0]["artifacts"]
        if not has_stream_url and args.stream:
            raise Exception("No stream URL returned")
        stream_url = init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"]

        logging.info(f"Initiated session with ID: {session_id}")
        if stream_url:
            stream_response(stream_url, session_id)
        else:
            logging.error("No stream URL returned")
    except Exception as e:
        logging.exception(f"Error: {str(e)}")


if __name__ == "__main__":
    main()

ogabrielluiz avatar Jun 14 '24 11:06 ogabrielluiz

Do we have a JavaScript example for this? Does the stream URL implement SSE (Server-Sent Events)?

TejasQ avatar Jun 14 '24 11:06 TejasQ

At last, it has been implemented!

YamonBot avatar Jun 14 '24 11:06 YamonBot

Hi @WestLi and @YamonBot

We hope you're doing well. Just a friendly reminder that if we do not hear back from you within the next 3 days, we will close this issue. If you need more time or further assistance, please let us know.


Thank you for your understanding!

carlosrcoelho avatar Jul 17 '24 14:07 carlosrcoelho

Thank you for your contribution! This issue will be closed. If you have any questions or encounter another problem, please open a new issue and we will be ready to assist you.

carlosrcoelho avatar Jul 22 '24 00:07 carlosrcoelho

Hi there : do you have a version of the code that includes the API key parameter ? Thanks !

Arron-Clague avatar Aug 09 '24 12:08 Arron-Clague

@Arron-Clague If you ask @dosu, you'll get the answer you want!

YamonBot avatar Aug 09 '24 12:08 YamonBot

Yes : where is @dosu when you need them ? He always has the answer !

Arron-Clague avatar Aug 09 '24 13:08 Arron-Clague