node-stream-transform

Errors not passed to _transform callback.

Open · adriantrunzo opened this issue 5 years ago · 1 comment

I don't think errors thrown by the user-provided transform function are correctly passed to the callback of _transform. For example, if I use node-stream-transform as follows:

const csv = require('csv')

...

const transformer = csv.transform(data => {
  data.foo += 1
  throw new Error('Unexpected error')
  return data
})

The stream will not close properly after the error. The error event is emitted in the __done method here: https://github.com/adaltas/node-stream-transform/blob/master/lib/index.js#L172. But according to the Node.js documentation, the callback for _transform should also be called with the error if one occurs while processing the input: https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_transform_transform_chunk_encoding_callback. I think that means the __done method should also call cb(err).
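To illustrate the behavior I would expect, here is a minimal sketch. It is not the library's code: createSafeTransform is a hypothetical helper, and it only covers a synchronous handler. It wraps the user function so that anything it throws is passed to the _transform callback.

```javascript
const { Transform } = require('stream')

// Hypothetical helper, not part of node-stream-transform: wraps a synchronous
// handler so that anything it throws reaches the _transform callback.
function createSafeTransform (handler) {
  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      let result
      try {
        result = handler(chunk)
      } catch (error) {
        // Report the failure through the callback so the stream itself
        // errors out, rather than only emitting an 'error' event by hand.
        return callback(error)
      }
      // Kept outside the try block so the callback is never invoked twice.
      callback(null, result)
    }
  })
}
```

With something along these lines, createSafeTransform(data => { throw new Error('Unexpected error') }) should surface the error on the stream as soon as the first record is written.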

I ran into this issue using node-stream-transform in combination with stream.pipeline. Building on the csv.transform example above, I had something like this:

const csv = require('csv')
const stream = require('stream')
const util = require('util')
const pipeline = util.promisify(stream.pipeline)

...

function getData (destination) {
  const source = [some stream]
  const transformer = csv.transform(data => {
    data.foo += 1
    throw new Error('Unexpected error')
    return data
  })

  return pipeline(source, transformer, destination)
}

The promise returned by pipeline was caught outside of the function. Since the transform stream did not close properly, the pipeline did not close the source and destination streams as expected. When I replace node-stream-transform with a regular stream.Transform, everything works as expected:

const csv = require('csv')
const stream = require('stream')
const util = require('util')
const pipeline = util.promisify(stream.pipeline)

...

function getData (destination) {
  const source = [some stream]

  const transformer = new stream.Transform({
    objectMode: true,

    transform (chunk, encoding, callback) {
      try {
        chunk.foo += 1
        throw new Error('Unexpected error')
        callback(null, chunk)
      } catch (error) {
        callback(error)
      }
    }
  })

  return pipeline(source, transformer, destination)
}
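For reference, this is roughly how the cleanup can be checked in isolation. It is a self-contained sketch with made-up stand-ins for my real source and destination (runFailingPipeline and the in-memory streams are illustrative only): the source never ends by itself, so if it ends up destroyed, that comes from pipeline reacting to the error passed to the callback.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')
const { promisify } = require('util')

const pipelineAsync = promisify(pipeline)

// Runs a small pipeline whose transform fails on the first chunk and reports
// what happened to both ends of it. All names here are illustrative.
async function runFailingPipeline () {
  let sent = false
  // Stand-in source: emits one record and then stays open (never pushes null).
  const source = new Readable({
    objectMode: true,
    read () {
      if (!sent) {
        sent = true
        this.push({ foo: 1 })
      }
    }
  })

  // Transform that reports its failure through the callback.
  const transformer = new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      callback(new Error('Unexpected error'))
    }
  })

  // Stand-in destination: accepts and discards everything it receives.
  const destination = new Writable({
    objectMode: true,
    write (chunk, encoding, callback) {
      callback()
    }
  })

  let message = null
  try {
    await pipelineAsync(source, transformer, destination)
  } catch (error) {
    message = error.message
  }

  return {
    message,
    sourceDestroyed: source.destroyed,
    destinationDestroyed: destination.destroyed
  }
}
```

Calling runFailingPipeline() resolves with the error message and the destroyed flags of both streams, which is the cleanup I expected to see with node-stream-transform as well.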

Thoughts?

adriantrunzo, Mar 07 '19 16:03