nest
Kafka microservice can process only one message per topic at a time
Is there an existing issue for this?
- [X] I have searched the existing issues
Current behavior
When any message is sent to a Kafka topic, the application gets stuck and doesn't process other messages until processing of the previous message has finished.
Minimum reproduction code
https://github.com/Klutrem/kafka-issue
Steps to reproduce
- pull repository
- run `npm i`
- run kafka and specify port and host in envs, if it's not `localhost:9092`
- run `npm run start`
- send message to kafka topic `test`
Expected behavior
It was working on the previous versions:
{
  "name": "kafka-issue",
  "version": "1.0.0",
  "private": true,
  "license": "UNLICENSED",
  "scripts": {
    "build": "nest build",
    "start": "nest start",
    "start:dev": "nest start --watch",
    "start:prod": "node dist/main"
  },
  "dependencies": {
    "@nestjs/common": "^7.6.18",
    "@nestjs/config": "^1.0.0",
    "@nestjs/core": "^7.6.15",
    "@nestjs/mapped-types": "^1.2.2",
    "@nestjs/microservices": "^7.6.18",
    "kafkajs": "^1.15.0"
  },
  "devDependencies": {
    "@nestjs/cli": "^10.1.11",
    "@nestjs/schematics": "^7.3.0",
    "@nestjs/testing": "^7.6.18",
    "@swc/cli": "^0.1.62",
    "@swc/core": "1.3.75",
    "@types/node": "^14.17.5",
    "typescript": "^4.7.4",
    "unplugin-swc": "^1.3.2",
    "vitest": "^0.34.1"
  }
}
With that package.json, the application can process any number of messages at the same time; it doesn't get stuck.
Package
- [x] I don't know. Or some 3rd-party package
- [ ] @nestjs/common
- [ ] @nestjs/core
- [X] @nestjs/microservices
- [ ] @nestjs/platform-express
- [ ] @nestjs/platform-fastify
- [ ] @nestjs/platform-socket.io
- [ ] @nestjs/platform-ws
- [ ] @nestjs/testing
- [ ] @nestjs/websockets
- [ ] Other (see below)
Other package
No response
NestJS version
10.2.7
Packages versions
{
  "name": "kafka-issue",
  "version": "1.0.0",
  "private": true,
  "license": "UNLICENSED",
  "scripts": {
    "build": "nest build",
    "start": "nest start",
    "start:dev": "nest start --watch",
    "start:prod": "node dist/main"
  },
  "dependencies": {
    "@nestjs/common": "^10.2.7",
    "@nestjs/config": "^3.1.1",
    "@nestjs/core": "^10.2.7",
    "@nestjs/mapped-types": "^2.0.2",
    "@nestjs/microservices": "^10.2.7",
    "kafkajs": "^2.2.4"
  },
  "devDependencies": {
    "@nestjs/cli": "^10.2.1",
    "@nestjs/schematics": "^10.0.3",
    "@nestjs/testing": "^10.2.7",
    "@swc/cli": "^0.1.62",
    "@swc/core": "1.3.95",
    "@types/node": "^20.8.10",
    "typescript": "^5.2.2",
    "unplugin-swc": "^1.4.3",
    "vitest": "^0.34.6"
  }
}
Node.js version
18.16.0
In which operating systems have you tested?
- [ ] macOS
- [ ] Windows
- [X] Linux
Other
No response
Same here :( Are there any solutions for the latest version?
I think this issue is missing some more detail to clarify whether this is expected behavior or not.
- Does your topic have multiple partitions?
- If there is more than one partition, have you tried setting partitionsConsumedConcurrently (https://kafka.js.org/docs/consuming#a-name-concurrent-processing-a-partition-aware-concurrency)?
Kafka is intended to process the messages in each partition in order, so it doesn't make sense to process later messages until you have finished processing the ones ahead of them.
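For reference, here is a minimal sketch of where the partitionsConsumedConcurrently setting could go in a NestJS Kafka microservice. It assumes that the `run` key of the Kafka transport options is forwarded to kafkajs's `consumer.run()`. The client id, group id, and concurrency value are placeholders, not values taken from the reproduction repository.

```typescript
// Options object intended for a NestJS Kafka microservice, i.e. the `options`
// field that sits next to `transport: Transport.KAFKA`.
// Assumption: `run` is passed through to kafkajs's consumer.run().
const kafkaOptions = {
  client: {
    clientId: "kafka-issue", // placeholder
    brokers: ["localhost:9092"],
  },
  consumer: {
    groupId: "test-group", // placeholder
  },
  run: {
    // Let kafkajs work on up to 2 topic-partitions at once. Messages inside
    // a single partition are still handled one after another.
    partitionsConsumedConcurrently: 2,
  },
};

console.log(kafkaOptions.run.partitionsConsumedConcurrently);
```

With two topics of one partition each, a value of 2 should be enough for one handler per topic to run at the same time.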
We've updated the example repository and added a README: https://github.com/Klutrem/kafka-issue
The problem is that the consumer doesn't process messages even on another topic while a message sent to one topic is being handled (when we send a message to the test1 topic, the consumer doesn't process any messages on the test2 topic).
That's just how eachMessage works in KafkaJS. It processes each topic-partition sequentially, one at a time. You need to set partitionsConsumedConcurrently to get what you want.
You're saying this only started happening after an upgrade to 10.x. Can you reproduce it in another example repository with a lower NestJS version?
So, I made another branch, async-requests.
At first it behaved the same, even after the update. But then I provided APP_INTERCEPTOR with the logger interceptor that we use in our microservices. After that, on the old version (7.x) it works asynchronously (you can see that on the screenshot), but on 10.x it handles only one message at a time. Maybe the problem is in our logger interceptor, or maybe in providing the app interceptor; I don't actually know.
AFAIR there was a bug in v7 (or v8) where, instead of awaiting the eachMessage callback (of kafkajs), we let the message be processed asynchronously, which caused the incorrect behavior. It's now fixed (hence the difference).
It processes each topic-partition sequentially one at a time.
Nothing about different topics is mentioned in the KafkaJS documentation. Of course, messages in the same partition should be processed in order, and partitionsConsumedConcurrently will solve the issue by making processing asynchronous across different partitions.
Kafka only implies this ordering for partitions, not for topics. Imagine that we have two users:
- The first one loads his profile page
- The second one loads the main page of the site
If we process two different topics synchronously, then while the first user waits for the response about his profile, the second one cannot get the main page of the site.
"AFAIR there was a bug in v7 (or v8)": yes, but that was about partitions. Topics should be processed asynchronously, shouldn't they?
Who said you can't process two topics in parallel though?
I guess there is a misunderstanding, because we're not talking about parallel computation but about concurrency.
The guys provided a valid test case in the async-requests branch where two requests should be processed asynchronously, but they aren't.
For CPU-bound tasks on single-threaded Node I certainly cannot argue, but in their case the handler returns a Promise that will become the response to the Kafka message:
return new Promise<string>((resolve) => {
  setTimeout(() => {
    resolve("pong1");
    const time2 = new Date()
    console.log(`test1 took ${getTimeDifference(time1, time2)} secs`)
  }, 5000);
});
If we have two different topics with the same behaviour, they should be processed concurrently, but they aren't.
According to the Node.js event loop, such things can run concurrently.
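To make the difference concrete, here is a small timing model of the two behaviours being discussed. It is only a sketch: it assumes all messages are already available at time 0 and that the handler does nothing but wait (like the 5 second setTimeout above). The function names are made up for illustration and do not correspond to anything in kafkajs or NestJS.

```typescript
interface Msg {
  topic: string;
  partition: number;
  durationSec: number; // how long the handler's promise takes to resolve
}

// Model 1: every handler is awaited before the next message is picked up,
// regardless of which topic the message came from.
function finishTimesSequential(msgs: Msg[]): number[] {
  let clock = 0;
  return msgs.map((m) => {
    clock += m.durationSec;
    return clock;
  });
}

// Model 2: one "lane" per topic-partition. Order is kept inside a lane,
// but lanes do not wait for each other.
function finishTimesPerPartition(msgs: Msg[]): number[] {
  const laneClock = new Map<string, number>();
  return msgs.map((m) => {
    const lane = `${m.topic}:${m.partition}`;
    const done = (laneClock.get(lane) ?? 0) + m.durationSec;
    laneClock.set(lane, done);
    return done;
  });
}

const msgs: Msg[] = [
  { topic: "test1", partition: 0, durationSec: 5 },
  { topic: "test2", partition: 0, durationSec: 5 },
  { topic: "test1", partition: 0, durationSec: 5 },
];

console.log(finishTimesSequential(msgs)); // [ 5, 10, 15 ]
console.log(finishTimesPerPartition(msgs)); // [ 5, 5, 10 ]
```

In the second model the test2 message finishes after 5 seconds instead of 10, while the two test1 messages still complete in order.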
I'll investigate this as soon as I can, thank you @weissmall
Update after debugging
File: microservices/server/server-kafka.ts
Code:
public async handleEvent(
  pattern: string,
  packet: ReadPacket,
  context: KafkaContext,
): Promise<any> {
  const handler = this.getHandlerByPattern(pattern);
  if (!handler) {
    return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
  }
  const resultOrStream = await handler(packet.data, context);
  if (isObservable(resultOrStream)) {
    await lastValueFrom(resultOrStream); // Here is the reason for the issue
  }
}
It seems a little strange, but while we're awaiting lastValueFrom(resultOrStream) in the scenario described above, we cannot process the next message from any topic until the current handler responds.
I guess for this situation we need something like a message queue per topic, so that different topics can be processed asynchronously.
If we don't await the response, messages within one partition would be processed asynchronously, which is bad, so we need some workaround. Or the core issue may lie somewhere deeper than what I've discovered.
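A rough sketch of what such a per-key queue could look like: tasks with the same key (for example a topic, or a topic plus partition) run one after another, while tasks with different keys overlap. `KeyedSerialQueue` is a hypothetical helper, not part of NestJS or kafkajs, and the sketch never cleans up finished keys.

```typescript
// Hypothetical helper: serializes tasks per key, lets different keys overlap.
class KeyedSerialQueue {
  // Last scheduled task per key. The stored promises never reject.
  private tails = new Map<string, Promise<unknown>>();

  enqueue<T>(key: string, task: () => Promise<T>): Promise<T> {
    const tail = this.tails.get(key);
    // The first task for a key starts immediately; later ones wait for the tail.
    const result: Promise<T> = tail ? tail.then(() => task()) : task();
    // Swallow errors in the stored tail so one failure does not block the key.
    this.tails.set(key, result.catch(() => undefined));
    return result;
  }
}

const started: string[] = [];
const queue = new KeyedSerialQueue();

// Fake handler: records that it started, then waits 50 ms.
const slow = (label: string) => () => {
  started.push(label);
  return new Promise<void>((resolve) => setTimeout(resolve, 50));
};

queue.enqueue("test1", slow("test1 #1"));
queue.enqueue("test2", slow("test2 #1"));
queue.enqueue("test1", slow("test1 #2")); // waits for "test1 #1"

// At this point one handler per key has started; the third is still queued.
console.log(started);
```

One caveat, as far as I understand kafkajs: if eachMessage resolves before the queued task has finished, the consumer treats the message as handled, so its offset may be committed before processing completes. A crash could then lose in-flight messages.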
Hi, any updates?
I can reproduce this with a raw kafkajs setup, so it doesn't seem to be related to NestJS:
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "test1" });
  await consumer.subscribe({ topic: "test2" });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log("Received message", {
        topic,
        partition,
        key: message.value.toString(),
      });
      await new Promise((resolve) => setTimeout(resolve, 5000));
      console.log("Processed message");
    },
  });
};

run().catch(console.error);
Result:
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
I'm running into the same situation... any updates?
Have you tried using an observable? https://github.com/nestjs/nest/issues/3954#issuecomment-579698979
With Blizzard's node-rdkafka everything works perfectly.
Code:
const { randomInt } = require('crypto');
var Kafka = require('node-rdkafka');

var stream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test-group',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
  "offset.store.method": 'broker',
}, {
  'auto.offset.reset': "end",
  "offset.store.method": 'broker',
  "enable.auto.commit": true,
  "auto.commit.interval.ms": 5000,
  "offset.store.sync.interval.ms": 5000,
}, {
  topics: ['test1', 'test2'],
  waitInterval: 0,
  objectMode: false
});

// Exit on stream-level errors.
stream.on('error', function(err) {
  console.log(err);
  process.exit(1);
});

stream.consumer.on('event.error', function(err) {
  console.log(err);
})

stream.consumer.on('ready', (_) => {
  console.log('Ready');
})

stream.consumer.on('subscribed', (_) => {
  console.log('Subscribed')
})

// The async handler is not awaited by the emitter, so a new message can be
// delivered while earlier ones are still waiting on the timeout.
stream.consumer.on('data', async ({ topic, value, partition }) => {
  const id = randomInt(500);
  console.log(`Got message with [id ${id}] at [${new Date()}]\n`, {
    topic, value: value.toString(), partition,
  })
  await new Promise((resolve) => setTimeout(resolve, 5000));
  console.log(`Processed message with [id ${id}] at [${new Date()}]`);
})
Received logs:
Got message with [id 453] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 333] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 159] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 390] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 437] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 155] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 348] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 247] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 74] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 374] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 393] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 414] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 179] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 154] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
{ topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 486] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 480] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 182] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 300] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
{ topic: 'test1', value: 'ping', partition: 0 }
Processed message with [id 453] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 333] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 159] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 390] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 437] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 155] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 348] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 247] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 74] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 374] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 393] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 414] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 179] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 154] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 486] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 480] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 182] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 300] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
All messages are processed in the correct order, and we can receive new messages while previous ones are still being processed.
@kamilmysliwiec
Would you like to create a PR replacing kafkajs with node-rdkafka?
@kamilmysliwiec This will be a breaking change but I can take a swing at it, it has been a while.
Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation?
It can be implemented as a new transport which is what I will work towards first. @kamilmysliwiec let me know which direction you would want to take.
I am opening a new issue here: https://github.com/nestjs/nest/issues/13535
Sounds good @mkaufmaner
I think awaiting "combineStreamsAndThrowIfRetriable()" in the handleMessage method of the ServerKafka class might be the issue. When I changed the code so that it is not awaited, I was able to receive multiple messages before processing of the previous message had finished.
Another problem is that no heartbeat can be sent while the consumer is blocked processing a single message, which results in rebalancing.
Do you have any updates planned to address these issues?
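On the heartbeat point, one possible workaround is to keep sending heartbeats while a long-running handler is pending. The sketch below assumes that a `heartbeat` function is available to the handler (kafkajs passes one in the eachMessage payload). `withHeartbeat` is a hypothetical helper, and the demo uses a fake heartbeat so that it runs without a broker.

```typescript
// Hypothetical helper: calls `heartbeat` every `intervalMs` while `work` is pending.
async function withHeartbeat<T>(
  work: Promise<T>,
  heartbeat: () => Promise<void>,
  intervalMs: number,
): Promise<T> {
  const timer = setInterval(() => {
    // Heartbeat failures are ignored here; a real handler would probably log them.
    heartbeat().catch(() => undefined);
  }, intervalMs);
  try {
    return await work;
  } finally {
    // Stop the timer whether the work succeeded or failed.
    clearInterval(timer);
  }
}

// Stand-alone demo with a fake heartbeat that only counts its calls.
let beats = 0;
const fakeHeartbeat = async (): Promise<void> => {
  beats += 1;
};

// Simulated slow handler that resolves after 100 ms.
const slowWork = new Promise<string>((resolve) =>
  setTimeout(() => resolve("pong1"), 100),
);

const demo = withHeartbeat(slowWork, fakeHeartbeat, 20).then((result) => {
  console.log(`result=${result}, heartbeat called=${beats > 0}`);
  return result;
});
```

This does not make message handling concurrent. It only aims to keep the consumer in its group while a single slow message is being processed.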
combineStreamsAndThrowIfRetriable is not the issue; you can reproduce this with a raw kafkajs setup: https://github.com/nestjs/nest/issues/12703#issuecomment-2003688863