nodejs-pubsub

Pubsub emits more messages than allowed by maxMessages and allowExcessMessages after reaching the maxExtension period

Open · QuestionAndAnswer opened this issue 4 years ago • 9 comments

Environment details

  • OS: Windows 10
  • Node.js version: 12.8.3
  • npm version: 6.14.6
  • @google-cloud/pubsub version: 2.6.0

Looking into the code and the issue details, this should also be relevant for 2.9.0.

Steps to reproduce

You will need one topic and one subscription to that topic. Below are the subscription parameters.

[Screenshot: subscription parameters]
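
In case you prefer to create these from code rather than the Cloud Console, here is a minimal setup sketch (the names match the repro scripts below; the ack deadline value is my assumption, use whatever the screenshot shows):

const pubsub = require("@google-cloud/pubsub");

async function setup() {
    const ps = new pubsub.PubSub();
    // Names match the repro scripts below.
    const [topic] = await ps.createTopic("libs-test-pubsub");
    await topic.createSubscription("libs-test-pubsub", {
        ackDeadlineSeconds: 10, // assumed value; set it to match the screenshot
    });
}

setup().catch(console.error);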

  1. Create a server with the following content and run it.

const pubsub = require("@google-cloud/pubsub");

const config = {
    subName: process.env["SUB_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const subscription = ps.subscription(config.subName, {
    flowControl: {
        maxMessages: 2,
        allowExcessMessages: false,
        maxExtension: 5,
    },
    streamingOptions: {
        maxStreams: 1
    },
});

let rcvCounter = 0;
let prcCounter = 0;

function printCounters() {
    process.stdout.clearLine(0)
    process.stdout.write(`rcv: ${rcvCounter}\tprc: ${prcCounter}\r`);
}

const onError = err => {
    console.error("Subscription error:", err);
    process.exit(-1);
};

const onMessage = msg => {
    rcvCounter++;
    printCounters();

    setTimeout(() => {
        msg.ack();
        prcCounter++;
        printCounters();
    }, 2000);
}

subscription.on("error", onError);
subscription.on("message", onMessage);

  2. Now push 100 messages into the topic in a batch, so the pubsub subscription queue should acquire roughly 100 messages in a short amount of time.

You can use this script for that.

const pubsub = require("@google-cloud/pubsub");

const config = {
    topicName: process.env["TOPIC_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const topic = ps.topic(config.topicName);

const messages = [];

for (let i = 0; i < 100; i++) {
    const p = topic.publish(Buffer.from(""), { attr1: "123" })
        .catch(err => {
            console.log(err);
        });

    messages.push(p);
}

Promise.all(messages)
    .then(() => console.log("All messages published."));

  3. Now you should see something like this (see the video below as well).

At first, rcv is 2 and prc is 0. That is expected, since flow control allows only 2 messages to be processed (passed into the message callback) at a time. Then prc becomes 2 (the 2-second timeout fires and we ack the messages) and rcv becomes 4. This stepped processing should repeat every 2 seconds. But after some time, somewhere around the maxExtension deadline (when pubsub starts either extending message deadlines or removing messages from its internal queue), the rcv counter jumps to 100 while prc is still only 4, 6, or 8, depending on when it happens; the number is always small, below 20 or even 15.

https://user-images.githubusercontent.com/6842709/108252911-3b485400-716a-11eb-80ba-a0f19a82d901.mp4
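
To make the pattern concrete, here is an illustrative timeline reconstructed from the description above (not captured output; the exact numbers and the moment of the jump vary):

t = 0s                 rcv: 2     prc: 0    (flow control admits 2 messages)
t = 2s                 rcv: 4     prc: 2    (first 2 acked, 2 more dispensed)
t = 4s                 rcv: 6     prc: 4
...
around maxExtension    rcv: 100   prc: 6    (flow control is no longer honored)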

This breaks the assumption that flow control limits will be respected. For example, a k8s pod that processes these messages and has CPU or memory limits might be throttled or evicted, because it would try to process all the messages at once and hit its resource limits (CPU throttling, or being killed for exceeding the memory limit).

Now, where this is happening: I added logs to the library, so I can clearly see that it goes through _extendDeadlines in lease-manager.ts.

[Screenshot: _extendDeadlines in lease-manager.ts with added logs]

It takes the else branch in the for loop and calls this.remove(message);

[Screenshot: the else branch calling this.remove(message)]

And in remove it then constantly falls into the this.pending > 0 condition and dispenses the current pending message (which has actually already expired and should be redelivered by pubsub). It seems obvious that such a message should not be dispensed to the client code.
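
For readers who don't want to dig through the library, here is a simplified paraphrase of the flow described above (illustrative only, not the actual lease-manager.ts source; _extendDeadlines, remove, and pending are the names from the report, everything else is reconstructed):

// Illustrative paraphrase only; not the library's real implementation.
class LeaseManagerSketch {
    constructor(options) {
        this._options = options;     // includes maxExtension
        this._messages = new Set();  // messages currently leased to user code
        this._pending = [];          // messages held back by flow control
    }

    get pending() {
        return this._pending.length;
    }

    // Runs periodically while messages are leased.
    _extendDeadlines() {
        for (const message of this._messages) {
            if (this._withinMaxExtension(message)) {
                // Still within maxExtension: keep extending the ack deadline.
                this._extendAckDeadline(message);
            } else {
                // Past maxExtension: drop the lease for this message...
                this.remove(message);
            }
        }
    }

    remove(message) {
        this._messages.delete(message);
        // ...which frees a flow-control slot that remove() immediately refills
        // from the pending queue. Once maxExtension is hit for many messages
        // at roughly the same time, this keeps dispensing pending (already
        // expired) messages to user code, producing the jump reported here.
        if (this.pending > 0) {
            this._dispense(this._pending.shift());
        }
    }

    // Hypothetical helpers standing in for the real implementation:
    _withinMaxExtension(message) { return message.withinMaxExtension; }
    _extendAckDeadline(message) { message.modAck(this._options.ackDeadline); }
    _dispense(message) { this._messages.add(message); /* emit "message" */ }
}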

I'm not sure how to fix this in the pubsub library code, and I don't think there is a proper workaround on the client side either (ours, as we rely on flow control being honored).

QuestionAndAnswer · Feb 17 '21

@QuestionAndAnswer Thanks for the super detailed report. I'll have to go over this in more detail, but I'll mark it as a bug for now.

I like your editor's comment on the method. 😂 You must be kidding

feywind · Feb 18 '21

@feywind so, how is it going? Did you have a chance to look into this issue?

QuestionAndAnswer · Feb 22 '21

@feywind we are running into the exact same issue in production.

Node 14, @google-cloud/pubsub version 2.10.0

Running it in a similar env, docker container inside k8s.

Any progress here?

procedurallygenerated · Jun 11 '21

Quick (sadly content-less) update - no changes here yet, but I've been on a bit of a feature development rotation and hope to get back to these issues.

feywind · Sep 09 '21

I stared at this for a while today, and I'm not as familiar with this bit of the library, but I don't know why we'd want to dispense messages from the pending queue just because another message was removed. It feels like there was a paradigm shift here that wasn't taken into account, but I'm going to ask some others.

feywind · Nov 26 '21

Linked to the meta-issue about transport problems: b/242894947

feywind · Aug 17 '22

Is there any way to work around the problem? Our cluster is running out of memory and the flowControl options don't seem to work. Is there maybe a way to stop subscribers from pulling an enormous number of messages?

michal-zap · Aug 22 '22

We have the same problem with @google-cloud/pubsub version 3.4.1. Any updates or workarounds for this issue?

jcgomezcorrea · Mar 13 '23

For those who were interested in a workaround:

subscription.setOptions({ flowControl: { maxMessages: 1, allowExcessMessages: false } })

(It could work without allowExcessMessages, but I didn't test it without this option).

The point is that you need to set it on every connection to the broker. It looks like the setting is ignored when you pass it to subscription.create(). So whether you create the topic on initialization and then listen to the queue, or you subscribe to a topic that already exists, set it on that specific subscription/connection.
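
For instance, a minimal sketch of that pattern (the helper function and variable names are mine; the flowControl values are just the ones from the snippet above):

const pubsub = require("@google-cloud/pubsub");

// Always apply flow control on the subscription object you actually listen on,
// whether it was just created or already existed.
async function getListeningSubscription(topicName, subName) {
    const ps = new pubsub.PubSub();
    const topic = ps.topic(topicName);

    let subscription = topic.subscription(subName);
    const [exists] = await subscription.exists();
    if (!exists) {
        [subscription] = await topic.createSubscription(subName);
    }

    // Per the comment above, options passed at creation time appear to be
    // ignored for the streaming pull, so set them here explicitly.
    subscription.setOptions({
        flowControl: { maxMessages: 1, allowExcessMessages: false },
    });

    return subscription;
}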

I've tested it on node:16.13-alpine + @google-cloud/[email protected] and on the latest @google-cloud/[email protected]

alexby · Oct 14 '23