Errors are not passed through Transform streams correctly
When piping a `response.body` into a `Transform` stream and using an async iterator to consume the result, the caller often just stops reading, rather than throwing an error, if the source stream is interrupted prematurely. This makes it very difficult to handle interrupted connections without a lot of event juggling and a deep understanding of stream internals, and it makes async iteration effectively unusable.
Reproduction
To demonstrate this happening, we start a server whose job is just to produce a long streaming response:
```js
// server.js
'use strict'

const http = require('http')
const url = require('url')

function sleep (ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// Streams a JSON array of n records, writing one line every 10ms
let server = http.createServer(async (req, res) => {
  let { query } = url.parse(req.url, true)
  let n = parseInt(query.n, 10)

  res.write('[\n')
  for (let i = 0; i < n; i++) {
    res.write(JSON.stringify({ id: i + 1 }))
    if (i < n - 1) res.write(',')
    res.write('\n')
    await sleep(10)
  }
  res.end(']\n')
})

let port = process.argv[2]
server.listen(port)
```
Run this using `node server.js 8000`. Check that it produces the expected output:
```
$ curl -si 'http://127.0.0.1:8000/?n=5'
HTTP/1.1 200 OK
Date: Fri, 24 Feb 2023 16:09:17 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Transfer-Encoding: chunked

[
{"id":1},
{"id":2},
{"id":3},
{"id":4},
{"id":5}
]
```
Next, run the following code that makes a request to this server and pipes the body through an identity transform:
```js
// stream-bug.js
'use strict'

const { Transform } = require('stream')
const fetch = require('node-fetch')

// Identity transform: passes every chunk through unchanged
function noopTransform () {
  return new Transform({
    transform (chunk, encoding, callback) {
      callback(null, chunk)
    }
  })
}

async function main () {
  let { body: stream } = await fetch('http://0.0.0.0:8000/?n=1000')

  // Log the lifecycle events of the response body itself
  for (let ev of ['close', 'end', 'error']) {
    stream.on(ev, (...args) => console.log({ ev }, args))
  }

  stream = stream.pipe(noopTransform())

  try {
    console.log('stream start...')
    for await (let item of stream) {
      // console.log({ item })
    }
  } catch (error) {
    console.log('stream catch', error)
  } finally {
    console.log('stream finally')
  }
  console.log('done.')
}

main().then(
  (result) => console.log('main success'),
  (error) => console.log('main error', error)
)
```
This should run for a while consuming the response. While it's running, kill the server. We see the following output:
```
$ node stream-bug.js
stream start...
{ ev: 'error' } [
  Error: Premature close
      at IncomingMessage.<anonymous> (node_modules/node-fetch/lib/index.js:1749:18)
      at Object.onceWrapper (node:events:627:28)
      at IncomingMessage.emit (node:events:513:28)
      at emitCloseNT (node:internal/streams/destroy:132:10)
      at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
    code: 'ERR_STREAM_PREMATURE_CLOSE'
  }
]
{ ev: 'close' } []
```
That is, the event listeners detect the body emitting `error` and then `close`. However, the other log lines in the script's `try`/`catch`/`finally` block do not run: the function simply aborts, because the async iterator is never resumed after the error event.
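A sketch of a possible workaround, for comparison: Node's `stream.pipeline()` is documented to forward errors between stages, unlike `.pipe()`, so we would expect the async iterator to reject rather than stall. This drops into stream-bug.js in place of `main()` and reuses `noopTransform()` from above; it is only a sketch of the idea, not a fix for the underlying issue:

```js
const { pipeline } = require('stream')

async function main () {
  let { body } = await fetch('http://0.0.0.0:8000/?n=1000')

  // pipeline() destroys every stage with the original error when any
  // stage fails; the callback receives that first error
  let stream = pipeline(body, noopTransform(), (error) => {
    if (error) console.log('pipeline error', error)
  })

  try {
    for await (let item of stream) {
      // consume as before
    }
  } catch (error) {
    // With pipeline(), the premature close should surface here
    console.log('stream catch', error)
  } finally {
    console.log('stream finally')
  }
}
```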
Expected behavior
We have tried the same scenario but using a synthetic readable stream that emits an error as the source, instead of a response body. Here is the implementation for that:
```js
const { Readable } = require('stream')

// Synthetic stand-in for a response body: yields one JSON line every
// 100ms, then emits 'error' and 'close' after one second
function source () {
  let running = true

  async function * gen () {
    let id = 1
    while (running) {
      let msg = JSON.stringify({ id })
      yield msg + '\n'
      id += 1
      await sleep(100)
    }
  }

  let stream = Readable.from(gen())

  // No-op handler so the emitted 'error' is never unhandled
  stream.on('error', () => {})

  setTimeout(() => {
    running = false
    stream.emit('error', new Error('connection closed'))
    stream.emit('close')
  }, 1000)

  return stream
}

function sleep (ms) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}
```
Then we use this source in place of the `fetch()` response in our test script:
```js
async function main () {
  let stream = source()

  for (let ev of ['close', 'end', 'error']) {
    stream.on(ev, (...args) => console.log({ ev }, args))
  }

  stream = stream.pipe(noopTransform())

  try {
    console.log('stream start...')
    for await (let item of stream) {
      // console.log({ item })
    }
  } catch (error) {
    console.log('stream catch', error)
  } finally {
    console.log('stream finally')
  }
  console.log('done.')
}

main().then(
  (result) => console.log('main success'),
  (error) => console.log('main error', error)
)
```
When we run this we get the following output:
```
$ node stream-bug.js
stream start...
{ ev: 'error' } [
  Error: connection closed
      at Timeout._onTimeout (stream-bug.js:25:26)
      at listOnTimeout (node:internal/timers:569:17)
      at process.processTimers (node:internal/timers:512:7)
]
{ ev: 'close' } []
{ ev: 'end' } []
{ ev: 'close' } []
stream finally
done.
main success
```
So the events are logged, and the `finally` block and the rest of the function up to `done.` execute correctly. We're not sure why the `catch` block does not execute in this case; ideally it should execute whenever the source emits an error. However, it would be sufficient for our purposes to at least make the function resume after the `for await` loop, as we can detect that the stream was not completed from the information received before it ended.
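To illustrate, if the loop did resume, bookkeeping along these lines would be enough for us (our own sketch; the `sourceError` variable is hypothetical and not part of any API):

```js
async function main () {
  let { body } = await fetch('http://0.0.0.0:8000/?n=1000')

  // Remember any error the body emits, then re-check it once the
  // for await loop finishes, assuming the loop resumes at all
  let sourceError = null
  body.on('error', (error) => { sourceError = error })

  let stream = body.pipe(noopTransform())
  for await (let item of stream) {
    // consume
  }

  // Distinguish a clean end from a premature close
  if (sourceError) throw sourceError
}
```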
Your Environment
| software         | version                   |
| ---------------- | ------------------------- |
| node-fetch       | 2.6.9                     |
| node             | 14.12.2, 16.19.1, 18.14.2 |
| npm              | 9.5.0                     |
| Operating System | macOS Ventura             |
Additional context
We notice that handling errors for async iterators on the `body` itself was only very recently implemented, in version 2.6.8, based on our own testing. We are unable to move to v3 at this point, as it would require a migration to ES modules.
We originally noticed this behaviour when piping the body into the `split2` module. We have written the reproduction using a basic `Transform` instance, as that is essentially what `split2` is, and we wanted to rule out additional third-party modules as causes of the problem to the extent possible.
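For reference, our original consumer looked roughly like this hypothetical sketch (`split2` emits the body one line at a time; it is not needed to reproduce the bug):

```js
const split2 = require('split2')

async function main () {
  let { body } = await fetch('http://0.0.0.0:8000/?n=1000')

  // Same shape as the reproduction, with split2 as the Transform stage
  for await (let line of body.pipe(split2())) {
    // process each line...
  }
}
```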
I think this is the same root cause as #1231, which I just ran into.
It seems that when the response stream has an error, neither the `end` nor the `error` event is emitted on the body stream.
Any progress on this issue?
According to the node docs:

> In the case of an error, 'end' should not be emitted.

It seems to me (I could very well be wrong) that streams further down the pipeline are waiting for an 'end' event before they throw an error. But the node docs specify that it should not be emitted.
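For what it's worth, since the body itself does emit 'error' (as the output above shows), `stream.finished()` should report the premature close directly, even though nothing propagates through `.pipe()`. A minimal sketch (the URL and `n` are placeholders matching the reproduction above):

```js
const { Transform, finished } = require('stream')
const fetch = require('node-fetch')

async function main () {
  let { body } = await fetch('http://127.0.0.1:8000/?n=1000')

  // finished() calls back with ERR_STREAM_PREMATURE_CLOSE when the
  // body closes without ever emitting 'end'
  finished(body, (error) => console.log('body finished', error))

  let out = body.pipe(new Transform({
    transform (chunk, encoding, callback) { callback(null, chunk) }
  }))

  for await (let chunk of out) {
    // this loop still stalls when the server dies, but the
    // finished() callback above fires with the error
  }
}

main()
```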
I am having this error in reverse. I am giving node-fetch a readable stream as a post body. Before giving it to node-fetch, I pass it through a transform stream. When the transform stream emits an error, the node-fetch request should throw an error, but it does not. It just hangs.
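Roughly what I'm doing, reduced to a sketch (the URL is a placeholder, and the Transform fails on the first chunk to force the error):

```js
const { Readable, Transform } = require('stream')
const fetch = require('node-fetch')

// A POST body whose transform stage errors mid-stream
let body = Readable.from(['chunk1', 'chunk2']).pipe(new Transform({
  transform (chunk, encoding, callback) {
    callback(new Error('transform failed'))
  }
}))

fetch('http://127.0.0.1:8000/', { method: 'POST', body })
  .then((res) => console.log('status', res.status))
  // Expected: this rejection fires; observed: the request just hangs
  .catch((error) => console.log('fetch error', error))
```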