Kafka Microservice Consumer Healthcheck

Open edeesis opened this issue 1 year ago • 2 comments

Is there an existing issue that is already proposing this?

  • [X] I have searched the existing issues

Is your feature request related to a problem? Please describe it

The Microservice Health Indicator only checks whether a producer can connect to Kafka, but I’m also interested in the state of the consumer. If the consumer has crashed, then I want the health indicator to fail.

Unfortunately, while implementing this myself I ran into a few issues:

  1. accessing the ServerKafka instance, which contains the consumer, doesn’t seem possible in the context of a health indicator
  2. there’s no easy way to subscribe to instrumentation events, see https://github.com/nestjs/nest/issues/11616

Describe the solution you'd like

I’d like the Microservice Health Indicator to also check the status of the Server side, not just the client side.

Teachability, documentation, adoption, migration strategy

The usage should be the same as the existing MicroserviceHealthIndicator, though it shouldn’t require instantiating a new instance of ServerKafka; it should use the existing one.

What is the motivation / use case for changing the behavior?

I need to be able to know if the consumer has crashed and include that information in a liveness check.

edeesis, Jun 13 '24 02:06

The workaround I've found is to move the custom microservice implementation into a module in the application context, add a public method that exposes the consumer, and then use app.resolve to fetch the instance and pass it to connectMicroservice:

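// Custom server that exposes the consumer.
// The Consumer type is assumed to come from kafkajs.
import { ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaMicroserviceServer extends ServerKafka {
  // Expose the protected consumer so other providers can observe it.
  getConsumer(): Consumer {
    return this.consumer;
  }
}

// Health indicator that records consumer crash events.
import { Injectable, OnModuleInit } from '@nestjs/common';
import { HealthIndicator } from '@nestjs/terminus';
import { ConsumerCrashEvent } from 'kafkajs';

@Injectable()
export class KafkaConsumerHealthIndicator extends HealthIndicator implements OnModuleInit {
  // Last crash event seen per consumer group; undefined means no crash so far.
  private readonly crashEvents: { [groupId: string]: ConsumerCrashEvent | undefined } = {};

  constructor(
    private readonly kafkaMicroserviceServer: KafkaMicroserviceServer
  ) {
    super();
  }

  async onModuleInit(): Promise<void> {
    const consumer = this.kafkaMicroserviceServer.getConsumer();
    const { groupId } = await consumer.describeGroup();
    this.crashEvents[groupId] = undefined;
    consumer.on(consumer.events.CRASH, (event) => {
      this.crashEvents[event.payload.groupId] = event;
    });
  }
}

// In the bootstrap function:
const kafkaMicroserviceServer = await app.resolve(KafkaMicroserviceServer);
app.connectMicroservice({
  strategy: kafkaMicroserviceServer,
});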
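
The indicator above only records crash events; it does not yet turn them into a health result. A minimal sketch of that last step could look like the following. To stay self-contained it uses local stand-in types (`CrashEvent`, `HealthResult`) instead of the kafkajs and @nestjs/terminus types. The helper name `checkConsumers` and the decision to treat a crash as fatal only when the consumer did not restart are assumptions for illustration, not part of the workaround as posted.

```typescript
// Stand-in for a kafkajs consumer crash event (assumed payload shape).
interface CrashEvent {
  payload: { groupId: string; error: Error; restart: boolean };
}

// Stand-in for the keyed result object a health indicator returns.
type HealthResult = Record<
  string,
  { status: 'up' | 'down'; [detail: string]: unknown }
>;

// Hypothetical helper: a group counts as "down" when its last recorded
// crash was not followed by an automatic restart.
function checkConsumers(
  key: string,
  crashEvents: Record<string, CrashEvent | undefined>,
): HealthResult {
  const crashed: { groupId: string; error: string }[] = [];
  for (const [groupId, event] of Object.entries(crashEvents)) {
    if (event && !event.payload.restart) {
      crashed.push({ groupId, error: event.payload.error.message });
    }
  }

  return crashed.length === 0
    ? { [key]: { status: 'up' } }
    : { [key]: { status: 'down', crashed } };
}
```

Inside the indicator, a check method could pass `this.crashEvents` to a helper like this and report the "down" case the same way other terminus indicators report failure.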

edeesis, Jun 20 '24 13:06

Is there any way to run a health check over HTTP in a pure microservice application (not hybrid or HTTP)? I want to run the health check from k8s or a Docker container.

doanthai, Oct 22 '24 11:10