moleculer icon indicating copy to clipboard operation
moleculer copied to clipboard

Service broker with disabled load balancer fails to delivery Streams

Open alvaroinckot opened this issue 4 years ago • 5 comments

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [x] I am running the latest version
  • [x] I checked the documentation and found no answer
  • [x] I checked to make sure that this issue has not already been filed
  • [x] I'm reporting the issue to the correct repository

Current Behavior

Service brokers with the load balancer disabled, do not deliver Stream content to the receiver action.

Expected Behavior

A service broker should send the entire stream content to the same context even if the load balancer is disabled.

Failure Information

.. q: 239 [2021-06-08T22:41:09.818Z] INFO broker-one/TRANSIT: Put the chunk into pool (size: 237). Seq: 240 [2021-06-08T22:41:09.819Z] INFO broker-one/TRANSIT: Put the chunk into pool (size: 238). Seq: 241 ...

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. clone https://github.com/alvarocantador/bug-moleculer-message-chunck
  2. run npm ci
  3. run docker-compose up -d
  4. run node index.js
  5. You will see that " broker-one/TRANSIT: Put the chunk into pool.." messages
  6. You will see the sample-parsed.pdf empty
  7. If you turn on the load balancer in the code for all brokers, and run it again, it will fill the sample-parsed.pdf properly

Reproduce code snippet

const moleculer = require("moleculer");
const fs = require('fs');
const path = require("path");
const brokerOne = new moleculer.ServiceBroker({ transporter: 'nats://localhost:4222', disableBalancer: true, nodeID: 'broker-one'});
const brokerTwo = new moleculer.ServiceBroker({ transporter: 'nats://localhost:4222', disableBalancer: true, nodeID: 'broker-two' });
const brokerThree = new moleculer.ServiceBroker({ transporter: 'nats://localhost:4222', disableBalancer: true, nodeID: 'broker-three' });
const bar = {
    version: 1,
    name: "bar",
    dependencies: [],
    actions: {
        save(ctx) {
            this.logger.info("file received.");
            const s = fs.createWriteStream(path.resolve("sample-parsed.pdf"));
            ctx.params.pipe(s);
            this.logger.info("file saved.");
        }
    },
};
const foo = {
    version: 1,
    name: "foo",
    dependencies: [],
    actions: {
        async send(ctx) {
            this.logger.info("reading file...");
            const stream = fs.createReadStream(path.resolve("sample.pdf"));
            this.logger.info("file to stream completed.");
            await ctx.call("v1.bar.save", stream, { meta: { filename: "sample.pdf", token: "SAMPLE SAMPLE SAMPLE" }});
            this.logger.info("stream sent.");
        }
    },
}
brokerOne.createService(bar);
brokerTwo.createService(bar);
brokerThree.createService(foo);

brokerOne.start()
    .then(() => brokerTwo.start())
    .then(() => brokerThree.start())
    .then(() => brokerOne.waitForServices([{ version: 1, name: "bar" }, { version: 1, name: "foo" }]))
    .then(() => brokerTwo.waitForServices([{ version: 1, name: "bar" }, { version: 1, name: "foo" }]))
    .then(() => brokerThree.waitForServices([{ version: 1, name: "bar" }, { version: 1, name: "foo" }]))
    .then(() => brokerThree.call("v1.foo.send"));

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.13
  • NodeJS version: 12.16.1
  • Operating System: MacOS Catalina

Failure Logs

https://github.com/alvarocantador/bug-moleculer-message-chunck/blob/main/logs.txt

alvaroinckot avatar Jun 08 '21 23:06 alvaroinckot

I feel it won't work, because with a disabled load balancer, the messagebroker (e.g. NATS) balances the packets, and it won't transfer the stream packets to the same node.

icebob avatar Jun 09 '21 07:06 icebob

A couple options come to mind for different ways this can be fixed.

  1. Select a node from the registry rather than relying on the transporter's balancer. This feels wrong because the internal balancer is explicitly disabled, but a similar thing is already done in the broadcast code: https://github.com/moleculerjs/moleculer/blob/6f6225d6079af2116a2b4713c5d00037512131f4/src/service-broker.js#L1383-L1401

  2. Send a single packet (either the stream's seq=0 packet or a different packet type) and wait for a response to see where it gets routed by the transporter's balancer. Then send the remaining stream packets targeting that node directly.

ngraef avatar Jun 09 '21 14:06 ngraef

Is this problem still relevant or has it already been solved? I tried the suggested steps several times and all the time I got the correct result and the fully transferred file.

DonVietnam avatar Feb 04 '24 12:02 DonVietnam

@ngraef do you still face this issue?

icebob avatar Mar 31 '24 14:03 icebob

This was still a problem the last time I checked. However, I no longer use moleculer, so I can't say definitively for the latest version.

ngraef avatar Apr 01 '24 15:04 ngraef