
Errors are not passed through Transform streams correctly

Open jcoglan opened this issue 1 year ago • 4 comments

When response.body is piped into a Transform stream and the result is consumed with an async iterator, the iterator often just stops yielding rather than throwing an error if the source stream is interrupted prematurely. This makes interrupted connections very difficult to handle without a lot of event juggling and a deep understanding of stream internals, and it makes async iteration effectively unusable.

Reproduction

To demonstrate the problem, we start a server whose only job is to produce a long streaming response:

// server.js

'use strict'

const http = require('http')
const url = require('url')

function sleep (ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

let server = http.createServer(async (req, res) => {
  let { query } = url.parse(req.url, true)
  let n = parseInt(query.n, 10)

  // stream a JSON array of n items, one line every 10ms
  res.write('[\n')

  for (let i = 0; i < n; i++) {
    res.write(JSON.stringify({ id: i + 1 }))
    if (i < n - 1) res.write(',')
    res.write('\n')
    await sleep(10)
  }

  res.end(']\n')
})

let port = process.argv[2]
server.listen(port)

Run this using node server.js 8000. Check it produces the expected output:

$ curl -si 'http://127.0.0.1:8000/?n=5'
HTTP/1.1 200 OK
Date: Fri, 24 Feb 2023 16:09:17 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Transfer-Encoding: chunked

[
{"id":1},
{"id":2},
{"id":3},
{"id":4},
{"id":5}
]

Next, run the following code that makes a request to this server and pipes the body through an identity transform:

// stream-bug.js

'use strict'

const { Transform } = require('stream')
const fetch = require('node-fetch')

function noopTransform () {
  return new Transform({
    transform (chunk, encoding, callback) {
      callback(null, chunk)
    }
  })
}

async function main () {
  let { body: stream } = await fetch('http://127.0.0.1:8000/?n=1000')

  for (let ev of ['close', 'end', 'error']) {
    stream.on(ev, (...args) => console.log({ ev }, args))
  }
  stream = stream.pipe(noopTransform())

  try {
    console.log('stream start...')

    for await (let item of stream) {
      // console.log({ item })
    }
  } catch (error) {
    console.log('stream catch', error)
  } finally {
    console.log('stream finally')
  }

  console.log('done.')
}

main().then(
  (result) => console.log('main success'),
  (error) => console.log('main error', error)
)

This runs for a while, consuming the response. While it's running, kill the server. We then see the following output:

$ node stream-bug.js
stream start...
{ ev: 'error' } [
  Error: Premature close
      at IncomingMessage.<anonymous> (node_modules/node-fetch/lib/index.js:1749:18)
      at Object.onceWrapper (node:events:627:28)
      at IncomingMessage.emit (node:events:513:28)
      at emitCloseNT (node:internal/streams/destroy:132:10)
      at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
    code: 'ERR_STREAM_PREMATURE_CLOSE'
  }
]
{ ev: 'close' } []

That is, the event listeners see the body emit error and then close. However, none of the log lines in the script's try/catch/finally block run: the function simply stalls, because the async iterator is never resumed after the error event.
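As a point of comparison (not part of the original report), connecting the streams with stream.pipeline() instead of .pipe() should make the failure observable, since pipeline() destroys every stream in the chain when any of them errors, which in turn makes the async iterator reject. A minimal sketch, assuming the demo server above is running on port 8000:

// stream-workaround.js (a sketch, not the original repro)

'use strict'

const { Transform, pipeline } = require('stream')
const fetch = require('node-fetch')

function noopTransform () {
  return new Transform({
    transform (chunk, encoding, callback) {
      callback(null, chunk)
    }
  })
}

async function main () {
  let { body } = await fetch('http://127.0.0.1:8000/?n=1000')

  // unlike .pipe(), pipeline() propagates errors: a premature close
  // on the body destroys the transform with the same error
  let stream = pipeline(body, noopTransform(), (err) => {
    if (err) console.log('pipeline error', err.code)
  })

  try {
    for await (let item of stream) {
      // consume items
    }
  } catch (error) {
    console.log('stream catch', error.code) // ERR_STREAM_PREMATURE_CLOSE
  } finally {
    console.log('stream finally')
  }
}

main().then(
  () => console.log('main success'),
  (error) => console.log('main error', error)
)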

Expected behavior

We tried the same scenario using a synthetic readable stream that emits an error as the source, in place of a response body. Here is the implementation:

const { Readable } = require('stream')

function source () {
  let running = true

  async function * gen () {
    let id = 1

    while (running) {
      let msg = JSON.stringify({ id })
      yield msg + '\n'
      id += 1
      await sleep(100)
    }
  }

  let stream = Readable.from(gen())
  stream.on('error', () => {}) // stop an unhandled 'error' crashing the process

  // after one second, simulate a dropped connection
  setTimeout(() => {
    running = false
    stream.emit('error', new Error('connection closed'))
    stream.emit('close')
  }, 1000)

  return stream
}

function sleep (ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

Then we use this source in place of the fetch() response in our test script:

async function main () {
  let stream = source()

  for (let ev of ['close', 'end', 'error']) {
    stream.on(ev, (...args) => console.log({ ev }, args))
  }
  stream = stream.pipe(noopTransform())

  try {
    console.log('stream start...')

    for await (let item of stream) {
      // console.log({ item })
    }
  } catch (error) {
    console.log('stream catch', error)
  } finally {
    console.log('stream finally')
  }

  console.log('done.')
}

main().then(
  (result) => console.log('main success'),
  (error) => console.log('main error', error)
)

When we run this we get the following output:

$ node stream-bug.js
stream start...
{ ev: 'error' } [
  Error: connection closed
      at Timeout._onTimeout (stream-bug.js:25:26)
      at listOnTimeout (node:internal/timers:569:17)
      at process.processTimers (node:internal/timers:512:7)
]
{ ev: 'close' } []
{ ev: 'end' } []
{ ev: 'close' } []
stream finally
done.
main success

So the events are logged, and the finally block and the rest of the function through done. all run correctly. We're not sure why the catch block does not execute in this case; ideally it should run whenever the response body emits an error. For our purposes, though, it would be enough for the function to at least resume after the for await loop, since the data received before the stream ended tells us it was never completed. One way to force that resumption is sketched below.
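A sketch of that idea (a modification of stream-bug.js above, reusing its fetch import and noopTransform()): forward the body's error across the .pipe() boundary by hand, since .pipe() deliberately does not propagate errors.

async function main () {
  let { body } = await fetch('http://127.0.0.1:8000/?n=1000')
  let stream = body.pipe(noopTransform())

  // .pipe() never forwards errors, so destroy the transform ourselves;
  // destroying it with the error makes the for await loop reject
  body.on('error', (err) => stream.destroy(err))

  try {
    for await (let item of stream) {
      // consume items
    }
  } catch (error) {
    console.log('stream catch', error.code) // now runs on premature close
  } finally {
    console.log('stream finally')
  }
}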

Your Environment

software          version
node-fetch        2.6.9
node              14.12.2, 16.19.1, 18.14.2
npm               9.5.0
Operating System  macOS Ventura

Additional context

Based on our own testing, error handling for async iterators over the body itself was only implemented very recently, in version 2.6.8. We are unable to move to v3 at this point, as it would require a migration to ES modules.
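For comparison, iterating the body directly, with no Transform in between, is the shape that 2.6.8 addressed; a minimal sketch of that case:

const fetch = require('node-fetch')

async function main () {
  let { body } = await fetch('http://127.0.0.1:8000/?n=1000')

  try {
    // no .pipe() involved: iterate the response body directly
    for await (let chunk of body) {
      // consume chunk
    }
  } catch (error) {
    // per the testing described above, node-fetch >= 2.6.8 rejects
    // here when the connection is dropped mid-stream
    console.log('caught', error.code)
  }
}

main()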

We originally noticed this behaviour when piping the body into the split2 module. We wrote the reproduction with a plain Transform instance because that is what split2 is, and we wanted to rule out third-party modules as a cause as far as possible.
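For reference, the original code was shaped roughly like the following (a hypothetical reconstruction; split2() returns a Transform that splits its input into lines):

// hypothetical reconstruction of the original split2 usage
const split2 = require('split2')
const fetch = require('node-fetch')

async function main () {
  let { body } = await fetch('http://127.0.0.1:8000/?n=1000')

  // split2() is itself a Transform, so it hits the same
  // .pipe() error-propagation gap described in this issue
  for await (let line of body.pipe(split2())) {
    // process each line of the response
  }
}

main()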

jcoglan commented on Feb 24, 2023

I think this is the same root cause as #1231, which I just ran into.

It seems that when the response stream has an error, neither the end nor the error event is emitted on the body stream.

jportner commented on Mar 4, 2023

Any progress on this issue?

rbnayax commented on Apr 3, 2023

According to the Node docs:

"In the case of an error, 'end' should not be emitted."

It seems to me (I could very well be wrong) that streams further down the pipeline are waiting for an 'end' event before they throw an error, even though the docs say 'end' should not be emitted after an error.
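That reading matches how .pipe() is documented to behave: errors are not forwarded downstream at all, so the destination neither errors nor ends. A self-contained sketch, independent of node-fetch:

const { Readable, PassThrough } = require('stream')

let src = new Readable({ read () {} })
let dst = src.pipe(new PassThrough())

src.on('error', (err) => console.log('src error:', err.message))
dst.on('error', () => console.log('dst error')) // never fires
dst.on('end', () => console.log('dst end'))     // never fires either

src.destroy(new Error('boom'))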

jameskerr commented on Sep 26, 2023

I am hitting this in reverse. I am giving node-fetch a readable stream as a POST body. Before handing it to node-fetch, I pass it through a Transform stream. When the Transform emits an error, the node-fetch request should reject, but it does not; it just hangs.
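A hedged sketch of that scenario (the server URL and stream contents are illustrative):

const { Readable, Transform } = require('stream')
const fetch = require('node-fetch')

// a transform that fails partway through the upload
let failing = new Transform({
  transform (chunk, encoding, callback) {
    callback(new Error('transform failed'))
  }
})

async function main () {
  // expectation: the request should reject when the transform errors,
  // but per the report above it hangs instead
  await fetch('http://127.0.0.1:8000/', {
    method: 'POST',
    body: Readable.from(['a', 'b', 'c']).pipe(failing)
  })
}

main().then(
  () => console.log('request finished'),
  (error) => console.log('request error', error) // never reached
)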

jameskerr commented on Sep 26, 2023