nodejs-pubsub
Pubsub emits more messages than allowed by maxMessages and allowExcessMessages after reaching the maxExtension period
Environment details
- OS: Windows 10
- Node.js version: 12.8.3
- npm version: 6.14.6
- @google-cloud/pubsub version: 2.6.0
Looking into the code and the issue details, this should also be relevant for 2.9.0.
Steps to reproduce
You will need one topic and one subscription to that topic. The subscription parameters are shown in the code below.
- Create a server with the following content and run it.
const pubsub = require("@google-cloud/pubsub");

const config = {
  subName: process.env["SUB_NAME"] || "libs-test-pubsub",
};
console.log(config);

const ps = new pubsub.PubSub();
const subscription = ps.subscription(config.subName, {
  flowControl: {
    maxMessages: 2,            // allow at most 2 messages in flight
    allowExcessMessages: false,
    maxExtension: 5,           // stop extending leases after the maxExtension period
  },
  streamingOptions: {
    maxStreams: 1,
  },
});

let rcvCounter = 0; // messages delivered to the callback
let prcCounter = 0; // messages processed (acked)

function printCounters() {
  process.stdout.clearLine(0);
  process.stdout.write(`rcv: ${rcvCounter}\tprc: ${prcCounter}\r`);
}

const onError = err => {
  console.error("Subscription error:", err);
  process.exit(-1);
};

const onMessage = msg => {
  rcvCounter++;
  printCounters();
  // Simulate 2 seconds of work before acking.
  setTimeout(() => {
    msg.ack();
    prcCounter++;
    printCounters();
  }, 2000);
};

subscription.on("error", onError);
subscription.on("message", onMessage);
- Now push 100 messages into the topic in a batch, so the pubsub subscription queue acquires roughly 100 messages in a short amount of time.
You can use this script for that.
const pubsub = require("@google-cloud/pubsub");

const config = {
  topicName: process.env["TOPIC_NAME"] || "libs-test-pubsub",
};
console.log(config);

const ps = new pubsub.PubSub();
const topic = ps.topic(config.topicName);

const messages = [];
for (let i = 0; i < 100; i++) {
  const p = topic
    .publish(Buffer.from(""), { attr1: "123" })
    .catch(err => {
      console.log(err);
    });
  messages.push(p);
}

Promise.all(messages).then(() => console.log("All messages published"));
- Now you should see something like the following (see the video below; an illustrative trace also follows the link).
At first, rcv is 2 and prc is 0. This is expected, since flow control allows only 2 messages to be processed (passed into the message callback) at a time. Then prc becomes 2 (the 2-second timeout fires and we ack the messages) and rcv becomes 4. This stepped processing repeats every 2 seconds. However, after some time, somewhere around the maxExtension deadline (when pubsub starts either extending message deadlines or removing messages from its internal queue), the rcv counter jumps to 100 while prc is still only 4, 6 or 8, depending on when it happens; the number is always small, below 20 or even below 15.
https://user-images.githubusercontent.com/6842709/108252911-3b485400-716a-11eb-80ba-a0f19a82d901.mp4
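For reference, the stepped output described above looks roughly like this (an approximate trace reconstructed from the description, not a captured log):

rcv: 2    prc: 0
rcv: 4    prc: 2
rcv: 6    prc: 4
...
rcv: 100  prc: 6    (after the maxExtension deadline: rcv jumps to 100 while prc stays small)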
This breaks the assumption that flow control is respected: for example, a k8s pod that processes the messages and has CPU or memory limits might be evicted, because it would try to process all the messages at once and hit its resource limits (CPU throttling, or being killed because of memory limits).
Now, where it is happening.
I added logs to the library, so I can see that execution goes through _extendDeadlines in lease-manager.ts, takes the else path in the for loop, and calls this.remove(message).
Inside remove it then repeatedly falls into the this.pending > 0 condition and dispenses the current message (which has actually already expired and should be redelivered by pubsub). It seems clear that such a message should not be dispensed to the client code.
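To make that flow easier to follow, here is a heavily simplified sketch of the behaviour as described above. It is not the library's actual source; the class, method and field names below are approximations for illustration only (the real code lives in lease-manager.ts).

// Simplified sketch of the reported behaviour, NOT the real lease-manager.ts.
// Names (_messages, _pending, _dispense, ...) are approximations.
class LeaseManagerSketch {
  constructor(options) {
    this._options = options;     // { maxExtension, maxMessages, ... }
    this._messages = new Set();  // all tracked messages
    this._pending = [];          // messages held back by flow control
  }

  get pending() {
    return this._pending.length;
  }

  _extendDeadlines() {
    for (const message of this._messages) {
      if (!this._isPastMaxExtension(message)) {
        // Normal path: keep extending the ack deadline.
        this._extend(message);
      } else {
        // The else path from the report: the message has been held longer
        // than maxExtension, so drop it and let Pub/Sub redeliver it.
        this.remove(message);
      }
    }
  }

  remove(message) {
    this._messages.delete(message);
    // Problem spot described in the report: freeing a slot immediately
    // dispenses a pending message to the user's callback, even though the
    // pending messages have also been waiting past maxExtension and should
    // be redelivered by Pub/Sub instead.
    if (this.pending > 0) {
      const next = this._pending.shift();
      this._dispense(next); // ends up emitting the 'message' event
    }
  }

  _isPastMaxExtension(message) { /* compare message age to maxExtension */ }
  _extend(message) { /* modAck to push the ack deadline out */ }
  _dispense(message) { /* hand the message to subscriber.on("message", ...) */ }
}

When all 100 messages expire at roughly the same time, each remove() call dispenses another pending (equally expired) message, which matches the observed jump of rcv to 100 while prc stays small.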
I'm not sure how to fix this in the pubsub library code, and I don't think there is a proper workaround in client code either (ours relies on flow control being respected).
@QuestionAndAnswer Thanks for the super detailed report. I'll have to go over this in more detail, but I'll mark it as a bug for now.
I like your editor's comment on the method. 😂 You must be kidding
@feywind so, how is it going? Did you have a chance to look into this issue?
@feywind we are running into the exact same issue in production.
Node 14, @google-cloud/pubsub version 2.10.0.
Running in a similar environment, a Docker container inside k8s.
Any progress here?
Quick (sadly content-less) update - no changes here yet, but I've been on a bit of a feature development rotation and hope to get back to these issues.
I stared at this for a while today, and I'm not as familiar with this bit of the library, but I don't know why we'd want to dispense messages from the pending queue just because another message was removed. It feels like there was a paradigm shift here that wasn't taken into account, but I'm going to ask some others.
Linked to the meta-issue about transport problems: b/242894947
Is there any way to work around the problem? Our cluster is running out of memory and the flowControl options don't seem to work. Is there maybe a way to stop subscribers from pulling such an enormous number of messages?
We have the same problem with @google-cloud/pubsub version 3.4.1. Any updates or workarounds for this issue?
For those who were interested in a workaround:
subscription.setOptions({ flowControl: { maxMessages: 1, allowExcessMessages: false } })
(It might work without allowExcessMessages, but I didn't test it without this option.)
The point is that you need to set it on every connection to the broker. The setting appears to be ignored when you pass it to subscription.create(). So every time you create a topic on initialization and then listen to the queue, or you subscribe to a topic that already exists, set it on that specific subscription/connection.
I've tested it on node:16.13-alpine + @google-cloud/[email protected] and on the latest @google-cloud/[email protected].
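To make the workaround concrete, here is a minimal sketch of applying it when attaching to an existing subscription. The subscription name is a placeholder from the report's example; adjust it to your setup.

const { PubSub } = require("@google-cloud/pubsub");

const ps = new PubSub();
// Placeholder subscription name; use your own.
const subscription = ps.subscription("libs-test-pubsub");

// Re-apply flow control on the subscription object itself, as suggested above.
// According to the comment, passing these options to subscription.create()
// alone is not enough; set them on every subscription/connection you use.
subscription.setOptions({
  flowControl: { maxMessages: 1, allowExcessMessages: false },
});

subscription.on("error", err => console.error("Subscription error:", err));
subscription.on("message", msg => {
  // ...process the message...
  msg.ack();
});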