
Consumer stops processing messages

Open arthurgrigoletto opened this issue 4 years ago • 27 comments

I have a consumer application deployed on AWS that stores data in a MySQL database. Eventually, without any error, my consumer stops processing messages and I have to restart the AWS task for the consumer to work again. Am I doing something wrong?

import { Kafka, logLevel } from 'kafkajs';
import kafkaConfig from './config/kafka';
import CdrController from './controllers/CdrController'; // path assumed; the controller is not shown in the original snippet

const kafka = new Kafka({
  clientId: 'cdr-consumer',
  brokers: [`${kafkaConfig.host}:${kafkaConfig.port}`],
});

const consumer = kafka.consumer({
  groupId: kafkaConfig.topic,
  retry: {
    // Wait 10 seconds before the first retry
    initialRetryTime: 10 * 1000,
    retries: 10,
  },
  heartbeatInterval: 25000,
});

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: kafkaConfig.topic });

  console.log(`Kafka topic: ${kafkaConfig.topic}`);

  await consumer.run({
    eachMessage: async ({ message }) => {
      // Store on database
      await CdrController.store(message);
    },
  });
};

run().catch(console.error);

arthurgrigoletto avatar Mar 10 '20 17:03 arthurgrigoletto

Hi, this can happen if your consumer crashes with certain kinds of errors. I would suggest something like:

consumer.on('consumer.crash', event => {
  const error = event?.payload?.error
  this.crashHandler(error)
})

And the crash handler:

private async crashHandler(error: any) {
  // This logic is based on the kafkajs implementation: https://github.com/tulios/kafkajs/blob/master/src/consumer/index.js#L257
  // Restart manually only for errors that kafkajs will not restart from on its own
  // (i.e. non-retriable errors that are not KafkaJSNumberOfRetriesExceeded)
  if (error && error.name !== 'KafkaJSNumberOfRetriesExceeded' && error.retriable !== true) {
    await this.restartConsumer()
  }
}

I found another case where the consumer stops consuming. See my issue #660.

ghost avatar Mar 12 '20 05:03 ghost

I already tried monitoring the crash event, but the application didn't crash, it just stopped consuming. Could it be some kind of problem with the connection between the consumer and the brokers on AWS?

arthurgrigoletto avatar Mar 12 '20 17:03 arthurgrigoletto

@arthurgrigoletto that issue (https://github.com/tulios/kafkajs/issues/660) is also happening on AWS: no crash, and the consumer stops consuming after one of the brokers is restarted. The only visible log (with kafkajs debug mode on) is that it disconnects from the other brokers as well after 10 minutes, and the fetch never ends...

ghost avatar Mar 14 '20 13:03 ghost

Do you have any logs or any more information to share? It would be helpful to have access to debug logs.

How do you know that your application stops consuming messages? How are you deploying it (I've seen people deploying Kafka consumers into Lambdas and then being surprised when they stop working after a while...)?

Nevon avatar Mar 14 '20 15:03 Nevon

So I set logLevel.DEBUG to figure it out and got these last messages:

08:13:27 - {"level":"DEBUG","timestamp":"2020-03-18T11:13:27.541Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"<BROKER_URL","clientId":"cdr-consumer"}
08:13:27 - {"level":"DEBUG","timestamp":"2020-03-18T11:13:27.541Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"<BROKER_URL>","clientId":"cdr-consumer"}
08:13:27 - {"level":"DEBUG","timestamp":"2020-03-18T11:13:27.541Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"<BROKER_URL>","clientId":"cdr-consumer"}

The last successful message was at 08:03:43, but the application continued to send logs; only the consumer stopped listening. I tried to log some events like STOP, DISCONNECT, and CRASH, but nothing was logged.
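For reference, a minimal sketch of subscribing to those instrumentation events in kafkajs (the console calls are placeholders for a real logger):

const { CRASH, STOP, DISCONNECT, GROUP_JOIN } = consumer.events;

// Log every lifecycle event so a silent stop leaves at least one trace
consumer.on(CRASH, e => console.error('consumer.crash', e.payload && e.payload.error));
consumer.on(STOP, () => console.warn('consumer.stop'));
consumer.on(DISCONNECT, () => console.warn('consumer.disconnect'));
consumer.on(GROUP_JOIN, e => console.info('consumer.group_join', e.payload && e.payload.groupId));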

About the deployment, @Nevon: it's not a Lambda. It is a container running on an EC2 instance. I don't know more about it yet; I'm talking with our DevOps team.

arthurgrigoletto avatar Mar 18 '20 12:03 arthurgrigoletto

Looks exactly the same as #660. @arthurgrigoletto I guess you are using Confluent Kafka?

ghost avatar Mar 18 '20 12:03 ghost

@dusan-dragon do you mean Confluent as a Docker image?

arthurgrigoletto avatar Mar 18 '20 15:03 arthurgrigoletto

> @dusan-dragon do you mean Confluent as a Docker image?

https://www.confluent.io/

ghost avatar Mar 18 '20 22:03 ghost

> @dusan-dragon do you mean Confluent as a Docker image?
>
> https://www.confluent.io/

No, I'm not using that.

arthurgrigoletto avatar Mar 19 '20 15:03 arthurgrigoletto

Using MSK on AWS, I can repeatedly observe this weird behaviour - consumers sometimes fail to rejoin the group after MSK hiccups (restart and/or rebalance).

Apache Kafka 2.4.1 running on AWS MSK; kafkajs 1.13.0-beta.62

2020-08-26T11:12:11.158Z - Response OffsetCommit(key: 8, version: 3) - {"broker":"b-1.xxx","error":"This is not the correct coordinator for this group"}
2020-08-26T11:12:11.158Z - Response JoinGroup(key: 11, version: 4) - {"broker":"b-2.xxx","error":"The coordinator is not aware of this member"}
2020-08-26T11:12:11.159Z - Crash: KafkaJSProtocolError: The coordinator is not aware of this member
2020-08-26T11:12:13.457Z - Response OffsetCommit(key: 8, version: 3) - {"broker":"b-1.xxx","error":"This is not the correct coordinator for this group"}
2020-08-26T11:12:13.459Z - Response Heartbeat(key: 12, version: 1) - {"broker":"b-2.xxx","error":"The coordinator is not aware of this member"}
2020-08-26T11:12:13.464Z - Response LeaveGroup(key: 13, version: 1) - {"broker":"b-2.xxx","error":"The coordinator is not aware of this member"}
2020-08-26T11:12:13.464Z - Stopped - {"groupId":"commons"}

Why? 🤷‍♂️
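For context, kafkajs documents a retry.restartOnFailure hook (used only by the consumer) that decides whether the consumer restarts itself after exhausting retries instead of stopping. A rough sketch with placeholder clientId/broker values, assuming a kafkajs version that supports it:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',                       // placeholder
  brokers: ['b-1.xxx:9092', 'b-2.xxx:9092'], // placeholders
  retry: {
    retries: 8,
    // Invoked after the consumer exhausts all retries for an error;
    // returning true asks kafkajs to restart the consumer instead of stopping it
    restartOnFailure: async error => {
      console.error('Consumer exhausted retries', error);
      return true;
    },
  },
});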

pawelrychlik avatar Aug 26 '20 12:08 pawelrychlik

We have the same issue using AWS MSK with Apache Kafka 2.4.1.1. I'll add more debugging tomorrow and will post the logs here. Overall this seems very strange, as the other consumers we have using github.com/segmentio/kafka-go do not seem to have these issues.

janwiemers avatar Nov 11 '20 19:11 janwiemers

Good morning, these are the logs I see a lot:

{"level":"DEBUG","timestamp":"2020-11-12T07:12:31.937Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"b-1.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client","correlationId":11,"expectResponse":true,"size":161}
{"level":"ERROR","timestamp":"2020-11-12T07:12:31.938Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"b-1.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client","error":"The group is rebalancing, so a rejoin is needed","correlationId":11,"size":10}
{"level":"DEBUG","timestamp":"2020-11-12T07:12:31.938Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"b-1.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client","error":"The group is rebalancing, so a rejoin is needed","correlationId":11,"payload":{"type":"Buffer","data":"[filtered]"}}
{"level":"DEBUG","timestamp":"2020-11-12T07:18:40.625Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}
{"level":"DEBUG","timestamp":"2020-11-12T07:18:40.626Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}
{"level":"DEBUG","timestamp":"2020-11-12T07:18:40.626Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}
{"level":"DEBUG","timestamp":"2020-11-12T07:19:00.419Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}
{"level":"DEBUG","timestamp":"2020-11-12T07:19:00.419Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}
{"level":"DEBUG","timestamp":"2020-11-12T07:19:00.419Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"b-3.my-address.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-client"}

and this even more often:


{"message":{"msg":"start: my-message"},"level":"info","timestamp":"2020-11-12T07:21:46.516Z"}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:54.882Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 7)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":19,"expectResponse":true,"size":814}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:54.900Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 7)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":19,"size":79,"data":{"topics":[{"topicName":"my-topic","partitions":[{"partition":0,"errorCode":0,"baseOffset":"186581","logAppendTime":"-1","logStartOffset":"178437"}]}],"throttleTime":0,"clientSideThrottleTime":0}}
{"message":{"msg":"done: my-message"},"level":"info","timestamp":"2020-11-12T07:21:54.900Z"}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:54.900Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":67,"expectResponse":true,"size":161}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:54.903Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":67,"size":10,"data":{"throttleTime":0,"errorCode":0}}
{"message":{"msg":"start: my-message-2"},"level":"info","timestamp":"2020-11-12T07:21:54.903Z"}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:57.635Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 7)","broker":"b-2.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":15,"expectResponse":true,"size":648}
{"level":"DEBUG","timestamp":"2020-11-12T07:21:57.640Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 7)","broker":"b-2.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":15,"size":79,"data":{"topics":[{"topicName":"my-topic","partitions":[{"partition":1,"errorCode":0,"baseOffset":"196802","logAppendTime":"-1","logStartOffset":"188745"}]}],"throttleTime":0,"clientSideThrottleTime":0}}
{"message":{"msg":"done: my-message-2"},"level":"info","timestamp":"2020-11-12T07:21:57.640Z"}
{"message":{"msg":"start: my-message-3"},"level":"info","timestamp":"2020-11-12T07:21:57.640Z"}
{"level":"DEBUG","timestamp":"2020-11-12T07:22:07.449Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 7)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":20,"expectResponse":true,"size":648}
{"level":"DEBUG","timestamp":"2020-11-12T07:22:07.454Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 7)","broker":"b-1.my-address.jj9wbs.c2.kafka.us-east-1.amazonaws.com:9092","clientId":"my-consumer","correlationId":20,"size":79,"data":{"topics":[{"topicName":"my-topic","partitions":[{"partition":0,"errorCode":0,"baseOffset":"186582","logAppendTime":"-1","logStartOffset":"178437"}]}],"throttleTime":0,"clientSideThrottleTime":0}}
{"message":{"msg":"done: my-message-3"},"level":"info","timestamp":"2020-11-12T07:22:07.455Z"}

The topic I'm reading from has 12 partitions with a replication factor of 3; the topic I'm writing to has 3 partitions with a replication factor of 3.

janwiemers avatar Nov 12 '20 07:11 janwiemers

Do you have any logs from the broker? Since the connection is closed by the server, there might be something in the broker logs.

WaleedAshraf avatar Nov 12 '20 09:11 WaleedAshraf

OK, I found the issue. The disconnect is unrelated to KafkaJS. My consumers have a long-running task that they execute on each message. It appears that in some rare cases this job wasn't able to finish and the consumer was not able to send its heartbeat. That's why the consumer disconnected. The question I wasn't able to answer is why the consumer does not reconnect, though. I saw multiple disconnects but no attempt from the consumer to connect again.
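For what it's worth, kafkajs also passes a heartbeat callback into eachMessage (in versions that support it), so a long-running handler can keep its group membership alive while it works. A minimal sketch, where doLongRunningWork() is a placeholder for the actual job:

await consumer.run({
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    // Keep the group session alive while the slow job runs; heartbeat()
    // is a no-op unless heartbeatInterval has elapsed since the last one
    const keepAlive = setInterval(() => {
      heartbeat().catch(err => console.warn('heartbeat failed', err.message));
    }, 3000);

    try {
      await doLongRunningWork(message); // placeholder for the actual long-running task
    } finally {
      clearInterval(keepAlive);
    }
  },
});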

will further investigate...

have a great weekend :)

janwiemers avatar Nov 14 '20 18:11 janwiemers

I'm experiencing the same situation with a NestJS-KafkaJS-Confluent setup. Our producers are just fine, but some consumers stop without any logs or other indication of an issue. Interestingly, producers in the same service continue to send events.

I've added hooks, more logging, etc., but it doesn't yield results -- callbacks aren't called and the logs lack anything related. The service has no idea there is a problem, though there are too many rebalances. Our system monitoring doesn't detect any issues (like network or other resource contention), and I've reviewed this with Confluent.

I run several instances of a service connected to the same consumer group. Sometimes one service stops consuming and sometimes all of them stop consuming.

Issue #561 was closed -- and I agree that thread branched in many different directions.

I'm not able to reproduce the issue on demand, otherwise I'd post code. We've since pivoted our architecture to make things more reliable. In simple produce/consume examples, it just doesn't happen before I lose patience.

thicks avatar May 11 '21 19:05 thicks

Hello gents, any news on this front? I am trying to understand why my consumers simply stop consuming after a while. It is very weird. I am on the latest version of the lib and even tried the 1.16.0-beta.27.

The consumer is not crashing and does not seem to be disconnecting either. The one common thing that I have noticed so far is that almost every time, the last thing logged is the consumer.network.request_queue_size event.

(screenshot: the last events logged are consumer.network.request_queue_size)

One other thing to add is that there is a 10-minute gap between the last activity and these final consumer.network.request_queue_size events.

I am using an AWS MSK cluster running on Kafka 2.6.2 with 2 brokers and the consumers are running in ECS Fargate tasks.

Is there anything I can do to fix this behavior on my side? I am down to two alternatives: either terminating the process if it takes too long to receive any consumer.* event, or switching to another Kafka library altogether.

raphaabreu avatar Nov 23 '21 22:11 raphaabreu

@raphaabreu we solved it like this:

// Record the time of every heartbeat the consumer sends to the broker
this.consumer.on('consumer.heartbeat', () => {
  this.lastHeartbeat = new Date()
})

// Periodically check for heartbeats; if none have been seen recently, restart the consumer
private startHeartbeatCheck() {
  setInterval(async () => {
    const now = new Date()
    if (this.lastHeartbeat.getTime() < now.getTime() - HEARTBEAT_CHECK_INTERVAL) {
      this.errorLogger(new Error(`Last heartbeat was at ${this.lastHeartbeat}`), false)
      await this.restartConsumer()
    }
  }, HEARTBEAT_CHECK_INTERVAL)
}

Heartbeats should be received even if there is no traffic... when we detect that heartbeats are not coming, we just restart the consumer.
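As a self-contained version of the same idea (plain kafkajs, no class wrapper; the threshold value and restartConsumer() are placeholders to adapt):

const HEARTBEAT_CHECK_INTERVAL = 60 * 1000; // placeholder threshold

let lastHeartbeat = Date.now();

// consumer.events.HEARTBEAT fires every time the client sends a heartbeat to the broker
consumer.on(consumer.events.HEARTBEAT, () => {
  lastHeartbeat = Date.now();
});

setInterval(async () => {
  if (Date.now() - lastHeartbeat > HEARTBEAT_CHECK_INTERVAL) {
    console.error(`No heartbeat since ${new Date(lastHeartbeat).toISOString()}, restarting consumer`);
    await restartConsumer(); // placeholder: disconnect and recreate the consumer
  }
}, HEARTBEAT_CHECK_INTERVAL);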

ghost avatar Nov 24 '21 12:11 ghost

Thank you @dusan-dragon, I have followed your advice and so far so good. It really feels like this should be something baked into the default behavior of the lib.

raphaabreu avatar Nov 25 '21 12:11 raphaabreu

If the logs above are complete, it seems there is a missing end_batch_process event for partition 9, which suggests the batch processing has hung.

Do you have any logging in your processing function?

t-d-d avatar Nov 29 '21 00:11 t-d-d

@dusan-dragon - what is your this.restartConsumer() logic doing here?

The heartbeats being reacted to in the 'consumer.heartbeat' instrumentation events are being triggered when the kafkajs consumer client is sending heartbeats to the broker, correct?

Does that heartbeat actually stop in the situation where the consumer is hanging? I would have assumed it's still being triggered by the client, even when the broker connection may not be sending back any data.

EricMCornelius avatar Dec 06 '21 01:12 EricMCornelius

@EricMCornelius

The logic inside restartConsumer() is simple. We are just calling consumer.disconnect() and then we recreate that consumer. In our case it is working fine.
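In plain kafkajs terms, the restart can be sketched roughly like this (reusing the kafka, kafkaConfig, and CdrController names from the first snippet in this thread; a sketch of the approach, not our exact code):

let consumer;

async function startConsumer() {
  consumer = kafka.consumer({ groupId: kafkaConfig.topic });
  await consumer.connect();
  await consumer.subscribe({ topic: kafkaConfig.topic });
  await consumer.run({
    eachMessage: async ({ message }) => CdrController.store(message),
  });
}

async function restartConsumer() {
  // Tear down the old instance first; it may already be in a broken state, so ignore errors
  try {
    await consumer.disconnect();
  } catch (err) {
    console.error('Error while disconnecting old consumer', err);
  }

  // Recreate a fresh consumer rather than reusing the old instance
  await startConsumer();
}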

You are correct that this heartbeat is the consumer sending to the broker. I am not 100% sure about your scenario, though.

ghost avatar Dec 06 '21 07:12 ghost

We have similar issues - only 1/3 to 1/2 of our consumers across the fleet ever consume messages.

And yes, we have more partitions on the topic at hand than consumers.

yakkomajuri avatar May 31 '22 10:05 yakkomajuri

I found out that the problem was with the logs folder (where meta.properties is stored). So before I launch ZooKeeper and then Kafka, I fully empty the logs folder. To have everything in one place I just use one bash script:

#!/bin/bash

rm -rf /home/kafka/logs
mkdir /home/kafka/logs
sudo systemctl start zookeeper
sudo systemctl start kafka

alkhachatryan avatar Aug 21 '22 11:08 alkhachatryan

Note from our side - our problem was that we were getting CPU bound when spinning up nodes. At that point our server did a whole bunch of stuff, including connecting to Kafka. So even though we were well within CPU limits during normal execution, we were not coping with the startup spike well.

yakkomajuri avatar Aug 22 '22 12:08 yakkomajuri

I’ve also encountered an unusual problem with our consumers. After a certain period, some consumers stop consuming messages abruptly, without any error messages. Restarting the applications resolves the issue temporarily. I’ve been trying to reproduce it on my local setup, but haven’t been successful so far.

Any assistance would be greatly appreciated.

mahajanankur avatar Jun 28 '23 04:06 mahajanankur

I am having a similar problem to the one described above and I found a way to reproduce it:

  1. Start the application normally (with the consumer connected and subscribed to topics).
  2. Restart the consumer's connection with the broker (in my case I am running Kafka locally) a few times. (Restart only the connection between the consumer and the broker, not the application itself.)
  3. After a few restarts of the connection, the consumer stops processing messages. PS: to be sure, I started another application and it was able to process the messages normally. After a few restarts of the connection, it stopped processing as well.

EDIT: Only one disconnect/connect is necessary to reproduce the problem on my machine.

Since a similar problem was mentioned in https://github.com/tulios/kafkajs/issues/660, I'll test the following:

  • increase the time between the disconnect and the connect to check if the problem stops happening after some threshold (which may indicate that Node was able to kill the socket after the disconnect step)

teohen avatar Jan 19 '24 22:01 teohen