zeromq.js icon indicating copy to clipboard operation
zeromq.js copied to clipboard

Subscriber Memory Leak

Open samueleforconi opened this issue 5 years ago • 5 comments

I'm running zeromq.js library along with Node v12 (but also tried with Node v10) on a RaspberryPi 4B running Raspbian Buster.

System:

RaspberryPi 4B (2GB Ram)

$ uname -a
Linux raspberry 4.19.97-v7l+ #1294 SMP Thu Jan 30 13:21:14 GMT 2020 armv7l GNU/Linux
$ cat /etc/os-release
PRETTY_NAME="Raspbian GNU/Linux 10 (buster)"
NAME="Raspbian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=raspbian
ID_LIKE=debian
HOME_URL="http://www.raspbian.org/"
SUPPORT_URL="http://www.raspbian.org/RaspbianForums"
BUG_REPORT_URL="http://www.raspbian.org/RaspbianBugs"
$ node --version
v10.19.0
$  cat /usr/lib/node_modules/zeromq/package.json
{
  "_from": "zeromq@5",
  "_id": "[email protected]",
  "_inBundle": false,
  "_integrity": "sha512-qsckhCmrg6et6zrAJytC971SSN/4iLxKgkXK1Wqn2Gij5KXMY+TA+3cy/iFwehaWdU5usg5HNOOgaBdjSqtCVw==",
  "_location": "/zeromq",
  "_phantomChildren": {},
  "_requested": {
    "type": "range",
    "registry": true,
    "raw": "zeromq@5",
    "name": "zeromq",
    "escapedName": "zeromq",
    "rawSpec": "5",
    "saveSpec": null,
    "fetchSpec": "5"
  },
  "_requiredBy": [
    "#USER"
  ],
  "_resolved": "https://registry.npmjs.org/zeromq/-/zeromq-5.2.0.tgz",
  "_shasum": "92eed6baeee5167977e51a2e2360b2c29a3b39fd",
  "_spec": "zeromq@5",
  "_where": "/usr/lib/node_modules",
  "author": {
    "name": "Justin Tulloss",
    "email": "[email protected]",
    "url": "http://justin.harmonize.fm"
  },
  "bugs": {
    "url": "https://github.com/zeromq/zeromq.js/issues"
  },
  "bundleDependencies": false,
  "dependencies": {
    "nan": "^2.14.0",
    "prebuild-install": "^5.3.2"
  },
  "deprecated": false,
  "description": "ZeroMQ for node.js",
  "devDependencies": {
    "electron-mocha": "^6.0.0",
    "jsdoc": "^3.5.4",
    "mocha": "^5.0.0",
    "nyc": "^12.0.2",
    "prebuild": "^9.1.1",
    "semver": "^5.4.1",
    "should": "^13.0.0"
  },
  "engines": {
    "node": ">=6.0"
  },
  "gypfile": true,
  "homepage": "https://github.com/zeromq/zeromq.js#readme",
  "keywords": [
    "zeromq",
    "zmq",
    "0mq",
    "ømq",
    "libzmq",
    "native",
    "binding",
    "addon"
  ],
  "license": "MIT",
  "main": "index",
  "name": "zeromq",
  "repository": {
    "type": "git",
    "url": "git+https://github.com/zeromq/zeromq.js.git"
  },
  "scripts": {
    "build:docs": "jsdoc -R README.md -d docs lib/*.js",
    "build:libzmq": "node scripts/preinstall.js",
    "coverage": "nyc report --reporter=text-lcov > coverage.lcov",
    "install": "node scripts/prebuild-install.js || (node scripts/preinstall.js && node-gyp rebuild)",
    "prebuild": "prebuild --all --strip",
    "precoverage": "nyc npm run test",
    "test": "mocha --expose-gc --slow 300",
    "test:electron": "electron-mocha --slow 300"
  },
  "version": "5.2.0"
}
$ sudo dpkg -l | grep zmq
ii  libzmq3-dev:armhf              4.3.1-4+deb10u1                     armhf        lightweight messaging kernel (development files)
ii  libzmq5:armhf                  4.3.1-4+deb10u1                     armhf        lightweight messaging kernel (shared library)

The subscriber code is a simple subscriber (as the one provided in the examples).

