[Question]: The streaming converse interface of the chat assistant returns the full data each time; how can I achieve incremental SSE?
Describe your problem
I need to take the answer out of the streamed data and render it on the page. Since every event returns the full answer generated so far, I have to replace the already-rendered answer content to get an incremental update.
The problem is that the first part of the returned content is not always exactly the same, so I can't simply strip the previously received prefix. Could the interface return only the incremental data each time, like the ChatGPT API does, instead of re-sending what has already been returned?
It needs a lot of alteration in both the backend and the front-end code.
How does your chat UI work? By overwriting the old content with the new content each time?
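Overwriting needs no diffing at all: just replace the rendered message with the latest full answer on every event, roughly like this (the element and renderer names are only placeholders):

// Rough sketch: each SSE event already contains the whole answer so far,
// so the UI can simply overwrite the message instead of appending to it.
function onChunk(messageEl: HTMLElement, fullAnswerSoFar: string): void {
  messageEl.textContent = fullAnswerSoFar; // or pass it through your markdown renderer
}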
Thank you very much! I'll do some research. 😁
Here's a snippet doing it the Angular way:
import { HttpClient, HttpDownloadProgressEvent, HttpEvent, HttpEventType } from '@angular/common/http';
import { Observable } from 'rxjs';

// CompletionRequest / CompletionResponse are your own interfaces mirroring the API payloads.
const request: CompletionRequest = {
  question: <your-question>,
  stream: true,
  session_id: sessionId
};
const url = `https://<ragflow-host>/api/v1/agents/<your-agent-id>/completions`;

return new Observable<CompletionResponse>((s) => {
  // Get completion event stream from agent
  this.http.request(
    'post',
    url,
    {
      body: request,
      observe: 'events',
      reportProgress: true,   // needed to receive DownloadProgress events
      responseType: 'text'
    }).subscribe((e: HttpEvent<string>) => {
      if (e.type === HttpEventType.DownloadProgress) {
        // partialText holds everything received so far, i.e. all SSE events concatenated
        const text = (e as HttpDownloadProgressEvent).partialText!;
        // Quick and dirty observation of the stream
        // ...needs improvement!!
        const data = text.split('data:');
        if (data.length > 2) {
          // Parse the last complete event; the final fragment may still be partial
          const part = JSON.parse(data[data.length - 2]) as CompletionResponse;
          s.next(part);
        }
      } else if (e.type === HttpEventType.Response) {
        s.complete();
      }
    });
});
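For anyone wiring this up: the Observable above still emits the cumulative answer on every event, so the increments have to be derived on the client. A rough usage sketch follows; the CompletionResponse shape, the getCompletion wrapper, and the appendToMessage hook are assumptions for illustration, not part of the snippet above:

import { Observable } from 'rxjs';

// Assumed shapes and helpers, for illustration only.
type CompletionResponse = { code: number; data: { answer?: string } | boolean };
declare function getCompletion(question: string, sessionId: string): Observable<CompletionResponse>;
declare function appendToMessage(text: string): void; // your own rendering hook

let rendered = '';
getCompletion('What is RAGFlow?', 'session-123').subscribe({
  next: (part) => {
    // The final event carries `data: true`; content events carry the cumulative answer.
    if (typeof part.data === 'boolean') {
      return;
    }
    const full = part.data.answer ?? '';
    // Each event holds the full answer so far; render only the new tail
    // (the same length-based slicing the Python proxy further down relies on).
    const delta = full.slice(rendered.length);
    rendered = full;
    if (delta) {
      appendToMessage(delta);
    }
  },
  complete: () => {
    // HTTP response finished
  },
});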
In our scenario, streaming produces very long answers. Because every streamed event carries the full answer generated so far, the total amount of data transferred grows roughly quadratically with the answer length (an answer streamed in 200 cumulative events is re-sent about 100 times over). We're very much looking forward to improvements in this area.
I have the same problem, and here's my solution:
import asyncio
import json
import logging

import requests

import config  # your own settings module providing chat_id and api_key

logger = logging.getLogger(__name__)


async def consult(session, prompt):
    """Proxy the RAGFlow completion stream, re-chunking the cumulative answer into deltas."""
    last_chunk_time = asyncio.get_event_loop().time()
    first_chunk_received = False
    try:
        url = f"<YOUR_HOST>/api/v1/chats/{config.chat_id}/completions"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {config.api_key}"
        }
        payload = {
            "question": prompt,
            "stream": True,
            "session_id": session
        }
        yield ": initial heartbeat\n\n"
        data_size = 0  # number of characters already forwarded to the client
        with requests.post(url, headers=headers, data=json.dumps(payload),
                           stream=True, timeout=(5, 120)) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                current_time = asyncio.get_event_loop().time()
                # Send heartbeat every 5 seconds to keep the connection alive
                if current_time - last_chunk_time > 5:
                    yield ": heartbeat\n\n"
                    last_chunk_time = current_time
                if line:
                    if not first_chunk_received:
                        first_chunk_received = True
                    decoded_line = line.decode('utf-8').strip()
                    if decoded_line.startswith('data:'):
                        data = decoded_line[5:].strip()
                        try:
                            chunk = json.loads(data)
                            content = chunk.get('data', {})
                            # RAGFlow signals the end of the stream with "data": true
                            if content is True:
                                yield "data: [DONE]\n\n"
                                break
                            answer = content.get('answer', '')
                            size = len(answer)
                            if size != data_size:
                                # Forward only the part that has not been sent yet
                                content['answer'] = answer[data_size:]
                                data_size = size
                            else:
                                # No new text in this chunk; don't re-send the full answer
                                content['answer'] = ''
                            if content:
                                yield f"data: {json.dumps(content)}\n\n"
                                last_chunk_time = current_time
                        except Exception:
                            yield f"data: {json.dumps({'error': 'Malformed JSON in chunk'})}\n\n"
                            logger.error(f"Malformed JSON: {data}")
                # Give the event loop a chance to run other tasks between lines
                await asyncio.sleep(0.01)
    except requests.exceptions.Timeout:
        yield f"data: {json.dumps({'error': 'Request timed out while waiting for response'})}\n\n"
        logger.error("Stream Timeout error")
    except requests.exceptions.RequestException as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
        logger.error(f"Stream Request error: {str(e)}")
