Subscriber Memory Leak
I'm running the zeromq.js library with Node v12 (I also tried Node v10) on a Raspberry Pi 4B running Raspbian Buster.
System:
RaspberryPi 4B (2GB Ram)
$ uname -a
Linux raspberry 4.19.97-v7l+ #1294 SMP Thu Jan 30 13:21:14 GMT 2020 armv7l GNU/Linux
$ cat /etc/os-release
PRETTY_NAME="Raspbian GNU/Linux 10 (buster)"
NAME="Raspbian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=raspbian
ID_LIKE=debian
HOME_URL="http://www.raspbian.org/"
SUPPORT_URL="http://www.raspbian.org/RaspbianForums"
BUG_REPORT_URL="http://www.raspbian.org/RaspbianBugs"
$ node --version
v10.19.0
$ cat /usr/lib/node_modules/zeromq/package.json
{
"_from": "zeromq@5",
"_id": "[email protected]",
"_inBundle": false,
"_integrity": "sha512-qsckhCmrg6et6zrAJytC971SSN/4iLxKgkXK1Wqn2Gij5KXMY+TA+3cy/iFwehaWdU5usg5HNOOgaBdjSqtCVw==",
"_location": "/zeromq",
"_phantomChildren": {},
"_requested": {
"type": "range",
"registry": true,
"raw": "zeromq@5",
"name": "zeromq",
"escapedName": "zeromq",
"rawSpec": "5",
"saveSpec": null,
"fetchSpec": "5"
},
"_requiredBy": [
"#USER"
],
"_resolved": "https://registry.npmjs.org/zeromq/-/zeromq-5.2.0.tgz",
"_shasum": "92eed6baeee5167977e51a2e2360b2c29a3b39fd",
"_spec": "zeromq@5",
"_where": "/usr/lib/node_modules",
"author": {
"name": "Justin Tulloss",
"email": "[email protected]",
"url": "http://justin.harmonize.fm"
},
"bugs": {
"url": "https://github.com/zeromq/zeromq.js/issues"
},
"bundleDependencies": false,
"dependencies": {
"nan": "^2.14.0",
"prebuild-install": "^5.3.2"
},
"deprecated": false,
"description": "ZeroMQ for node.js",
"devDependencies": {
"electron-mocha": "^6.0.0",
"jsdoc": "^3.5.4",
"mocha": "^5.0.0",
"nyc": "^12.0.2",
"prebuild": "^9.1.1",
"semver": "^5.4.1",
"should": "^13.0.0"
},
"engines": {
"node": ">=6.0"
},
"gypfile": true,
"homepage": "https://github.com/zeromq/zeromq.js#readme",
"keywords": [
"zeromq",
"zmq",
"0mq",
"ømq",
"libzmq",
"native",
"binding",
"addon"
],
"license": "MIT",
"main": "index",
"name": "zeromq",
"repository": {
"type": "git",
"url": "git+https://github.com/zeromq/zeromq.js.git"
},
"scripts": {
"build:docs": "jsdoc -R README.md -d docs lib/*.js",
"build:libzmq": "node scripts/preinstall.js",
"coverage": "nyc report --reporter=text-lcov > coverage.lcov",
"install": "node scripts/prebuild-install.js || (node scripts/preinstall.js && node-gyp rebuild)",
"prebuild": "prebuild --all --strip",
"precoverage": "nyc npm run test",
"test": "mocha --expose-gc --slow 300",
"test:electron": "electron-mocha --slow 300"
},
"version": "5.2.0"
}
$ sudo dpkg -l | grep zmq
ii libzmq3-dev:armhf 4.3.1-4+deb10u1 armhf lightweight messaging kernel (development files)
ii libzmq5:armhf 4.3.1-4+deb10u1 armhf lightweight messaging kernel (shared library)
The subscriber code is a simple subscriber (as the one provided in the examples).
I started monitoring the subscriber process with the Linux top utility. Initially the subscriber process was using around 30M of memory; after several hours of running (around 9-10 hours) it was using 370M of memory.
I can also see this while watching the top interface: the memory increases steadily, and every few minutes it grows by a few MB (around 5-6M).
Let me know if I can provide any other information to help investigate this problem.
I can confirm this issue on Ubuntu as well. Simple subscriber grows unbounded with respect to memory. Occurs in v5 compat as well as v6 beta 6.
I have reverted back to my previous configuration (NodeJS v8.0 / Zeromq.js v4.6.0), no memory problems with these versions.
Thanks for your report. Could you share the code you used to test it (subscriber + producer)?
I can't reproduce this with the following code on my PR (#444). Unless you give the code in which this happens, I don't think it is possible to fix the issue.
thread-worker subscriber example
https://github.com/zeromq/zeromq.js/blob/master/examples/threaded-worker/threaded-worker.ts
import {Worker} from "worker_threads"
import * as zmq from "./lib/index.js"
/**
 * Worker that pulls single-character requests from `input`, runs them through
 * `work`, and pushes the results to `output`. It closes its sockets when a
 * "stop" message arrives on `signal`.
 *
 * `spawn` serializes this class with `toString()` and evaluates it inside each
 * worker thread, so the class has to stay self-contained: the only outside
 * name the generated script provides is `zmq`.
 */
