node-pg-query-stream

Do not emit 'close' event after finish with query

Open btd opened this issue 5 years ago • 12 comments

We found that one of our apps could not run on Node 11 because of a breaking change. I filed a bug in the Node.js repo, and the Node maintainers said it is a bug in this module instead: a stream should not emit 'close' before 'end'.
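
For illustration, here is a minimal sketch of the ordering the Node maintainers describe, using a plain `Readable` rather than this module. The helper name `recordEvents` is hypothetical and only exists for this example; `autoDestroy` and `emitClose` are standard `Readable` options.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: records the order in which a well-behaved
// Readable emits its events and resolves once 'close' has fired.
function recordEvents() {
  return new Promise((resolve) => {
    const events = []
    const stream = new Readable({
      objectMode: true,
      autoDestroy: true, // destroy the stream automatically after 'end'
      emitClose: true,
      read() {
        this.push({ id: 1 }) // a single row
        this.push(null) // then signal end of data
      },
    })
    stream.on('data', () => events.push('data'))
    stream.on('end', () => events.push('end'))
    stream.on('close', () => {
      events.push('close')
      resolve(events)
    })
  })
}

recordEvents().then((events) => console.log(events.join(' -> ')))
```

A consumer that waits for 'end' before releasing resources relies on 'close' arriving last, which is why a 'close' emitted early breaks it.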

btd avatar Mar 04 '19 10:03 btd

I’m willing to send a PR to fix this if it’s welcome.

mcollina avatar Mar 04 '19 10:03 mcollina

@mcollina just did. Thank you again.

btd avatar Mar 04 '19 10:03 btd

The implementation of brianc/node-pg-query-stream is broken. For example, the Node.js stream documentation states:

Errors While Reading

Errors occurring during processing of the readable._read() must be propagated through the readable.destroy(err) method. Throwing an Error from within readable._read() or manually emitting an 'error' event results in undefined behavior.

Here is a correct implementation:

const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
  constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) {
    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = new Cursor(text, values, { rowMode, types })

    this._reading = false
    this._callbacks = []

    // delegate Submittable callbacks to cursor
    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
    this.handleError = this.cursor.handleError.bind(this.cursor)
  }

  submit(connection) {
    this.cursor.submit(connection)
  }

  close(callback) {
    if (this.destroyed) {
      if (callback) setImmediate(callback)
    } else {
      if (callback) this.once('close', callback)
      this.destroy()
    }
  }

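  _close() {
    this.cursor.close(() => {
      let entry
      // Forward the error given to destroy(err), otherwise 'error' is never emitted
      while ((entry = this._callbacks.pop())) entry.callback(entry.err)
    })
  }

  _destroy(err, callback) {
    this._callbacks.push({ err, callback })
    if (!this._reading) {
      this._close()
    }
  }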

  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
  _read(size) {
    // Prevent _destroy() from closing while reading
    this._reading = true

    this.cursor.read(size, (err, rows, result) => {
      this._reading = false

      if (this.destroyed) {
        // Destroyed while reading?
        this._close()
      } else if (err) {
        // https://nodejs.org/api/stream.html#stream_errors_while_reading
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        if (rows.length < size) this.push(null)
      }
    })
  }
}

module.exports = PgQueryStream
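
To make the documentation rule quoted above concrete, here is a small self-contained sketch. It does not use pg-cursor: `FakeCursor`, `SketchStream`, and `collectEvents` are hypothetical names invented for this example, and the fake cursor simply fails on its second read. The point is only that an error raised during `_read()` goes through `destroy(err)`, and that `_destroy()` hands the same error to its callback so that 'error' is emitted before 'close'.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for a cursor: the first read succeeds, later reads fail.
class FakeCursor {
  constructor() {
    this.reads = 0
  }
  read(size, cb) {
    this.reads += 1
    setImmediate(() => {
      if (this.reads > 1) cb(new Error('read failed'))
      else cb(null, [{ id: 1 }, { id: 2 }])
    })
  }
  close(cb) {
    setImmediate(cb)
  }
}

class SketchStream extends Readable {
  constructor(cursor) {
    super({ objectMode: true, autoDestroy: true, emitClose: true, highWaterMark: 2 })
    this.cursor = cursor
  }
  _read(size) {
    this.cursor.read(size, (err, rows) => {
      if (this.destroyed) return
      // Propagate read errors through destroy(err); never throw or emit manually
      if (err) return this.destroy(err)
      for (const row of rows) this.push(row)
      if (rows.length < size) this.push(null)
    })
  }
  _destroy(err, callback) {
    // Release the cursor first, then pass the original error along
    this.cursor.close(() => callback(err))
  }
}

// Hypothetical helper: resolves with the observed event order and the error.
function collectEvents(stream) {
  return new Promise((resolve) => {
    const events = []
    let error = null
    stream.on('data', () => events.push('data'))
    stream.on('error', (err) => {
      error = err
      events.push('error')
    })
    stream.on('close', () => {
      events.push('close')
      resolve({ events, error })
    })
  })
}

collectEvents(new SketchStream(new FakeCursor())).then(({ events, error }) => {
  console.log(events.join(' -> '), '|', error.message)
})
```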

matthieusieben avatar Dec 04 '19 13:12 matthieusieben

@matthieusieben I would suggest opening a PR instead of posting your code in a comment. It would also be good to have new tests.

btd avatar Dec 06 '19 15:12 btd

Also, @matthieusieben, how is it possible to have multiple destroy callbacks?

btd avatar Dec 06 '19 15:12 btd

I don't think there should be more than one callback. This implementation is safe and works whether zero, one, or more callbacks are added.

matthieusieben avatar Dec 06 '19 16:12 matthieusieben

I didn't create a PR because I didn't want to bother writing tests and so on. I just shared my implementation, and I leave it to someone else to turn it into a PR.

matthieusieben avatar Dec 06 '19 16:12 matthieusieben

If you wish, just create a PR with _callback being a nullable value instead of an array.
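
As a rough sketch of what that single-callback variant might look like (an illustration only, not code from any PR): `FakeCursor`, `SingleCallbackStream`, and `destroyWhileReading` are hypothetical names, and the fake cursor just logs when it is closed so the deferral is visible. The close is postponed until the in-flight read returns.

```javascript
const { Readable } = require('stream')

// Hypothetical cursor stand-in that logs the order of operations.
class FakeCursor {
  constructor() {
    this.log = []
  }
  read(size, cb) {
    setImmediate(() => {
      this.log.push('read done')
      cb(null, [{ id: 1 }])
    })
  }
  close(cb) {
    this.log.push('close')
    setImmediate(cb)
  }
}

class SingleCallbackStream extends Readable {
  constructor(cursor) {
    super({ objectMode: true, autoDestroy: true, emitClose: true, highWaterMark: 10 })
    this.cursor = cursor
    this._reading = false
    this._callback = null // pending _destroy callback, or null
    this._error = null // error passed to destroy(err), if any
  }
  _close() {
    this.cursor.close(() => {
      const cb = this._callback
      this._callback = null
      if (cb) cb(this._error)
    })
  }
  _destroy(err, callback) {
    this._callback = callback
    this._error = err
    // Defer closing the cursor while a read is still in flight
    if (!this._reading) this._close()
  }
  _read(size) {
    this._reading = true
    this.cursor.read(size, (err, rows) => {
      this._reading = false
      if (this.destroyed) return this._close() // destroyed while reading
      if (err) return this.destroy(err)
      for (const row of rows) this.push(row)
      if (rows.length < size) this.push(null)
    })
  }
}

// Hypothetical helper: destroys the stream while a read is pending.
function destroyWhileReading() {
  return new Promise((resolve) => {
    const cursor = new FakeCursor()
    const stream = new SingleCallbackStream(cursor)
    stream.on('close', () => resolve(cursor.log))
    stream.read() // triggers _read(), which is still pending when we destroy
    stream.destroy()
  })
}

destroyWhileReading().then((log) => console.log(log.join(' -> ')))
```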

matthieusieben avatar Dec 06 '19 16:12 matthieusieben

I do not want to spend any more time on this project.

btd avatar Dec 06 '19 17:12 btd

I'm going to port this repo over to the pg monorepo. After that I'll rewrite this module, and it will be tested under all supported versions of Node using the proper version of pg in all cases, which should make fixing this and adding tests much more straightforward. Sorry for the hassle!

brianc avatar Dec 20 '19 17:12 brianc

If you do this, please start from scratch so that we get a proper implementation of the stream interface, without keeping the legacy from this package!

matthieusieben avatar Dec 20 '19 18:12 matthieusieben

Will try my best! If you're interested in supporting my continued work please consider sponsoring me!

brianc avatar Dec 20 '19 19:12 brianc