react-native-sse

Consistency?

Open taylorgoolsby opened this issue 11 months ago • 2 comments

Hi,

This is not an issue, but I am wondering about consistency and safety in using this package for LLM inference.

In Node, I have used axios to stream the responses from an LLM. However, in that implementation, which does not use XHR, I noticed that a single 'data' event sometimes delivers an incomplete payload.

For example, I have seen these cases happen:

The expected case:

data: {"id":"chatcmpl-94iCaF6K7PsuvzxvyoPS1v859nfJl","object":"chat.completion.chunk","created":1710910252,"model":"gpt-3.5-turbo-0125","system_fingerprint":"fp_4f0b692a78","choices":[{"index":0,"delta":{"content":" life"},"logprobs":null,"finish_reason":null}]}\n\n

Incomplete data:

data: {"id":"chatcmpl-94iCaF6K7Psuvzxv

Missing both new lines:

data: {"id":"chatcmpl-94iCaF6K7PsuvzxvyoPS1v859nfJl","object":"chat.completion.chunk","created":1710910252,"model":"gpt-3.5-turbo-0125","system_fingerprint":"fp_4f0b692a78","choices":[{"index":0,"delta":{"content":" life"},"logprobs":null,"finish_reason":null}]}

Missing one new line:

data: {"id":"chatcmpl-94iCaF6K7PsuvzxvyoPS1v859nfJl","object":"chat.completion.chunk","created":1710910252,"model":"gpt-3.5-turbo-0125","system_fingerprint":"fp_4f0b692a78","choices":[{"index":0,"delta":{"content":" life"},"logprobs":null,"finish_reason":null}]}\n

To handle these cases, my implementation in axios ended up looking like this:

import axios from "axios";

let dataLog = []
let buffer = ''
axios({
  method: 'POST',
  url: `${apiBase}/v1/chat/completions`,
  headers,
  data,
  responseType: 'stream',
  timeout: 10000,
})
  .then((response) => {
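    // Decode as UTF-8 at the stream level so that a multi-byte character
    // split across two chunks is not corrupted.
    response.data.setEncoding('utf8')
    response.data.on('data', (chunk) => {
      // With setEncoding, chunk is already a string.
      const data = chunk.toString()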

      dataLog.push(data)
      
      buffer += data

      const items = buffer.split('\n\n')

      for (let i = 0; i < items.length; i++) {
        let item = items[i]

        // item might end with 0, 1, or 2 new lines.
        // So the next item might start with 2, 1, or 0 new lines.
        // Remove any newlines at the beginning:
        item = item.replace(/^\n+/, '')

        if (item === '') continue

        if (/^data: \[DONE\]/.test(item)) {
          buffer = items.slice(i + 1).join('\n\n')
          return
        }

        let parsedPayload
        try {
          parsedPayload = JSON.parse(item.replace(/^data: /, ''))
        } catch (err) {
          buffer = items.slice(i).join('\n\n')
          return
        }

        try {
          onData(parsedPayload)
        } catch (err) {
          console.error(err)
        }
      }
      // All items in the array have been processed, so clear the buffer.
      // Equivalent to items.slice(items.length).join('\n\n')
      buffer = ''
    })
    response.data.on('end', () => {
      // console.log('closed third party')
      if (buffer) {
        console.debug(dataLog)
        console.debug(buffer)
        console.error(new Error('buffer is not empty'))
      }
    })
  })
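
For comparison, here is a minimal sketch of a slightly different buffering strategy: instead of attempting JSON.parse on every fragment, it only consumes text up to a blank line ("\n\n"), which is what marks the end of an event in the examples above, and keeps the remainder for the next chunk. The names createEventBuffer, push, pending and onPayload are hypothetical and not part of axios or react-native-sse. It assumes LF line endings and JSON payloads, and it lets JSON.parse throw if a complete event carries invalid JSON.

```javascript
// Hypothetical helper for illustration; not part of axios or react-native-sse.
// Assumes LF line endings and JSON payloads, as in the examples above.
function createEventBuffer(onPayload) {
  let buffer = ''
  return {
    // Append a chunk of text and emit every event that is now complete.
    push(text) {
      buffer += text
      let boundary
      // Only text followed by a blank line ("\n\n") is a complete event.
      while ((boundary = buffer.indexOf('\n\n')) !== -1) {
        const rawEvent = buffer.slice(0, boundary)
        buffer = buffer.slice(boundary + 2)

        // Join the values of all "data:" lines in this event.
        const data = rawEvent
          .split('\n')
          .filter((line) => line.startsWith('data:'))
          .map((line) => line.slice(5).replace(/^ /, ''))
          .join('\n')

        // Skip events without data and the end-of-stream sentinel.
        if (data === '' || data === '[DONE]') continue

        onPayload(JSON.parse(data))
      }
    },
    // Text received so far that does not yet form a complete event.
    pending() {
      return buffer
    },
  }
}

// Usage: feed chunks in whatever pieces the network delivers them.
const received = []
const events = createEventBuffer((payload) => received.push(payload))
events.push('data: {"content":" li') // incomplete data: nothing emitted
events.push('fe"}\n')                // missing one newline: still nothing
events.push('\ndata: [DONE]\n\n')    // delimiter arrives: payload emitted
```

The trade-off is that a payload is held back until its trailing blank line arrives, whereas the JSON.parse approach above can emit it as soon as the JSON itself is complete.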

I have been using this implementation for a while, and I am fairly certain this is able to handle all possible incomplete data packet cases.

But looking at the implementation of this package, I saw that it does not make use of any buffering in case this._handleEvent(xhr.responseText || ''); is called with incomplete data.

Is this not a concern with XHR? Maybe axios or Node processes incoming data packets differently (faster event loop), causing events with partial data? I am wondering whether, when using this package on a mobile device with a slow internet connection, events could be emitted with incomplete payloads.
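
One relevant difference is that, per the XMLHttpRequest standard, responseText during loading holds all the text received so far rather than only the newest chunk. A handler that is handed the full responseText each time can therefore guard against partial events by remembering how far it has processed and only consuming up to the last blank line. The following is a minimal sketch of that idea, not code taken from react-native-sse; createCumulativeReader and onEvent are hypothetical names, and LF line endings are assumed.

```javascript
// Hypothetical sketch; not taken from react-native-sse's source.
// Assumes LF line endings and that responseText is cumulative.
function createCumulativeReader(onEvent) {
  // Index into responseText up to which events have already been emitted.
  let lastIndex = 0

  return function handle(responseText) {
    // Anything after the last blank line may still be a partial event.
    const end = responseText.lastIndexOf('\n\n')
    if (end === -1 || end + 2 <= lastIndex) return

    const complete = responseText.slice(lastIndex, end)
    lastIndex = end + 2

    for (const part of complete.split('\n\n')) {
      // Drop stray newlines left over from consecutive delimiters.
      const rawEvent = part.replace(/^\n+|\n+$/g, '')
      if (rawEvent !== '') onEvent(rawEvent)
    }
  }
}

// Usage: call the handler with the full text each time more data arrives.
const seen = []
const handle = createCumulativeReader((rawEvent) => seen.push(rawEvent))
handle('data: {"n":1}\n\ndata: {"n"')        // second event is still partial
handle('data: {"n":1}\n\ndata: {"n":2}\n\n') // second event is now complete
handle('data: {"n":1}\n\ndata: {"n":2}\n\n') // no new data: nothing emitted
```

With this approach a partial payload is never handed to the parser, because the incomplete tail simply stays in responseText until the next progress update extends it.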

taylorgoolsby, Mar 20 '24 05:03