Add support for Kafka Instrumentation events

Open marinakurtin opened this issue 1 year ago • 7 comments

Is there an existing issue that is already proposing this?

  • [X] I have searched the existing issues

Is your feature request related to a problem? Please describe it

I need the ability to monitor the events of my Kafka consumer.

As far as I know, this is already supported in KafkaJS.

I added the documentation reference in the documentation section below.

I managed to find a workaround. When the first Kafka message is received, I can extract a reference to the Consumer object and add listeners to it.

Here is a code example:

@EventPattern('kafkaTopic')
handler(@Ctx() context: KafkaContext) {
  // Register the listener only once, on the first message received.
  if (!this.consumer) {
    this.consumer = context.getConsumer();
    this.consumer.on('consumer.heartbeat', (event) => {
      console.log("HEARTBEAT event", event);
    });
  }
  // business logic
}

But in my opinion this is a bad solution, because it requires checking on every message whether the listeners have already been registered.

Describe the solution you'd like

I would like the ability to subscribe to the instrumentation events.

I would like to have a decorator that receives the name of the event and subscribes to it.

Or

I would like to be able to add it to the options object of the NestFactory.createMicroservice method.

Teachability, documentation, adoption, migration strategy

https://kafka.js.org/docs/instrumentation-events

What is the motivation / use case for changing the behavior?

The ability to monitor the consumer behaviour

marinakurtin avatar May 14 '23 18:05 marinakurtin

I would like to see this implemented too. I had to hack it:

export class CustomKafkaServer extends ServerKafka {
  async start(callback): Promise<void> {
    const consumerOptions = Object.assign(this.options.consumer || {}, {
      groupId: this.groupId,
    });
    this.consumer = this.client.consumer(consumerOptions);
    this.producer = this.client.producer(this.options.producer);
    // Add events here (registerEvents and instrumentationEvents are not shown in this snippet).
    this.registerEvents(this.instrumentationEvents);
    await this.consumer.connect();
    await this.producer.connect();
    await this.bindEvents(this.consumer);
    callback();
  }
}

david-badalov avatar May 16 '23 13:05 david-badalov

I created a PR to address crashing consumers based on the instrumentation events. Maybe it could be handled more generally.

Here is the related PR: https://github.com/nestjs/nest/pull/11910

esahin90 avatar Jun 27 '23 11:06 esahin90

So far the only way to accomplish this is to use a custom strategy that extends the built-in transporter. Example:

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, ServerKafka } from '@nestjs/microservices';
import { Consumer } from '@nestjs/microservices/external/kafka.interface';

class MyCustomStrategy extends ServerKafka {
  async bindEvents(consumer: Consumer) {
    consumer.on('consumer.heartbeat', () => console.log('...'));
    await super.bindEvents(consumer);
  }
}

// and later
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  strategy: new MyCustomStrategy({ /* ...kafka options here */ }),
});

kamilmysliwiec avatar Jun 28 '23 07:06 kamilmysliwiec

Hey @micalevisk @johnbiundo @jmcdo29 @BrunnerLivio @MarkPieszak @valorkin, We're big fans of NestJS at our organization! :rocket: We've submitted a PR that adds kafkajs instrumentation events to NestJS, addressing a current gap. We'd be grateful if you could review it when you have time. Thanks!

https://github.com/nestjs/nest/pull/12012

david-badalov avatar Jul 25 '23 11:07 david-badalov

Any updates regarding the review of the PR? This feature is crucial for a production environment, since you can end up in a state where the consumer dies but your Nest service keeps running as if nothing were wrong. When a consumer crashes, the service no longer consumes messages from Kafka, yet the service itself is still alive. I would not recommend using the Kafka consumer implementation in production until you are capable of listening to consumer CRASH events, or at least until the Nest service propagates the failure.

I tried to address the consumer crash problem in a different PR, #11910, where the NestJS Kafka consumer always listens to these events.
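
To make the crash scenario concrete, here is a minimal sketch of reacting to `consumer.crash`. It assumes the event payload carries `error`, `groupId` and `restart`, as the KafkaJS instrumentation docs describe. The `watchForFatalCrash` helper and the `CrashSource` interface are hypothetical names used for illustration, and a plain Node `EventEmitter` stands in for the real consumer so the snippet runs on its own:

```typescript
import { EventEmitter } from 'node:events';

// Assumed shape of the KafkaJS `consumer.crash` event, reduced to the fields used here.
interface CrashEvent {
  payload: { error: Error; groupId: string; restart: boolean };
}

// Minimal slice of the consumer API this sketch relies on (hypothetical interface).
interface CrashSource {
  on(eventName: 'consumer.crash', listener: (event: CrashEvent) => void): unknown;
}

// Hypothetical helper: invoke `onFatal` only when the payload says the
// consumer will not be restarted automatically.
function watchForFatalCrash(
  consumer: CrashSource,
  onFatal: (error: Error, groupId: string) => void,
): void {
  consumer.on('consumer.crash', ({ payload }) => {
    if (!payload.restart) {
      onFatal(payload.error, payload.groupId);
    }
  });
}

// Stand-in for a real consumer, for demonstration only.
const fakeConsumer = new EventEmitter();
const fatalErrors: string[] = [];

watchForFatalCrash(fakeConsumer, (error, groupId) => {
  fatalErrors.push(`${groupId}: ${error.message}`);
});

// A crash that will be restarted is ignored by the helper.
fakeConsumer.emit('consumer.crash', {
  payload: { error: new Error('rebalancing'), groupId: 'g1', restart: true },
});
// A crash without restart is reported through `onFatal`.
fakeConsumer.emit('consumer.crash', {
  payload: { error: new Error('boom'), groupId: 'g1', restart: false },
});
```

In a real service the helper would presumably be called with the actual consumer, for example from `bindEvents` in a custom strategy, and `onFatal` could log the error and exit the process so that an orchestrator restarts the service.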

esahin90 avatar Oct 02 '23 11:10 esahin90

This feature isn't crucial as there's currently a different way to register event listeners. We're still debating if the approach/API proposed in this PR is actually simpler than what you can do now @esahin90 (https://github.com/nestjs/nest/issues/11616#issuecomment-1610883351)

kamilmysliwiec avatar Oct 03 '23 10:10 kamilmysliwiec

I think that we shouldn't add new features related to kafka.js because of #13223

micalevisk avatar Mar 20 '24 11:03 micalevisk