[Query] Poller functionality in zeromq.js
Hi all. Judging by index.js and the test cases, the Node.js binding of ZeroMQ provides no poller functionality, although the native library supports it.
Is it supported? If not, is there a specific reason for not supporting the poller?
It's been a while since you asked this, but could you elaborate on what you expect from such functionality? The polling logic is integrated into the Node.js event loop, and it is not obvious to me how it could be used in a different way with Node.js.
@rolftimmermans
In the Node.js version of lbbroker in the zguide (lbbroker: Load balancing broker in Node.js), a comment says:
// What if no workers are available? Delay till one is ready.
// This is because I don't know the equivalent of zmq_poll
// in Node.js zeromq, which is basically an event loop itself.
// I start an interval so that the message is eventually sent. \
// Maybe there is a better way.
The interval for setInterval is set to 10 ms.
Is the behaviour of this code the same as the C version?
I think that when setInterval is used, frontend requests are queued in the Node.js event loop
instead of the zmq socket's message queue, right? So zmq can no longer handle them.
My take on the example using the new API is the following.
It does not take messages out of zmq's message queue,
but it still needs to use wait(10) in handleTask.
I think it is different from the C version that uses poll,
because the Node.js code runs a busy loop with a delay,
while the C version blocks in poll, waiting for an IO event.
Maybe we need to support zmq_poll so that Node.js users can implement custom event emitters.
import * as Zmq from "zeromq"
import { wait } from "../utils/wait"

type State = {
  frontend: Zmq.Router
  backend: Zmq.Router
  workerQueue: Array<string>
}

async function run() {
  const frontend = new Zmq.Router()
  const backend = new Zmq.Router()
  const who = "load-balancer"
  const loadBalancerFrontend = "tcp://127.0.0.1:3000"
  const loadBalancerBackend = "tcp://127.0.0.1:3001"
  await frontend.bind(loadBalancerFrontend)
  await backend.bind(loadBalancerBackend)
  console.log({ who, message: "started" })
  const state: State = { frontend, backend, workerQueue: [] }
  // Start both loops concurrently; neither is awaited.
  handleResult(state)
  handleTask(state)
}

async function handleResult(state: State) {
  for await (const [workerId, kind, ...rest] of state.backend) {
    // Any message from a worker means that worker is free again.
    state.workerQueue.push(String(workerId))
    switch (String(kind)) {
      case "Ready": {
        // Nothing to forward; break so we do not fall through to "Result".
        break
      }
      case "Result": {
        const [clientId, result] = rest
        await state.frontend.send([clientId, result])
        break
      }
    }
  }
}

async function handleTask(state: State) {
  while (true) {
    const workerId = state.workerQueue.shift()
    if (workerId === undefined) {
      // No worker is available: sleep briefly and retry.
      await wait(10)
      continue
    }
    // Only read from the frontend once a worker is reserved,
    // so unread tasks stay in zmq's message queue.
    const [clientId, task] = await state.frontend.receive()
    await state.backend.send([workerId, clientId, task])
  }
}

run()
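For reference, the wait(10) loop in handleTask could probably be avoided without zmq_poll by making the worker queue itself awaitable. The following is only a sketch: AsyncQueue and take are hypothetical names, not part of zeromq.js, and the class is plain TypeScript with no socket code.

```typescript
// Hypothetical helper (not part of zeromq.js): a FIFO queue whose take()
// returns a promise that resolves as soon as an item is available.
class AsyncQueue<T> {
  private items: T[] = []
  private waiters: Array<(item: T) => void> = []

  // Hand the item to the oldest waiting consumer, or store it.
  push(item: T): void {
    const waiter = this.waiters.shift()
    if (waiter !== undefined) {
      waiter(item)
    } else {
      this.items.push(item)
    }
  }

  // Resolve immediately if an item is stored, otherwise wait for push().
  take(): Promise<T> {
    if (this.items.length > 0) {
      return Promise.resolve(this.items.shift() as T)
    }
    return new Promise<T>((resolve) => {
      this.waiters.push(resolve)
    })
  }
}
```

With workerQueue declared as an AsyncQueue<string>, handleTask would start each iteration with `const workerId = await state.workerQueue.take()`, so the loop sleeps until handleResult pushes a worker instead of waking up every 10 ms.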
I see the dispatchPending trick in the majordomo example code;
I will just use that trick.
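As I understand it, the idea behind dispatchPending is to keep pending tasks and idle workers in memory and try to pair them whenever either list changes. Below is a minimal sketch of that idea, not the actual majordomo code: Matcher, Task, addWorker, addTask and the send callback are names I made up for illustration.

```typescript
// Illustrative types and names; not taken from zeromq.js or its examples.
type Task = { clientId: string; payload: string }
type Send = (workerId: string, task: Task) => void

class Matcher {
  private workers: string[] = []
  private tasks: Task[] = []
  private send: Send

  constructor(send: Send) {
    this.send = send
  }

  // Called when a worker reports that it is ready.
  addWorker(workerId: string): void {
    this.workers.push(workerId)
    this.dispatchPending()
  }

  // Called when a client request arrives.
  addTask(task: Task): void {
    this.tasks.push(task)
    this.dispatchPending()
  }

  // Pair the oldest task with the oldest idle worker until one list is empty.
  private dispatchPending(): void {
    while (this.workers.length > 0 && this.tasks.length > 0) {
      const workerId = this.workers.shift() as string
      const task = this.tasks.shift() as Task
      this.send(workerId, task)
    }
  }
}
```

Note the trade-off compared with my code above: here tasks are read off the frontend socket as they arrive and held in a JavaScript array, so they no longer sit in zmq's message queue while waiting for a worker.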
It is amazing that I can use reactive programming (learned from web frontend) to solve this problem: https://github.com/xieyuheng/zeromq-messaging-patterns/blob/master/src/load-balancing-broker/brokerReactivelyMatch.ts
import { watch } from "@vue/runtime-core"
import type { Broker } from "./Broker"
import { brokerMatch } from "./brokerMatch"

export function brokerReactivelyMatch(broker: Broker) {
  watch(
    () => broker.workerIds.length,
    () => {
      brokerMatch(broker)
    },
  )
  watch(
    () => broker.tasks.length,
    () => {
      brokerMatch(broker)
    },
  )
}
The same goes for the majordomo example:
when reactive programming is used,
we do not have to call dispatchPending every time the workers or requests change;
we can simply watch them and reactively call dispatchPending.
- https://github.com/zeromq/zeromq.js/blob/master/examples/majordomo/service.ts#L34