I started monitoring the subscriber process using linux top utility, initially the subscriber process was using around 30M of memory, after several hours of running (around 9-10hrs) the process was using 370M of memory.

I can see this also looking at the top interface, the memory increase regularly, and after a few minutes I can see an increase of some MB (around 5/6M).

Tell me if I can report any other information to inspect this problem.

samueleforconi avatar Mar 27 '20 08:03 samueleforconi

I can confirm this issue on Ubuntu as well. Simple subscriber grows unbounded with respect to memory. Occurs in v5 compat as well as v6 beta 6.

stephenm27 avatar Mar 27 '20 13:03 stephenm27

I have reverted back to my previous configuration (NodeJS v8.0 / Zeromq.js v4.6.0), no memory problems with these versions.

samueleforconi avatar Mar 30 '20 09:03 samueleforconi

Thanks for your report. Could you share the code you used to test it (subscriber + producer)?

rolftimmermans avatar Jun 09 '20 14:06 rolftimmermans

I can't reproduce this with the following code on my PR (#444). Unless you give the code in which this happens, I don't think it is possible to fix the issue.

thread-worker subscriber example

https://github.com/zeromq/zeromq.js/blob/master/examples/threaded-worker/threaded-worker.ts

import {Worker} from "worker_threads"
import * as zmq from "./lib/index.js"

export class ThreadedWorker {
  static async spawn(threads: number) {
    const workers = Array.from({length: threads}).map(() => {
      return new Promise((resolve, reject) => {
        const src = `
          const zmq = require("./lib/index.js")
          ${ThreadedWorker.toString()}
          new ThreadedWorker().run()
        `

        new Worker(src, {eval: true}).on("exit", code => {
          if (code === 0) {
            resolve(undefined)
          } else {
            reject(new Error(`Worker stopped with exit code ${code}`))
          }
        })
      })
    })

    await Promise.all(workers)
    console.log("all workers stopped")
  }

  /* Queue only 1 incoming message. */
  input = new zmq.Pull({receiveHighWaterMark: 1})
  output = new zmq.Push()
  signal = new zmq.Subscriber()

  shift = 13
  maxDelay = 2000 /* Average of 1s. */

  constructor() {
    this.input.connect("inproc://input")
    this.output.connect("inproc://output")

    this.signal.connect("inproc://signal")
    this.signal.subscribe()

    const listen = async () => {
      for await (const [sig] of this.signal) {
        if (sig.toString() === "stop") this.stop()
      }
    }

    listen()
  }

  async stop() {
    this.input.close()
    this.output.close()
    this.signal.close()
  }

  /* Loop over input and produce output. */
  async run() {
    for await (const [pos, req] of this.input) {
      if (req.length !== 1) {
        console.log(`skipping invalid '${req}'`)
        continue
      }

      console.log(`received work '${req}' at ${pos}`)

      const res = await this.work(req.toString())
      await this.output.send([pos, res])

      console.log(`finished work '${req}' -> '${res}' at ${pos}`)
    }
  }

  /* Do the actual Caesar shift. */
  async work(req: string): Promise<string> {
    // await new Promise((resolve) => setTimeout(resolve, Math.random() * this.maxDelay))

    let char = req.charCodeAt(0)

    for (let i = 0; i < 200000001; i++) {
      if (char >= 65 && char <= 90) {
        char = ((char - 65 + this.shift) % 26) + 65
      } else if (char >= 97 && char <= 122) {
        char = ((char - 97 + this.shift) % 26) + 97
      }
    }

    return String.fromCharCode(char)
  }
}


async function sleep() {
  await new Promise(resolve => {
    setTimeout(() => {
      resolve(undefined)
    }, 10000)
  })
}
async function main() {
  sleep()
  console.log("start")
  ThreadedWorker.spawn(2);
  const x = new ThreadedWorker()
  await x.run()
  sleep()
  x.stop()
}

main().catch(e => {
  throw e
})

aminya avatar Apr 08 '21 06:04 aminya

I found an access violation error in Socket Ironpc that might be related to this, but it might not be because it only happens with Ironpc.

https://github.com/nodejs/node-addon-api/issues/965

aminya avatar Apr 08 '21 12:04 aminya