node-mysql2
node-mysql2 copied to clipboard
MySQL Incompatibility. Pipeline operaitons on a MYSQL2 Query Stream hang when the connection is lost.
Please note that this is a case where the behavior of MySQL2 diverges significantly from MySQL.
Testcase for MySQL
import {pipeline} from 'stream/promises'
import {Transform} from 'stream'
import mysql from 'mysql'
import fs from 'fs'
class MyTransform extends Transform {
constructor() {
super({objectMode: true })
this.counter = 0
}
_transform(data,enc,callback) {
this.push(JSON.stringify(data))
callback()
}
}
class Test {
vendorProperties = {
"multipleStatements":true
,"typeCast":true
,"supportBigNumbers":true
,"bigNumberStrings":true
,"dateStrings":true
,"trace":true
,"user":"root"
,"password": "oracle"
,"host":"yadamu-db2"
,"database":"mysql"
,"port":33061
, infileStreamFactory : (path) => {return fs.createReadStream(path)}
}
async createConnectionPool() {
let stack, operation
try {
stack = new Error().stack;
operation = 'mysql.createPool()'
this.pool = mysql.createPool(this.vendorProperties)
console.log('Pool Created')
} catch (e) {
throw e
}
}
async getConnectionFromPool() {
const connection = await new Promise((resolve,reject) => {
this.pool.getConnection((err,connection) => {
if (err) {
reject(this.getDatabaseException(this.DRIVER_ID,err,stack,'mysql.Pool.getConnection()'))
}
resolve(connection)
})
})
return connection
}
async closeConnection(options) {
if ((this.connection !== undefined) && (typeof this.connection.release === 'function')) {
let stack;
try {
stack = new Error().stack
await this.connection.release()
this.connection = undefined;
} catch (e) {
this.connection = undefined;
throw e
}
}
};
async closePool(options) {
if ((this.pool !== undefined) && (typeof this.pool.end === 'function')) {
let stack;
try {
stack = new Error().stack
await this.pool.end()
this.pool = undefined;
} catch (e) {
this.pool = undefined;
throw e
}
}
}
executeSQL(sqlStatement,args) {
return new Promise((resolve,reject) => {
const stack = new Error().stack;
const sqlStartTime = performance.now()
this.connection.query(sqlStatement,args,async (err,results,fields) => {
console.log(results)
resolve(results)
})
})
}
async getConnectionID() {
const results = await this.executeSQL(`select connection_id() "pid"`)
const pid = results[0].pid
return pid
}
async test() {
let results
try {
await this.createConnectionPool()
this.connection = await this.getConnectionFromPool()
const pid = await this.getConnectionID()
const connection2 = await this.getConnectionFromPool()
results = await this.executeSQL(`SET AUTOCOMMIT = 0, TIME_ZONE = '+00:00',SESSION INTERACTIVE_TIMEOUT = 600000, WAIT_TIMEOUT = 600000, SQL_MODE='ANSI_QUOTES,PAD_CHAR_TO_FULL_LENGTH', GROUP_CONCAT_MAX_LEN = 1024000, GLOBAL LOCAL_INFILE = 'ON'`);
results = await this.executeSQL(`select count(*) from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`);
console.log(results)
const is = this.connection.query({sql: `select * from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`, rowsAsArray: true}).stream()
// const is = fs.createReadStream('input.txt')
is.on('error',(err) => {
console.log(is.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],is.constructor.name]
}).on
const t = new MyTransform()
t.on('error',(err) => {
console.log(t.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],t.constructor.name]
})
const os = fs.createWriteStream('output.txt')
os.on('error',(err) => {
console.log(os.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],os.constructor.name]
})
const streams = [is,t,os]
console.log(streams.map((s) => { return s.constructor.name }).join(' => '))
try {
console.log('Start Pipeline')
setTimeout(async () => {
console.log('Kill',pid)
const operation = `kill ${pid}`
const res = await connection2.query(operation);
connection2.release()
},5000)
await pipeline(...streams);
console.log('End Pipeline')
} catch (err) {
console.error('Pipeline error:', err);
}
console.log('Done')
await this.closeConnection();
await this.closePool();
} catch (e) {
console.log(e)
await this.closeConnection();
await this.closePool();
console.log(e)
}
}
}
process.on('unhandledRejection', (e,p) => {
console.log("Unhandled",e,p)
})
const test = new Test();
test.test().then(() => {
console.log('Success')
}).catch((e) => {
console.log(e)
})
When run generates the following expected output.
C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL.js
Pool Created
[ RowDataPacket { pid: '35' } ]
OkPacket {
fieldCount: 0,
affectedRows: 0,
insertId: 0,
serverStatus: 0,
warningCount: 1,
message: '',
protocol41: true,
changedRows: 0
}
[ RowDataPacket { 'count(*)': '3654736' } ]
[ RowDataPacket { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 35
Readable Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST'
}
Readable Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable' ]
}
MyTransform Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable' ]
}
Pipeline error: Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
Done
Success
WriteStream Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
C:\Development\YADAMU\src\scratch\mysql>
C:\Development\YADAMU\src\scratch\mysql>
C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL.js
Pool Created
[ RowDataPacket { pid: '37' } ]
OkPacket {
fieldCount: 0,
affectedRows: 0,
insertId: 0,
serverStatus: 0,
warningCount: 1,
message: '',
protocol41: true,
changedRows: 0
}
[ RowDataPacket { 'count(*)': '3654736' } ]
[ RowDataPacket { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 37
Readable Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST'
}
Readable Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable' ]
}
MyTransform Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable' ]
}
Pipeline error: Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
Done
Success
WriteStream Error: Connection lost: The server closed the connection.
at Protocol.end (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:112:13)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:94:28)
at Socket.<anonymous> (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:526:10)
at Socket.emit (node:events:532:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
--------------------
at Protocol._enqueue (C:\Development\YADAMU\src\node_modules\mysql\lib\protocol\Protocol.js:144:48)
at PoolConnection.query (C:\Development\YADAMU\src\node_modules\mysql\lib\Connection.js:198:25)
at Test.test (file:///C:/Development/YADAMU/src/scratch/mysql/lostConnectionMySQL.js:133:36)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
fatal: true,
code: 'PROTOCOL_CONNECTION_LOST',
pipelineComponents: [ 'Readable', 'Readable', 'MyTransform' ]
}
When connection 1 is killed the pipeline terminates with a "The Server Closed the connection'. This allows the code to handle the lost connection in a real world enviroment.
When the MySQL2 variant of the code is run
import {pipeline} from 'stream/promises'
import {Transform} from 'stream'
import mysql from 'mysql2/promise'
import fs from 'fs'
class MyTransform extends Transform {
constructor() {
super({objectMode: true })
this.counter = 0
}
_transform(data,enc,callback) {
this.push(JSON.stringify(data))
callback()
}
}
class Test {
vendorProperties = {
"multipleStatements":true
,"typeCast":true
,"supportBigNumbers":true
,"bigNumberStrings":true
,"dateStrings":true
,"trace":true
,"user":"root"
,"password": "oracle"
,"host":"yadamu-db2"
,"database":"mysql"
,"port":33061
, infileStreamFactory : (path) => {return fs.createReadStream(path)}
}
async createConnectionPool() {
let stack, operation
try {
stack = new Error().stack;
operation = 'mysql.createPool()'
this.pool = mysql.createPool(this.vendorProperties)
console.log('Pool Created')
} catch (e) {
throw e
}
}
async getConnectionFromPool() {
let stack
try {
stack = new Error().stack;
const connection = await this.pool.getConnection()
console.log('Connection obtained')
return connection
} catch (err) {
throw err
}
}
async closeConnection(options) {
if ((this.connection !== undefined) && (typeof this.connection.release === 'function')) {
let stack;
try {
stack = new Error().stack
await this.connection.release()
this.connection = undefined;
} catch (e) {
this.connection = undefined;
throw e
}
}
};
async closePool(options) {
if ((this.pool !== undefined) && (typeof this.pool.end === 'function')) {
let stack;
try {
stack = new Error().stack
await this.pool.end()
this.pool = undefined;
} catch (e) {
this.pool = undefined;
throw e
}
}
}
async executeSQL(sqlStatement,args) {
let stack
let results
try {
stack = new Error().stack;
const [results,fields] = await this.connection.query(sqlStatement,args)
return results;
} catch (e) {
throw e
}
}
async getConnectionID() {
const results = await this.executeSQL(`select connection_id() "pid"`)
const pid = results[0].pid
return pid
}
async test() {
let results
try {
await this.createConnectionPool()
this.connection = await this.getConnectionFromPool()
const pid = await this.getConnectionID()
const connection2 = await this.getConnectionFromPool()
results = await this.executeSQL(`SET AUTOCOMMIT = 0, TIME_ZONE = '+00:00',SESSION INTERACTIVE_TIMEOUT = 600000, WAIT_TIMEOUT = 600000, SQL_MODE='ANSI_QUOTES,PAD_CHAR_TO_FULL_LENGTH', GROUP_CONCAT_MAX_LEN = 1024000, GLOBAL LOCAL_INFILE = 'ON'`);
results = await this.executeSQL(`select count(*) from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`);
console.log(results)
const is = this.connection.connection.query({sql: `select * from "WWI_Warehouse"."ColdRoomTemperatures_Archive"`, rowsAsArray: true}).stream()
// const is = fs.createReadStream('input.txt')
is.on('error',(err) => {
console.log(is.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],is.constructor.name]
}).on
const t = new MyTransform()
t.on('error',(err) => {
console.log(t.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],t.constructor.name]
})
const os = fs.createWriteStream('output.txt')
os.on('error',(err) => {
console.log(os.constructor.name,err)
err.pipelineComponents = [...err.pipelineComponents || [],os.constructor.name]
})
const streams = [is,t,os]
console.log(streams.map((s) => { return s.constructor.name }).join(' => '))
try {
console.log('Start Pipeline')
setTimeout(async () => {
console.log('Kill',pid)
const operation = `kill ${pid}`
const res = await connection2.query(operation);
connection2.release()
},5000)
await pipeline(...streams);
console.log('End Pipeline')
} catch (err) {
console.error('Pipeline error:', err);
}
console.log('Done')
await this.closeConnection();
await this.closePool();
} catch (e) {
console.log(e)
await this.closeConnection();
await this.closePool();
console.log(e)
}
}
}
process.on('unhandledRejection', (e,p) => {
console.log("Unhandled",e,p)
})
const test = new Test();
test.test().then(() => {
console.log('Success')
}).catch((e) => {
console.log(e)
})
The process hangs
C:\Development\YADAMU\src\scratch\mysql>node lostConnectionMySQL2.js
Pool Created
Connection obtained
Connection obtained
[ { 'count(*)': '3654736' } ]
Readable => MyTransform => WriteStream
Start Pipeline
Kill 39
The above was captured 5 min after the KILL was executed.
The environment used is shown here
C:\Development\YADAMU\src\scratch\mysql>node -v
v22.2.0
C:\Development\YADAMU\src\scratch\mysql>npm ls
[email protected] C:\Development\YADAMU\src
+-- @aws-sdk/[email protected]
+-- @aws-sdk/[email protected]
+-- @azure/[email protected]
+-- @electron/[email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- ibm_db_electron@npm:[email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
+-- [email protected]
`-- [email protected]
Similar behavior is seen on linux.