ragflow icon indicating copy to clipboard operation
ragflow copied to clipboard

[Question]: The stream interface of converse with chat assistant returned the full data, how to achieve incremental SSE?

Open tilince opened this issue 1 year ago • 7 comments

Describe your problem

I need to get the answer in the streaming data and render it on the page, because the answer returned every time is full, so I need to replace the rendered answer content to achieve incremental update.

The problem is that the first half of the returned content is not exactly the same, so I can't replace it. Can you return only incremental data each time, just like the ChatGPT interface? It does not return what has already been returned.

tilince avatar Nov 27 '24 06:11 tilince

It needs a lot of alteration both in backend and front-end code.

KevinHuSh avatar Nov 28 '24 03:11 KevinHuSh

It needs a lot of alteration both in backend and front-end code.

How does your chat ui work, by overwriting the old content with new content each time?

tilince avatar Nov 28 '24 09:11 tilince

It needs a lot of alteration both in backend and front-end code.

How does your chat ui work, by overwriting the old content with new content each time?

image

cike8899 avatar Nov 29 '24 07:11 cike8899

It needs a lot of alteration both in backend and front-end code.

How does your chat ui work, by overwriting the old content with new content each time?

image

Thank you very much! I'll do some research. 😁

tilince avatar Dec 02 '24 02:12 tilince

Here some snippet doing it the angular way:


const request: CompletionRequest = {
      question: <your-question>,
      stream: true,
      session_id: sessionId
    };

const url = `https://<ragflow-host>/api/v1/agents/<your-agent-id>/completions`;

return new Observable<CompletionResponse>((s) => {

      // Get completion event stream from agent
      this.http.request(
        'post',
        url,
        {
          body: request,
          observe: 'events',
          reportProgress: true,
          responseType: 'text'
        }).subscribe((e: HttpEvent<string>) => {
          if (e.type === HttpEventType.DownloadProgress) {
            const text = (e as HttpDownloadProgressEvent).partialText!;

            // Quick and dirty observation of the stream
            // ...needs improvement!!
            const data = text.split('data:');
            if (data.length > 2) {
              const part = JSON.parse(data[data.length - 2]) as CompletionResponse;              
              s.next(part);
            }
          } else if (e.type === HttpEventType.Response) {
            s.complete();
          }
        });
    });

hoharald avatar Feb 17 '25 09:02 hoharald

It needs a lot of alteration both in backend and front-end code.

In our scenario, streaming output generates very long texts. Returning the full content at once results in an enormous amount of data being transferred through the interface. We’re very much looking forward to improvements in this area.

ChanningZhang avatar May 12 '25 02:05 ChanningZhang

I have the same problem, and here's my solution

async def consult(session, prompt):
  last_chunk_time = asyncio.get_event_loop().time()
  first_chunk_received = False
  try:
    url = f"<YOUR_HOST>/api/v1/chats/{config.chat_id}/completions"
    headers = {
      "Content-Type": "application/json",
      "Authorization": f"Bearer {config.api_key}"
    }
    payload = {
      "question": prompt,
      "stream": True,
      "session_id": session
    }
    yield f": initial heartbeat\n\n"
    data_size = 0
    with requests.post(url, headers=headers, data=json.dumps(payload), stream=True, timeout=(5, 120)) as response:
      response.raise_for_status()
      for line in response.iter_lines():
        current_time = asyncio.get_event_loop().time()
        # Send heartbeat every 5 seconds
        if current_time - last_chunk_time > 5:
            yield f": heartbeat\n\n"
            last_chunk_time = current_time
        if line:
          if not first_chunk_received:
              first_chunk_received = True
          decoded_line = line.decode('utf-8').strip()
          if decoded_line.startswith('data:'):
              data = decoded_line[5:].strip()
              try:
                  chunk = json.loads(data)
                  content = chunk.get('data', {})
                  if content == True:
                    yield f"data: [DONE]\n\n"
                    break
                  answer = content.get('answer', '')
                  size = len(answer)
                  if size != data_size:
                    content['answer'] = answer[data_size:]
                    data_size = size
                    if content:
                        yield f"data: {json.dumps(content)}\n\n"
                        last_chunk_time = current_time
              except Exception:
                  yield f"data: {json.dumps({'error': 'Malformed JSON in chunk'})}\n\n"
                  logger.error(f"Malformed JSON: {data}")
      await asyncio.sleep(0.01)
  except requests.exceptions.Timeout:
      yield f"data: {json.dumps({'error': 'Request timed out while waiting for response'})}\n\n"
      logger.error(f"Stream Timeout error")
  except requests.exceptions.RequestException as e:
      yield f"data: {json.dumps({'error': str(e)})}\n\n"
      logger.error(f"Stream Request error: {str(e)}")

aliasmaya avatar Jun 17 '25 09:06 aliasmaya