
node zmq should implement Stream interface not EventEmitter?

Open nacholibre opened this issue 10 years ago • 36 comments

I'm just unable to write clean code with the current API of node zeromq. Something is fundamentally wrong here; I think the API should be wrapped around Stream, not EventEmitter.

Simple example: let's say I want to distribute work among nodes (one pusher, n pullers). With the current API pullers can't stop pulling, and it shouldn't be like that: pullers should pull on demand, but with sock.on('message', ...) there is no way to stop the messages flowing in, and this breaks the whole philosophy of ZeroMQ.

I feel like on a pull socket I should be able to call .pause() and .resume(); I should have control over the whole pull process.

Maybe push sockets should be a Writable Stream and pull sockets a Readable Stream?
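
For illustration, a minimal sketch of what a Readable wrapper could look like, assuming the current zmq module API. Note that it only buffers messages in JS; it does not stop the underlying socket, which is exactly the problem:

var zmq = require('zmq');
var Readable = require('stream').Readable;

function createPullStream(addr) {
  var sock = zmq.socket('pull');
  sock.connect(addr);

  var rs = new Readable({ objectMode: true });
  rs._read = function () {}; // no way to ask the socket for just one message

  sock.on('message', function (msg) {
    // push into the stream's internal buffer; the socket itself keeps
    // receiving, so this gives a stream API but no real back-pressure
    rs.push(msg);
  });

  return rs;
}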

What do you guys think about this?

nacholibre avatar Dec 28 '14 13:12 nacholibre

It shouldn't be difficult to change from EventEmitter to streams. Basically there are two main methods: send and flush. Changing these methods to work with streams shouldn't be difficult... Very important for me is to check for performance changes. I'm still looking at the difference with Python, so I can run some tests in the coming days.

prdn avatar Dec 29 '14 13:12 prdn

@prdn

It shouldn't be difficult to change from EventEmitter to streams.

easier said than done, but I think you're correct.

because node streams inherit from EventEmitter, with readable, data, end, close, and a bunch of other implementation-specific events.

while node's, or rather io.js's, stream interface is awesome, I doubt any language binding like this should veer far from its underlying application binary interface, a.k.a. ABI.

the convenience and value of a consistent messaging protocol is born out of language-agnostic connectability, which to me means: a nice platform connector for handling distributed and diverse computing environments.

so for example, sticking close to its ABI ensures we can connect socket types from node servers to Python or Go, and ultimately to iOS devices, Androids, Raspberry Pis, etc., since all messages speak the same envelope language.

So i think when you keep the cross-platform/cross-language messaging interface consistent with specifications provided by that underlying library, it's a big win for writing efficient network architecture across areas otherwise marked by inconsistent communication patterns/frameworks/protocols, etc.

As long as node's higher level stream interface gets out of the way of that need, I would be in favor.

By the way, on the subject of streams and cool interfaces, it looks like libzmq v4+ has a stream socket type that can route and pipe across non-peer sockets.

reqshark avatar Jan 06 '15 02:01 reqshark

I think the API is fine the way it is as it does represent the underlying ABI quite well (may need improvement, but perhaps we should focus on that?). If you want to wrap candy around it, why not turn that into a separate module that depends on this one? zmq-streams or something. I think you're inevitably going to run into problems with the socket as a writable stream, but by all means, prove me wrong :) A readable stream (instead of event emission) could be interesting though. I don't really see any potential issues there.

ronkorving avatar Jan 06 '15 02:01 ronkorving

well ya of course. @ronkorving +1.

and if libzmq gave us a streaming type of interface let's start there.

reqshark avatar Jan 06 '15 02:01 reqshark

@reqshark you have a point, I agree with you, but the current node bindings don't follow the so-called ABI. Pull sockets should pull on demand; that is the point of pullers, but I cannot implement pull on demand with the current node zmq API.

nacholibre avatar Jan 06 '15 09:01 nacholibre

You're right about that, and that is a problem. I understand why you say streams solve this, but I would argue that we should stick closer (rather than further removed) to the ABI to address this problem. Streams can always be stuck on top after the fact.

ronkorving avatar Jan 06 '15 10:01 ronkorving

A method with a callback function would solve the problem, something like this:

sock.recv(function(message) {
    console.log('message received');
});

If I have some time I'll try to implement it and see how it goes.

nacholibre avatar Jan 06 '15 14:01 nacholibre

@nacholibre glad you saw my point, but hold up, you're saying pull is broken and that you might have a solution:

I cannot implement pull on demand with the current node zmq API.

can you explain how pull is broken, or maybe open a separate issue for discussing the pull socket. if pull is borked, let's fix it. @ronkorving, is this true? is there something wrong with our pull socket?

Anyway assuming that's the case, it's a different issue than the node streams discussion.

Moreover, solving a broken socket by introducing a stream interface may not, IMO, be the ideal solution.

All node streams, old and new, have the following characteristic: they are either a source, a sink, or some form of duplex holding both source and sink properties/capabilities at once. Some call source streams Readable, sinks get referred to as Writable, and the combo stream is sometimes a Transform or whatever.

One could try to map the ZeroMQ socket types to these interfaces; for example, a pair socket is definitely a duplex.
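
For instance, a rough sketch of a pair socket exposed as a Duplex object stream (a hypothetical wrapper; the read side has the same no-back-pressure caveat discussed in this thread):

var zmq = require('zmq');
var Duplex = require('stream').Duplex;

function pairStream(addr) {
  var sock = zmq.socket('pair');
  sock.connect(addr);

  var ds = new Duplex({ objectMode: true });
  ds._read = function () {};                // source side: no on-demand read
  ds._write = function (chunk, enc, next) { // sink side maps onto send()
    sock.send(chunk);
    next();
  };

  sock.on('message', function (msg) {
    ds.push(msg);
  });

  return ds;
}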

All this ostensible categorical cleanliness in conceptually associating socket type behavior with node streams past and present is an implementation pitfall IMO. Case in point: this update to my PR just made the Travis CI pass; check out where I changed lines 46-49: https://github.com/reqshark/zeromq.node/blob/master/test/socket.stream.js#L46-L49

On the other hand, and to my point, the best example I've seen recently of building a really elegant stream interface on a similar socket message event pattern is probably Max Ogden's repo, where they did exactly that for the node ws websocket module: https://github.com/maxogden/websocket-stream/blob/master/index.js

reqshark avatar Jan 06 '15 14:01 reqshark

The only thing wrong with our pull socket is that it's not on demand. It will just pull as fast as it can and emit and emit and emit. Probably faster than you can handle the messages. That makes a master/worker pattern with many workers (pull) for one source (push) of jobs less than ideal.

So, our pull socket works. But a typical use case I imagine is not working all that well.

ronkorving avatar Jan 07 '15 02:01 ronkorving

@ronkorving bear with me pls: what event does pull emit so fast in succession?

while pulling and emitting over and over, it's just firing message events like the normal callback pattern?

pull.on('message', function (msg) {
  //do something with msg...
})

reqshark avatar Jan 07 '15 02:01 reqshark

Right, but if I want to send jobs to 10 pullers, they will all be pulling as fast as possible, rather than providing back-pressure; that is, pulling on demand, once resources are available to execute the next job. The use case: http://zguide.zeromq.org/page:all#Divide-and-Conquer

ronkorving avatar Jan 07 '15 03:01 ronkorving

ahh, :palm_tree: so one of your ten pull sockets may absorb an unfair share of the results. gotcha.

@nacholibre, @ronkorving, this isn't just a slow joiner syndrome?

those divide-and-conquer docs say only a single sink downstream of the workers will collect results in fair-queue style.

reqshark avatar Jan 07 '15 14:01 reqshark

@reqshark I agree with you, implementing streams in the current API is not a good idea, and maybe it's not a good idea at all; we should fix the pulling process.

this isn't just a slow joiner syndrome?

No, it's not slow joiner syndrome, and it's not about fair queuing either: when pulling you should be able to stop, process the pulled messages and start pulling again; with the EventEmitter you can't do that, you can't stop messages from flowing in.

Here is a simple example: one pusher and one puller.

Push socket: reads from a file and pushes every line of it; the file has 1 million lines.

Pull socket: processes every single message, doing some async queries etc., with a concurrency factor of 100.

How it should be: the pull socket should pull messages until 100 is reached, then stop pulling and start processing them. After the processing is done, it should repeat.

With the current puller API you can't control the pull process; you can only attach an on('message') handler, and if there are any messages they will be received, so implementing back-pressure is impossible. Eventually all the messages will be transferred to the puller, and if we're talking about millions of messages, that will result in out-of-memory for the pullers. This is why pullers should pull whenever they can process new messages, not constantly.
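
A sketch of the behavior I mean, written against a hypothetical pause()/resume() that the current socket does not have (processAsync stands in for the real work):

var zmq = require('zmq');
var sock = zmq.socket('pull');
sock.connect('tcp://127.0.0.1:3000');

var LIMIT = 100;
var inFlight = 0;

sock.on('message', function (msg) {
  inFlight++;
  if (inFlight >= LIMIT) sock.pause();   // hypothetical: stop receiving at the limit
  processAsync(msg, function () {        // the async queries etc.
    inFlight--;
    if (inFlight < LIMIT) sock.resume(); // hypothetical: start receiving again
  });
});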

One more thing about the push socket. Right now you can sock.send() unlimited messages (even if no one is connected to the socket), and if you don't regularly check the pending outgoing messages (socket._zmq.pending), your memory will be eaten up, because all of the messages are buffered in memory.

One should be able to know when a message is sent, or at least have an easy way of checking the outgoing queue; in the Python binding (and all blocking languages) send will block until there is someone to pull. Maybe sock.send() should take a second callback parameter with an argument indicating successful sending?
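
For reference, the kind of manual check I mean today looks roughly like this (socket._zmq.pending is an undocumented internal, so treat this as illustration only):

var zmq = require('zmq');
var sock = zmq.socket('push');
sock.bindSync('tcp://127.0.0.1:3000');

function sendThrottled(msg) {
  if (sock._zmq.pending < 500) {
    sock.send(msg); // queued for delivery, not necessarily sent yet
  } else {
    // outgoing queue is backed up; poll until it drains
    setTimeout(function () { sendThrottled(msg); }, 10);
  }
}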

nacholibre avatar Jan 07 '15 16:01 nacholibre

Maybe sock.send() should take a second callback parameter with an argument indicating successful sending?

sure, you could do that, but it won't change the problem of handler buildup from pull's async message callbacks overlapping whatever i/o you're trying to handle in-process, i.e. within the onmessage callback.

I think the best fix would be to add synchronous function support in the binding: the node run loop itself would have to be paused and stop being asynchronous during a pull connection.

reqshark avatar Jan 07 '15 17:01 reqshark

I think the pull socket should have a recv method with a callback, and push sockets should have an easy-to-access pendingMessages property.

pushSocket.send(message);
if (pushSocket.pendingMessages >= 500) {
  console.log('slow pullers, should stop sending messages');
}

pullSocket.recv(function(message) {
  console.log('message received');
});

nacholibre avatar Jan 07 '15 18:01 nacholibre

woah @nacholibre hold up, your previous comment located and began to describe the precise nature of a pull socket deficiency rearing up in the JavaScript layer.

Moreover you provide insight into a clear and simple solution:

when pulling you should be able to stop, process the pulled messages and start pulling again

without touching the push socket, can we manage pull's inbound message i/o so as to

stop, process the pulled messages and start pulling again

That would eliminate new JS-layer perf hits incurred by watching pendingMessages (which I assume is some count you had in mind for yet-to-be-processed messages, not whether a msg got sent).

I think the problem here is unblocked i/o, and the push socket is fine. Leave it alone.

can we fix the onmessage handler i/o overlap directly in pull?

reqshark avatar Jan 07 '15 20:01 reqshark

does a sockopt already sort this out in pull?

reqshark avatar Jan 07 '15 21:01 reqshark

What do you mean by that? HWM? If you are talking about the high water mark option, I can tell you about my past experience with it: it doesn't work.
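
For context, this is roughly what setting the high water mark looks like in this binding, assuming the sockopt constants are exported as in libzmq 3+, where HWM is split into send and receive sides:

var zmq = require('zmq');

var push = zmq.socket('push');
push.setsockopt(zmq.ZMQ_SNDHWM, 100); // cap libzmq's outgoing queue, in theory
push.bindSync('tcp://127.0.0.1:3000');

var pull = zmq.socket('pull');
pull.setsockopt(zmq.ZMQ_RCVHWM, 100); // cap libzmq's incoming queue, in theory
pull.connect('tcp://127.0.0.1:3000');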

Just try to implement a push/pull pattern with a non-trivial number of messages and you'll understand everything I'm saying.

Try to implement back-pressure on the push and on the pull side and you'll probably stumble on the same steps.

nacholibre avatar Jan 07 '15 22:01 nacholibre

would you be able to push a quick example to illustrate:

try to implement a push/pull pattern with a non-trivial number of messages and you'll understand everything I'm saying.

https://github.com/reqshark/zeromq.node.issue.380

reqshark avatar Jan 07 '15 22:01 reqshark

I've made my own repo, because in yours you have to npm install in the package directory.

https://github.com/nacholibre/nodezmq.push.pull.example/

try to limit received messages in the consumer.js and try to limit outgoing pending messages in the producer.js

nacholibre avatar Jan 08 '15 09:01 nacholibre

while those horrific attacks yesterday in Paris test my focus on software, I can still give you a nice scenario where backpressure doesn't matter.

no need to worry about flooding a fast pull socket's EventEmitter().

the concern about message buffering and overlapping emit distributions is put to rest in this example. pull/push emits are stable and consistent here, kind of starting a divide and conquer pattern.

in the following setup, runtime management of underlying i/o in the callback before the next emitted event works exactly as we hope and expect.

at least that's what I found in v0.11.14 and v0.10.35

@nacholibre, check my results when i:

try to limit received messages in the consumer.js and try to limit outgoing pending messages in the producer.js

well, I thought maybe something in the files of zeromq.node might need to be refactored, so I started the repository without a package.json. I ran rm -rf .git after pulling down master in order to easily diff the changes I was considering. I did all that unconventional package stuff deliberately, for a workflow that combines changes to the producer.js/consumer.js scripts with any related updates I might consider applying to the binding.

whatever, I'm sure there are better workflows for sending PRs to npm packages, but sometimes you need to pull down a few copies and make dirtier require('../../')-type calls to bring index.js in from wherever.

conclusion

i don't know why this works.

@ronkorving please check my work and see if this is right: https://github.com/reqshark/zeromq.node.issue.380

reqshark avatar Jan 09 '15 00:01 reqshark

In your example you are providing backpressure, but only because the work() function is doing its work synchronously. What if there's async involved? That's the scenario we're talking about. I'm sorry if that wasn't clear from the beginning :(

ronkorving avatar Jan 09 '15 02:01 ronkorving

hey, thanks for helping me through this issue, I've learned a lot. @ronkorving and @nacholibre, I now completely agree with you guys: pull is borked.

if anyone wants to compare just git checkout async, here's a diff

i still don't really know why sync works so well, and I'll also note that async eventually seems to calm down into some kind of consistent processing, but it takes at least 10 or 20 minutes of leaving several pull sockets running. even then, when it finally reaches that state, it's way outside what I would consider a reasonable pull socket.

reqshark avatar Jan 09 '15 08:01 reqshark

Sync "work" execution prevents zmq from calling recv internally. Node is single threaded, and the bindings to ZMQ live in that same thread. As long as you're looping over some code, ZMQ is not picking up messages (and is thus providing back-pressure).

ronkorving avatar Jan 09 '15 09:01 ronkorving

So what are we going to do now?

I don't have any experience with C, and I think the binding needs to be modified, but I can help with testing!

nacholibre avatar Jan 13 '15 10:01 nacholibre

It seems to me that, depending on the socket type, either an EventEmitter or manual extraction of messages is preferred. This seems really tricky. In that sense, a Node readable object-stream does make sense (since you can pause it). But again, sticking close to the ABI is probably still the best course of action, as it allows us to be most future-proof with ZMQ.

It would completely break BC, but I think the most elegant solution would be to emit "ready" instead of "message" when the socket can be read from. CallbackIfReady currently triggers a flush run, which is not what we want here. Perhaps one approach would be to watch which event listeners get attached and, based on the event, either emit messages or "ready" events. A pretty big hack imho, and it reminds me of the Streams 2 fiasco.
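
A rough sketch of how that might be consumed (both the "ready" event and the read() method are hypothetical):

sock.on('ready', function () {
  var msg = sock.read();          // hypothetical non-blocking read of one message
  processAsync(msg, function () {
    // nothing more is read until we get here, so back-pressure falls out naturally
  });
});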

Ideas welcome, because I see a lot of options, none of which are particularly elegant.

ronkorving avatar Jan 13 '15 12:01 ronkorving

I think the best way is to add an additional recv method; that way we won't break BC, receiving will be on demand, and it will be as close as it can get to the zmq ABI, even closer than the EventEmitter:

socket.recv(function(message) {
  console.log('message received');
});

that way you can implement back-pressure on the pull side and use recv only when you need it.

What do you think?

nacholibre avatar Jan 13 '15 12:01 nacholibre

i wonder if that additional recv method might be best called from an additional socket?

from the section on load balancing:

One reason PUSH and DEALER use the simplistic approach is sheer performance. If you arrive in any major US airport, you'll find long queues of people waiting at immigration. The border patrol officials will send people in advance to queue up at each counter, rather than using a single queue. Having people walk fifty yards in advance saves a minute or two per passenger. And because every passport check takes roughly the same time, it's more or less fair. This is the strategy for PUSH and DEALER: send work loads ahead of time so that there is less travel distance.

This is a recurring theme with ZeroMQ: the world's problems are diverse and you can benefit from solving different problems each in the right way. The airport isn't the post office and one size fits no one, really well.

Let's return to the scenario of a worker (DEALER or REQ) connected to a broker (ROUTER). The broker has to know when the worker is ready, and keep a list of workers so that it can take the least recently used worker each time.

The solution is really simple, in fact: workers send a "ready" message when they start, and after they finish each task. The broker reads these messages one-by-one. Each time it reads a message, it is from the last used worker. And because we're using a ROUTER socket, we get an identity that we can then use to send a task back to the worker.
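
A rough broker-side sketch of that pattern with this binding (the frame layout assumes REQ workers, hence the empty delimiter frame; tasks is a hypothetical job queue):

var zmq = require('zmq');
var broker = zmq.socket('router');
broker.bindSync('tcp://127.0.0.1:5555');

var readyWorkers = []; // identities of workers that said "ready"
var tasks = [];        // hypothetical queue of work to hand out

broker.on('message', function (identity, delimiter, payload) {
  // any message from a worker means it has finished and is free again
  readyWorkers.push(identity);
  dispatch();
});

function dispatch() {
  while (readyWorkers.length && tasks.length) {
    var id = readyWorkers.shift(); // the least recently used worker
    broker.send([id, '', tasks.shift()]);
  }
}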

reqshark avatar Jan 13 '15 18:01 reqshark

@nacholibre hah, so the logic would become that while a recv is in transit, no "message" events are emitted?

ronkorving avatar Jan 14 '15 02:01 ronkorving

Hm, do messages get received by the client if no on('message') listener is attached?

One should either use the on('message') listener or call recv on demand, but not both at the same time.

IMO using EventEmitter is not a good idea and it should not be in the node zmq API at all.

nacholibre avatar Jan 14 '15 08:01 nacholibre