Kafka microservice can process only one message per topic at a time

Open • Klutrem opened this issue 2 years ago • 24 comments
Is there an existing issue for this?

  • [X] I have searched the existing issues

Current behavior

When a message is sent to a topic via Kafka, the application gets stuck and doesn't process other messages until processing of the previous message has finished.

Minimum reproduction code

https://github.com/Klutrem/kafka-issue

Steps to reproduce

  • clone the repository
  • run npm i
  • start Kafka and specify its host and port in the environment variables if it isn't running on localhost:9092
  • run npm run start
  • send a message to the Kafka topic test

Expected behavior

It worked with previous versions, using this package.json:

{
    "name": "kafka-issue",
    "version": "1.0.0",
    "private": true,
    "license": "UNLICENSED",
    "scripts": {
      "build": "nest build",
      "start": "nest start",
      "start:dev": "nest start --watch",
      "start:prod": "node dist/main"
    },
    "dependencies": {
      "@nestjs/common": "^7.6.18",
      "@nestjs/config": "^1.0.0",
      "@nestjs/core": "^7.6.15",
      "@nestjs/mapped-types": "^1.2.2",
      "@nestjs/microservices": "^7.6.18",
      "kafkajs": "^1.15.0"
    },
    "devDependencies": {
      "@nestjs/cli": "^10.1.11",
      "@nestjs/schematics": "^7.3.0",
      "@nestjs/testing": "^7.6.18",
      "@swc/cli": "^0.1.62",
      "@swc/core": "1.3.75",
      "@types/node": "^14.17.5",
      "typescript": "^4.7.4",
      "unplugin-swc": "^1.3.2",
      "vitest": "^0.34.1"
    }
  }

With that package.json, the application can process any number of messages at the same time and doesn't get stuck.

Package

  • [x] I don't know. Or some 3rd-party package
  • [ ] @nestjs/common
  • [ ] @nestjs/core
  • [X] @nestjs/microservices
  • [ ] @nestjs/platform-express
  • [ ] @nestjs/platform-fastify
  • [ ] @nestjs/platform-socket.io
  • [ ] @nestjs/platform-ws
  • [ ] @nestjs/testing
  • [ ] @nestjs/websockets
  • [ ] Other (see below)

Other package

No response

NestJS version

10.2.7

Packages versions

{
    "name": "kafka-issue",
    "version": "1.0.0",
    "private": true,
    "license": "UNLICENSED",
    "scripts": {
      "build": "nest build",
      "start": "nest start",
      "start:dev": "nest start --watch",
      "start:prod": "node dist/main"
    },
    "dependencies": {
      "@nestjs/common": "^10.2.7",
      "@nestjs/config": "^3.1.1",
      "@nestjs/core": "^10.2.7",
      "@nestjs/mapped-types": "^2.0.2",
      "@nestjs/microservices": "^10.2.7",
      "kafkajs": "^2.2.4"
    },
    "devDependencies": {
      "@nestjs/cli": "^10.2.1",
      "@nestjs/schematics": "^10.0.3",
      "@nestjs/testing": "^10.2.7",
      "@swc/cli": "^0.1.62",
      "@swc/core": "1.3.95",
      "@types/node": "^20.8.10",
      "typescript": "^5.2.2",
      "unplugin-swc": "^1.4.3",
      "vitest": "^0.34.6"
    }
  }

Node.js version

18.16.0

In which operating systems have you tested?

  • [ ] macOS
  • [ ] Windows
  • [X] Linux

Other

No response

Klutrem, Nov 08 '23

Same here :( Any solutions for the latest release?

MaxKoval1ov, Dec 25 '23

I think this issue is missing some detail needed to tell whether this is expected behavior or not.

  1. Does your topic have multiple partitions?
  2. If there is more than one partition, have you tried enabling partition-aware concurrency (partitionsConsumedConcurrently)? https://kafka.js.org/docs/consuming#a-name-concurrent-processing-a-partition-aware-concurrency

Kafka is designed to process the messages in each partition in order, so it makes sense not to process later messages until the ones ahead of them have finished.

edeesis, Jan 22 '24

So, we've updated the example repository and added a README: https://github.com/Klutrem/kafka-issue

The problem is that while a message from one topic is being processed, the consumer doesn't process messages from other topics either (when we send a message to the test1 topic, the consumer doesn't process any messages on the test2 topic).

Klutrem, Jan 24 '24

That's just how eachMessage works in KafkaJS: it processes each topic-partition sequentially, one at a time. You need to set partitionsConsumedConcurrently to get what you want.

You're saying this only started happening after an upgrade to 10.x. Can you reproduce it in another example repository with a lower NestJS version?
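To illustrate the partitionsConsumedConcurrently behavior mentioned above, here is a small broker-free TypeScript model. It assumes the semantics described in the KafkaJS docs (messages within one topic-partition stay in order, and up to N topic-partitions are handled at once); it is not KafkaJS's real scheduler, and `runPartitionAware`, `demo` and the message data are hypothetical. In KafkaJS itself the option is passed as `consumer.run({ partitionsConsumedConcurrently, eachMessage })`.

```typescript
export {}; // keep top-level names local to this module

// One Kafka message, reduced to what the sketch needs.
type Msg = { topic: string; partition: number; value: string };

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Broker-free model of partition-aware concurrency: messages of one
// topic-partition run strictly in order, while up to `concurrency`
// topic-partitions are drained at the same time.
async function runPartitionAware(
  messages: Msg[],
  concurrency: number,
  handle: (m: Msg) => Promise<void>,
): Promise<void> {
  // Group by "topic-partition", keeping arrival order inside each group.
  const queues = new Map<string, Msg[]>();
  for (const m of messages) {
    const key = `${m.topic}-${m.partition}`;
    const queue = queues.get(key) ?? [];
    queue.push(m);
    queues.set(key, queue);
  }
  const pending = Array.from(queues.values());

  // Each worker claims a whole topic-partition and awaits its messages one by one.
  const worker = async (): Promise<void> => {
    for (let queue = pending.shift(); queue; queue = pending.shift()) {
      for (const m of queue) await handle(m);
    }
  };
  await Promise.all(Array.from({ length: concurrency }, worker));
}

// Two topics with one partition each; every handler takes ~50 ms.
async function demo(concurrency: number): Promise<{ elapsed: number; order: string[] }> {
  const messages: Msg[] = [
    { topic: "test1", partition: 0, value: "a1" },
    { topic: "test1", partition: 0, value: "a2" },
    { topic: "test2", partition: 0, value: "b1" },
    { topic: "test2", partition: 0, value: "b2" },
  ];
  const order: string[] = [];
  const start = Date.now();
  await runPartitionAware(messages, concurrency, async (m) => {
    await sleep(50); // stands in for slow, non-blocking I/O
    order.push(m.value);
  });
  return { elapsed: Date.now() - start, order };
}

demo(1).then((r) => console.log("concurrency 1:", r));
demo(2).then((r) => console.log("concurrency 2:", r));
```

With concurrency 1 the four 50 ms handlers take about 200 ms in total, one topic-partition at a time; with concurrency 2 the two single-partition topics overlap and the run takes about 100 ms, while a1/a2 and b1/b2 each keep their order.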

edeesis, Jan 24 '24

So, I made another branch, async-requests. At first it worked the same, even after the update. But then I registered the logger interceptor we use in our microservices as an APP_INTERCEPTOR. After that, the old version (7.x) processes messages asynchronously (you can see that in the screenshot), but 10.x handles only one message at a time. Maybe the problem is in our logger interceptor, or in how the app interceptor is provided; I don't actually know.

Klutrem, Jan 30 '24

AFAIR there was a bug in v7 (or v8): instead of awaiting the eachMessage callback (of kafkajs), we let the message be processed asynchronously, which caused incorrect behavior. It's now fixed (hence the difference).
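A broker-free sketch of the difference described above, assuming a simplified consumer loop rather than Nest's or KafkaJS's actual code (`consumeAwaiting`, `consumeFireAndForget` and `maxOverlap` are hypothetical): awaiting each handler keeps at most one message in flight, while not awaiting lets handlers overlap.

```typescript
export {}; // keep top-level names local to this module

type HandlerFn = (value: string) => Promise<void>;

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Awaiting every handler: the next message starts only after the previous
// one has settled, as an awaited eachMessage callback implies.
async function consumeAwaiting(values: string[], handler: HandlerFn): Promise<void> {
  for (const v of values) await handler(v);
}

// Fire-and-forget: the loop hands each message off and moves on at once, so
// handlers overlap, but the loop no longer knows when a message is finished.
async function consumeFireAndForget(values: string[], handler: HandlerFn): Promise<void> {
  const inFlight = values.map((v) => handler(v)); // all started immediately
  await Promise.all(inFlight); // only so the demo can wait for completion
}

// Reports the highest number of handlers running at the same moment.
async function maxOverlap(
  consume: (values: string[], handler: HandlerFn) => Promise<void>,
): Promise<number> {
  let active = 0;
  let peak = 0;
  await consume(["m1", "m2", "m3"], async () => {
    active++;
    peak = Math.max(peak, active);
    await sleep(30); // simulated async work
    active--;
  });
  return peak;
}

maxOverlap(consumeAwaiting).then((n) => console.log("awaiting, peak overlap:", n));
maxOverlap(consumeFireAndForget).then((n) => console.log("fire-and-forget, peak overlap:", n));
```

maxOverlap reports 1 for the awaiting loop and 3 for fire-and-forget. The trade-off is that a fire-and-forget loop cannot tell when a message is done, which is what in-order processing and safe offset commits depend on.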

kamilmysliwiec, Jan 31 '24

Quoting @edeesis: "It processes each topic-partition sequentially one at a time."

The KafkaJS documentation says nothing about different topics. Messages in the same partition should certainly be processed in order, and partitionsConsumedConcurrently solves the problem of processing different partitions asynchronously.

Kafka implies this ordering only for partitions, not for topics. Imagine that we have two users:

  • The first one loads their profile page
  • The second one loads the site's main page

If we process the two topics concurrently, the second user can get the main page while the first one is still waiting for the profile response.

Quoting @kamilmysliwiec: "AFAIR there was a bug in v7 (or v8)". Yes, that was about the part related to partitions, but topics should be processed asynchronously, shouldn't they?

weissmall, Feb 08 '24

Who said you can't process two topics in parallel though?

kamilmysliwiec, Feb 12 '24

Quoting @kamilmysliwiec: "Who said you can't process two topics in parallel though?"

I guess there is a misunderstanding: we're not talking about parallel computation but about concurrency.

The reporters provided a valid test case in the async-requests branch, where two requests should be processed asynchronously but aren't.

For CPU-bound tasks on single-threaded Node I certainly can't argue, but in their case the handler returns a Promise that resolves to the response to the Kafka message:

return new Promise<string>((resolve) => {
  setTimeout(() => {
    resolve("pong1");
    const time2 = new Date();
    console.log(`test1 took ${getTimeDifference(time1, time2)} secs`);
  }, 5000);
});

If two different topics have handlers like this, they should be processed concurrently, but they aren't.

Given how the Node.js event loop works, such tasks can run concurrently.
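A minimal TypeScript sketch of the event-loop point, assuming handlers that only wait on timers like the example above (`fakeRequest`, `oneAtATime` and `concurrently` are hypothetical, and 5000 ms is scaled down to 100 ms). Two timer-based promises started together finish in roughly the time of one, on a single thread; awaited one by one, they take the sum.

```typescript
export {}; // keep top-level names local to this module

// Hypothetical stand-in for a handler whose Promise is resolved by a timer.
function fakeRequest(reply: string, ms: number): Promise<string> {
  return new Promise((resolve) => setTimeout(() => resolve(reply), ms));
}

// Handling the two requests one after another: total time is the sum.
async function oneAtATime(): Promise<number> {
  const start = Date.now();
  await fakeRequest("pong1", 100);
  await fakeRequest("pong2", 100);
  return Date.now() - start;
}

// Starting both before awaiting either: the timers run concurrently on the
// single Node.js thread, so total time is roughly the longer of the two.
async function concurrently(): Promise<number> {
  const start = Date.now();
  await Promise.all([fakeRequest("pong1", 100), fakeRequest("pong2", 100)]);
  return Date.now() - start;
}

oneAtATime()
  .then((ms) => console.log(`one at a time: ~${ms} ms`))
  .then(concurrently)
  .then((ms) => console.log(`concurrently: ~${ms} ms`));
```

Expect roughly 200 ms for the first measurement and roughly 100 ms for the second; exact numbers depend on timer precision.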

weissmall, Feb 12 '24

I'll investigate this as soon as I can, thank you @weissmall

kamilmysliwiec, Feb 12 '24

Update after debugging

File: microservices/server/server-kafka.ts

Code:

  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: KafkaContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
    }
    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      await lastValueFrom(resultOrStream); // This await is the cause of the issue
    }
  }

It seems a bit strange, but while we're awaiting lastValueFrom(resultOrStream) in the scenario described above, we cannot process the next message from any topic until the response arrives.

I guess this situation calls for something like a message queue per topic, so that different topics can be processed asynchronously.

If we don't await the response, messages from a single partition get processed asynchronously, which is bad. So we need some workaround, or maybe the core issue lies deeper than what I've found.
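One way the per-topic queue idea could look, as a sketch under stated assumptions: `KeyedSerialQueue` is a hypothetical helper, not part of Nest or KafkaJS. It chains each task onto the previous promise for the same key, so a topic's messages stay in order while different topics overlap. Keying by topic-partition instead of topic would keep Kafka's per-partition guarantee without serializing a whole multi-partition topic. The sketch ignores offset commits, which in a real consumer should only advance once a message and everything before it in that partition has been handled.

```typescript
export {}; // keep top-level names local to this module

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: tasks with the same key (e.g. a topic name) run strictly
// one after another, while tasks with different keys run concurrently.
class KeyedSerialQueue {
  private readonly tails = new Map<string, Promise<void>>();

  run(key: string, task: () => Promise<void>): Promise<void> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start after the previous task for this key settles, even if it failed.
    const next = previous.catch(() => undefined).then(task);
    this.tails.set(key, next);
    // Forget the key once its last queued task settles, so the map stays small.
    const cleanup = () => {
      if (this.tails.get(key) === next) this.tails.delete(key);
    };
    next.then(cleanup, cleanup);
    return next; // callers still see the task's own result or error
  }
}

async function demo(): Promise<{ log: string[]; elapsed: number }> {
  const queue = new KeyedSerialQueue();
  const log: string[] = [];
  const start = Date.now();
  // Hypothetical handler: ~50 ms of async work per message.
  const handle = (topic: string, id: number) =>
    queue.run(topic, async () => {
      await sleep(50);
      log.push(`${topic}#${id}`);
    });
  await Promise.all([
    handle("test1", 1),
    handle("test1", 2),
    handle("test2", 1),
    handle("test2", 2),
  ]);
  return { log, elapsed: Date.now() - start };
}

demo().then(({ log, elapsed }) => console.log(log, `${elapsed} ms`));
```

Here the two test1 messages run back to back while test2 runs alongside them, so the whole demo takes about 100 ms instead of about 200 ms.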

weissmall, Feb 15 '24

Quoting @kamilmysliwiec: "I'll investigate this as soon as I can, thank you @weissmall"

Hi, any updates?

Klutrem, Mar 12 '24

I can reproduce this with a raw kafkajs setup, so it doesn't seem to be related to NestJS:

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "test1" });
  await consumer.subscribe({ topic: "test2" });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log("Received message", {
        topic,
        partition,
        key: message.value?.toString(), // message.value may be null; logged under the label "key"
      });
      await new Promise((resolve) => setTimeout(resolve, 5000));
      console.log("Processed message");
    },
  });
};

run().catch(console.error);

Result:

Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test1', partition: 0, key: 'ping' }
Processed message
Received message { topic: 'test2', partition: 0, key: 'ping' }

kamilmysliwiec, Mar 18 '24

I'm running into the same situation... any updates?

falkorotter, Mar 22 '24

Have you tried using an observable? https://github.com/nestjs/nest/issues/3954#issuecomment-579698979

mkaufmaner, Apr 01 '24

Quoting @mkaufmaner: "Have you tried using an observable? #3954 (comment)"

Same result. Am I doing something wrong?

goodhumored, Apr 25 '24

Using Blizzard's node-rdkafka, everything works perfectly.

Code:

const { randomInt } = require('crypto');
var Kafka = require('node-rdkafka');

var stream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'test-group',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
  "offset.store.method": 'broker',
}, {
  'auto.offset.reset': "end",
  "offset.store.method": 'broker',
  "enable.auto.commit": true,
  "auto.commit.interval.ms": 5000,
  "offset.store.sync.interval.ms": 5000,
}, {
  topics: ['test1', 'test2'],
  waitInterval: 0,
  objectMode: false
});

stream.on('error', function(err) {
  if (err) console.log(err);
  process.exit(1);
});

stream.consumer.on('event.error', function(err) {
  console.log(err);
})

stream.consumer.on('ready', (_) => {
  console.log('Ready');
})

stream.consumer.on('subscribed', (_) => {
  console.log('Subscribed')
})

stream.consumer.on('data', async ({ topic, value, partition }) => {
  const id = randomInt(500);
  console.log(`Got message with [id ${id}] at [${new Date()}]\n`, {
    topic, value: value.toString(), partition,
  })

  await new Promise((resolve) => setTimeout(resolve, 5000));
  console.log(`Processed message with [id ${id}] at [${new Date()}]`);
})

Received logs:

Got message with [id 453] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 333] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 159] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 390] at [Thu Apr 25 2024 18:43:20 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 437] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 155] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 348] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 247] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 74] at [Thu Apr 25 2024 18:43:21 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 374] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 393] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 414] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 179] at [Thu Apr 25 2024 18:43:22 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 154] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
 { topic: 'test2', value: 'ping', partition: 0 }
Got message with [id 486] at [Thu Apr 25 2024 18:43:23 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 480] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 182] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Got message with [id 300] at [Thu Apr 25 2024 18:43:24 GMT+0300 (Moscow Standard Time)]
 { topic: 'test1', value: 'ping', partition: 0 }
Processed message with [id 453] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 333] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 159] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 390] at [Thu Apr 25 2024 18:43:25 GMT+0300 (Moscow Standard Time)]
Processed message with [id 437] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 155] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 348] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 247] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 74] at [Thu Apr 25 2024 18:43:26 GMT+0300 (Moscow Standard Time)]
Processed message with [id 374] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 393] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 414] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 179] at [Thu Apr 25 2024 18:43:27 GMT+0300 (Moscow Standard Time)]
Processed message with [id 154] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 486] at [Thu Apr 25 2024 18:43:28 GMT+0300 (Moscow Standard Time)]
Processed message with [id 480] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 182] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]
Processed message with [id 300] at [Thu Apr 25 2024 18:43:29 GMT+0300 (Moscow Standard Time)]

All messages are processed in the correct order, and we can receive new messages while previous ones are still being processed.
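A likely explanation for the difference, sketched with a plain Node.js EventEmitter rather than node-rdkafka itself: the handler above is an async listener on the 'data' event, and EventEmitter.emit() calls listeners synchronously without awaiting the promises they return, so each message starts as soon as it is delivered. `demo` and the shortened delay are assumptions for illustration.

```typescript
import { EventEmitter } from "node:events";

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function demo(): Promise<string[]> {
  const consumer = new EventEmitter(); // stands in for stream.consumer
  const log: string[] = [];

  // Same shape as the 'data' listener above: async, with a delay inside.
  consumer.on("data", async (id: number) => {
    log.push(`got ${id}`);
    await sleep(50); // 5000 ms in the original, shortened here
    log.push(`processed ${id}`);
  });

  // emit() runs the listener synchronously and discards the promise it
  // returns, so the next message is delivered without waiting.
  for (const id of [1, 2, 3]) consumer.emit("data", id);

  await sleep(100); // let the pending listeners finish
  return log;
}

demo().then((log) => console.log(log.join("\n")));
```

This logs all three "got" lines before any "processed" line. Completions come out in order here only because every handler takes the same time; nothing awaits them, so with enable.auto.commit set, offsets may be committed before the corresponding message has actually been processed.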

@kamilmysliwiec

weissmall, Apr 25 '24

Would you like to create a PR replacing kafkajs with node-rdkafka?

kamilmysliwiec, Apr 30 '24

Quoting @kamilmysliwiec: "Would you like to create a PR replacing kafkajs with node-rdkafka?"

@kamilmysliwiec This would be a breaking change, but I can take a swing at it; it has been a while.

mkaufmaner, May 04 '24

Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation?

edeesis, May 04 '24

Quoting @edeesis: "Does it have to be a breaking change? Could it be introduced as a new transport type and mark the old one for deprecation?"

It can be implemented as a new transport, which is what I will work towards first. @kamilmysliwiec, let me know which direction you would like to take.

I've opened a new issue here: https://github.com/nestjs/nest/issues/13535

mkaufmaner, May 04 '24

Sounds good @mkaufmaner

kamilmysliwiec, May 08 '24

I think awaiting combineStreamsAndThrowIfRetriable() in the handleMessage method of the ServerKafka class might be an issue. When I changed that call so it is no longer awaited, I was able to receive multiple messages before processing of the previous message had finished.

Another problem is that heartbeats cannot be sent because the consumer is blocked while processing a message, which leads to rebalancing.

Do you have any updates planned to address these issues?
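On the heartbeat point: KafkaJS passes a heartbeat() function in the eachMessage payload (its docs describe it as throttled by the consumer's heartbeatInterval), so a long handler can call it periodically to reduce the risk of a rebalance. The sketch below shows that pattern with a stub in place of the real function; `processInSlices`, the slice count and the timings are hypothetical. It keeps the consumer alive during long work but does not make messages run concurrently.

```typescript
export {}; // keep top-level names local to this module

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical long-running handler: the work is split into slices and a
// heartbeat callback is invoked after each one. In KafkaJS this callback
// would be the heartbeat() from the eachMessage payload; here it is a stub
// so the sketch runs without a broker.
async function processInSlices(
  slices: number,
  sliceMs: number,
  heartbeat: () => Promise<void>,
): Promise<void> {
  for (let i = 0; i < slices; i++) {
    await sleep(sliceMs); // one slice of the real work
    await heartbeat(); // signal liveness between slices
  }
}

async function demo(): Promise<number> {
  let beats = 0;
  await processInSlices(4, 20, async () => {
    beats++; // count calls instead of contacting a broker
  });
  return beats;
}

demo().then((beats) => console.log(`heartbeats sent: ${beats}`));
```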

stitch20, Aug 07 '24

combineStreamsAndThrowIfRetriable is not the issue; you can reproduce this with a raw kafkajs setup: https://github.com/nestjs/nest/issues/12703#issuecomment-2003688863

kamilmysliwiec, Aug 07 '24