node-pg-query-stream
Do not emit 'close' event after finish with query
We found that one of our apps could not run on Node 11 because of a breaking change. I filed a bug in the nodejs repo, and the Node maintainers said the bug is in this module instead: a stream should not emit 'close' before 'end'.
I’m willing to send a PR to fix this if it's welcome.
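For reference, here is a minimal sketch of the event order a well-behaved Readable is expected to produce. It uses a plain in-memory stream rather than this module, and `recordEventOrder` is a hypothetical helper name introduced only for illustration. The `autoDestroy` and `emitClose` options are set explicitly because their defaults differ between Node versions.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: records the order of lifecycle events on a small
// object-mode Readable that ends normally.
function recordEventOrder() {
  return new Promise((resolve) => {
    const events = []
    const rows = [{ id: 1 }, { id: 2 }]
    const stream = new Readable({
      objectMode: true,
      emitClose: true,
      autoDestroy: true,
      read() {
        // Push one row per call, then null to signal the end of the data.
        this.push(rows.length > 0 ? rows.shift() : null)
      }
    })
    stream.on('data', () => events.push('data'))
    stream.on('end', () => events.push('end'))
    stream.on('close', () => {
      events.push('close')
      resolve(events)
    })
  })
}

recordEventOrder().then((events) => console.log(events.join(' -> ')))
```

With these options, 'end' should be recorded before 'close', which is the ordering the Node maintainers asked for.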
@mcollina just did. Thank you again.
The implementation in brianc/node-pg-query-stream is broken. For example, the Node.js stream documentation states:
Errors While Reading
Errors occurring during processing of the readable._read() must be propagated through the readable.destroy(err) method. Throwing an Error from within readable._read() or manually emitting an 'error' event results in undefined behavior.
Here is a correct implementation:
const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
  constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) {
    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = new Cursor(text, values, { rowMode, types })
    this._reading = false
    this._callbacks = []

    // delegate Submittable callbacks to cursor
    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
    this.handleError = this.cursor.handleError.bind(this.cursor)
  }

  submit(connection) {
    this.cursor.submit(connection)
  }

  close(callback) {
    if (this.destroyed) {
      if (callback) setImmediate(callback)
    } else {
      if (callback) this.once('close', callback)
      this.destroy()
    }
  }

  // Close the cursor, then run every pending destroy callback
  _close() {
    this.cursor.close(() => {
      let cb
      while ((cb = this._callbacks.pop())) cb()
    })
  }

  _destroy(err, callback) {
    // Forward err so that destroy(err) surfaces as an 'error' event
    this._callbacks.push(() => callback(err))
    if (!this._reading) {
      this._close()
    }
  }

  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
  _read(size) {
    // Prevent _destroy() from closing while reading
    this._reading = true
    this.cursor.read(size, (err, rows, result) => {
      this._reading = false
      if (this.destroyed) {
        // Destroyed while reading?
        this._close()
      } else if (err) {
        // https://nodejs.org/api/stream.html#stream_errors_while_reading
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        // A short batch means the cursor is exhausted
        if (rows.length < size) this.push(null)
      }
    })
  }
}

module.exports = PgQueryStream
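To illustrate the "Errors While Reading" rule without a database, here is a small sketch that routes a read failure through `destroy(err)`. `FailingCursor`, `SketchStream`, and `recordFailure` are hypothetical names made up for this example. The fake cursor only mimics the `read(size, callback)` shape used above and is not the real pg-cursor.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for pg-cursor: the first read yields a full batch,
// every later read fails.
class FailingCursor {
  constructor() {
    this.calls = 0
  }
  read(size, callback) {
    this.calls += 1
    const call = this.calls
    setImmediate(() => {
      if (call === 1) callback(null, [{ id: 1 }, { id: 2 }])
      else callback(new Error('connection lost'))
    })
  }
}

class SketchStream extends Readable {
  constructor(cursor) {
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: 2 })
    this.cursor = cursor
  }
  _read(size) {
    this.cursor.read(size, (err, rows) => {
      // Propagate read errors through destroy(err) instead of throwing
      // or emitting 'error' by hand.
      if (err) return this.destroy(err)
      for (const row of rows) this.push(row)
      if (rows.length < size) this.push(null)
    })
  }
}

// Hypothetical helper: records which events a consumer observes.
function recordFailure() {
  return new Promise((resolve) => {
    const events = []
    const stream = new SketchStream(new FailingCursor())
    stream.on('data', () => events.push('data'))
    stream.on('end', () => events.push('end'))
    stream.on('error', (err) => events.push('error: ' + err.message))
    stream.on('close', () => {
      events.push('close')
      resolve(events)
    })
  })
}

recordFailure().then((events) => console.log(events))
```

The consumer should see the rows that arrived before the failure, then 'error', then 'close', and no 'end'.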
@matthieusieben I would suggest opening a PR instead of posting a comment. It would also be good to have new tests.
Also, @matthieusieben, how is it possible to have multiple destroy callbacks?
I don't think there should be more than one callback. This implementation is safe and works whether zero, one, or more callbacks are added.
I didn't create a PR because I didn't want to bother writing tests and so on. I just shared my implementation and leave it to someone else to do the rest.
If you wish, just create a PR with _callback being a nullable value instead of an array.
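A rough sketch of what that single-callback variant could look like, under a few assumptions: `FakeCursor`, `SingleCallbackStream`, and `destroyMidRead` are hypothetical names, the fake cursor only imitates the `read`/`close` shape used in this thread, and the sketch forwards `err` to the destroy callback so that `destroy(err)` can surface as an 'error' event.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for pg-cursor so the sketch runs without a database.
class FakeCursor {
  constructor(rows) {
    this.rows = rows
    this.closeCalls = 0
  }
  read(size, callback) {
    setImmediate(() => callback(null, this.rows.splice(0, size)))
  }
  close(callback) {
    this.closeCalls += 1
    setImmediate(callback)
  }
}

class SingleCallbackStream extends Readable {
  constructor(cursor, batchSize = 2) {
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = cursor
    this._reading = false
    this._callback = null // set by _destroy(), cleared once the cursor has closed
  }
  _close() {
    this.cursor.close(() => {
      const cb = this._callback
      this._callback = null
      if (cb) cb()
    })
  }
  _destroy(err, callback) {
    this._callback = () => callback(err)
    // If a read is in flight, its callback will close the cursor instead.
    if (!this._reading) this._close()
  }
  _read(size) {
    this._reading = true
    this.cursor.read(size, (err, rows) => {
      this._reading = false
      if (this.destroyed) {
        this._close()
      } else if (err) {
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        if (rows.length < size) this.push(null)
      }
    })
  }
}

// Destroy while a read is still in flight and report how often the
// cursor was closed by the time 'close' fires.
function destroyMidRead() {
  return new Promise((resolve) => {
    const cursor = new FakeCursor([{ id: 1 }, { id: 2 }, { id: 3 }])
    const stream = new SingleCallbackStream(cursor)
    stream.on('close', () => resolve(cursor.closeCalls))
    stream.read(0) // starts _read(); the fake cursor answers on a later tick
    stream.destroy()
  })
}

destroyMidRead().then((closeCalls) => console.log('cursor.close calls:', closeCalls))
```

Because `_destroy()` runs at most once per stream, a single nullable slot should be enough here; the array in the earlier version only guards against a case that is not expected to happen.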
I do not want to spend any more time on this project.
On Fri, 6 Dec 2019, 17:50, Matthieu Sieben wrote:
If you wish, just create a pr with _callback being a nullable value instead of an array.
I'm going to port this repo over to the pg monorepo. After that I'll rewrite this module, and it will be tested under all supported versions of node using the proper version of pg in all cases, which should make fixing this and adding tests much more straightforward. Sorry for the hassle!
If you do this, please start from scratch so that we get a proper implementation of the stream interface, without keeping the legacy from this package!
Will try my best! If you're interested in supporting my continued work please consider sponsoring me!