export class ThreadedWorker {
  /**
   * Starts `threads` worker threads, each running a fresh `ThreadedWorker`.
   * Resolves after every thread has exited with code 0 and rejects as soon as
   * one of them exits with a different code.
   */
  static async spawn(threads: number) {
    const exits = Array.from({length: threads}, () => {
      return new Promise((done, fail) => {
        /* Script evaluated in the thread: load zmq, define the class, run it. */
        const script = [
          "",
          'const zmq = require("./lib/index.js")',
          ThreadedWorker.toString(),
          "new ThreadedWorker().run()",
          "",
        ].join("\n")

        const thread = new Worker(script, {eval: true})
        thread.on("exit", exitCode => {
          if (exitCode !== 0) {
            fail(new Error(`Worker stopped with exit code ${exitCode}`))
            return
          }
          done(undefined)
        })
      })
    })

    await Promise.all(exits)
    console.log("all workers stopped")
  }

  /* Queue only 1 incoming message. */
  input = new zmq.Pull({receiveHighWaterMark: 1})
  output = new zmq.Push()
  signal = new zmq.Subscriber()

  shift = 13
  maxDelay = 2000 /* Average of 1s. */

  /** Connects the three sockets and starts watching `signal` for "stop". */
  constructor() {
    this.input.connect("inproc://input")
    this.output.connect("inproc://output")
    this.signal.connect("inproc://signal")
    this.signal.subscribe()

    /* Runs in the background; the returned promise is not awaited. */
    const watchSignals = async () => {
      for await (const [message] of this.signal) {
        if (message.toString() !== "stop") continue
        this.stop()
      }
    }
    watchSignals()
  }

  /** Closes the input, output and signal sockets, in that order. */
  async stop() {
    for (const socket of [this.input, this.output, this.signal]) {
      socket.close()
    }
  }

  /* Loop over input and produce output. */
  async run() {
    for await (const [pos, req] of this.input) {
      if (req.length === 1) {
        console.log(`received work '${req}' at ${pos}`)
        const res = await this.work(req.toString())
        await this.output.send([pos, res])
        console.log(`finished work '${req}' -> '${res}' at ${pos}`)
      } else {
        /* Only one-byte requests are processed. */
        console.log(`skipping invalid '${req}'`)
      }
    }
  }

  /**
   * Do the actual Caesar shift on the first character of `req`.
   *
   * The shift is applied 200000001 times in a row; characters outside A-Z and
   * a-z are left untouched. With the default shift of 13 an odd number of
   * passes gives the same letter as a single pass.
   */
  async work(req: string): Promise<string> {
    // await new Promise((resolve) => setTimeout(resolve, Math.random() * this.maxDelay))
    let code = req.charCodeAt(0)
    for (let remaining = 200000001; remaining > 0; remaining--) {
      const isUpper = code >= 65 && code <= 90
      const isLower = code >= 97 && code <= 122
      if (isUpper) {
        code = ((code - 65 + this.shift) % 26) + 65
      } else if (isLower) {
        code = ((code - 97 + this.shift) % 26) + 97
      }
    }
    return String.fromCharCode(code)
  }
}
/**
 * Pauses for a fixed 10000 ms.
 *
 * The timer is armed synchronously when the function is called; the returned
 * promise settles once the timer fires. Callers must `await` the result for
 * the pause to have any effect on their own control flow.
 */
async function sleep() {
  const pauseMs = 10000
  const timerFired = new Promise(wake => {
    setTimeout(() => wake(undefined), pauseMs)
  })
  await timerFired
}
/**
 * Reproduction driver: pauses, starts two worker threads, runs one more
 * `ThreadedWorker` on the current thread, then pauses again before stopping
 * it.
 *
 * Fix: both `sleep()` calls are now awaited. Without `await` the returned
 * promise was discarded, so the pauses never delayed anything and `x.stop()`
 * ran immediately after `run()` returned.
 */
async function main() {
  await sleep()
  console.log("start")

  /*
   * Deliberately not awaited: spawn() only settles once every worker thread
   * has exited, and the local worker below has to start before that.
   */
  void ThreadedWorker.spawn(2)

  const x = new ThreadedWorker()
  await x.run()

  await sleep()
  /* stop() is async, so await it to surface any error from closing sockets. */
  await x.stop()
}
/* Entry point: run main() and re-throw whatever it rejects with. */
main().catch((failure: unknown) => {
  throw failure
})
I found an access violation error with inproc sockets that might be related to this, but it might not be, because it only happens with inproc.
https://github.com/nodejs/node-addon-api/issues/965