
KafkaJS disconnects itself and does not attempt to reconnect

Open dusan-dragon opened this issue 4 years ago • 11 comments

Hello, first of all, maybe I just did not understand something properly and have the wrong expectations, but I was expecting that kafkajs would autoreconnect / autodiscover new brokers. Instead it just stops consuming without emitting a crash event or anything else.

Here are the logs from our server; I removed the repeating cycle of heartbeat and fetch entries:

2020-03-11T09:18:05.612Z,Kafka server has closed connection
2020-03-11T09:18:05.612Z,disconnected
2020-03-11T09:18:05.612Z,disconnecting...
2020-03-11T09:17:00.552Z,Kafka server has closed connection
2020-03-11T09:17:00.552Z,disconnected
2020-03-11T09:17:00.552Z,disconnecting...
2020-03-11T09:08:05.612Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:08:05.612Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:08:05.611Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:05.611Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:05.610Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:08:00.608Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:08:00.606Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:08:00.605Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:00.604Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:08:00.604Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:55.602Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:55.602Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:55.602Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:55.601Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:55.601Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:50.599Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:50.598Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:50.598Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:50.595Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:50.595Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:45.593Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:45.593Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:45.592Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:45.591Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:45.590Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:40.588Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:07:40.587Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:07:40.587Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:40.586Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:07:40.584Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:07:35.582Z,"Request Fetch(key: 1, version: 7)"
...
...
2020-03-11T09:02:09.441Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:02:09.440Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:02:09.440Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:09.439Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:09.438Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:02:04.436Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:02:04.436Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:02:04.435Z,"Response Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:04.434Z,"Request Heartbeat(key: 12, version: 1)"
2020-03-11T09:02:04.430Z,"Response Fetch(key: 1, version: 7)"
2020-03-11T09:01:59.427Z,"Request Fetch(key: 1, version: 7)"
2020-03-11T09:01:59.419Z,Fetching from 1 partitions for 1 out of 1 topics
2020-03-11T09:01:59.419Z,"Response ListOffsets(key: 2, version: 2)"
2020-03-11T09:01:59.417Z,"Request ListOffsets(key: 2, version: 2)"
2020-03-11T09:01:59.415Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.398Z,Connecting
2020-03-11T09:01:59.397Z,"Response OffsetFetch(key: 9, version: 3)"
2020-03-11T09:01:59.396Z,"Request OffsetFetch(key: 9, version: 3)"
2020-03-11T09:01:59.393Z,Consumer has joined the group
2020-03-11T09:01:59.392Z,Received assignment
2020-03-11T09:01:59.392Z,"Response SyncGroup(key: 14, version: 1)"
2020-03-11T09:01:59.391Z,"Request SyncGroup(key: 14, version: 1)"
2020-03-11T09:01:59.389Z,Group assignment
2020-03-11T09:01:59.388Z,"Response Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.387Z,"Request Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.386Z,Chosen as group leader
2020-03-11T09:01:59.385Z,"Response JoinGroup(key: 11, version: 2)"
2020-03-11T09:01:59.383Z,"Request JoinGroup(key: 11, version: 2)"
2020-03-11T09:01:59.381Z,Found group coordinator
2020-03-11T09:01:59.381Z,"Response GroupCoordinator(key: 10, version: 1)"
2020-03-11T09:01:59.379Z,"Request GroupCoordinator(key: 10, version: 1)"
2020-03-11T09:01:59.378Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.358Z,Connecting
2020-03-11T09:01:59.357Z,Starting
2020-03-11T09:01:59.356Z,"Response Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.354Z,"Request Metadata(key: 3, version: 5)"
2020-03-11T09:01:59.351Z,Verified support for SaslAuthenticate
2020-03-11T09:01:59.351Z,"Response ApiVersions(key: 18, version: 2)"
2020-03-11T09:01:59.349Z,"Request ApiVersions(key: 18, version: 2)"
2020-03-11T09:01:59.292Z,Connecting

So we have 1 consumer, which consumes messages from 1 topic only, and we have 3 brokers. The consumer stops consuming when we kill one of the brokers. That is the scenario captured in the logs.

We have brokers:

  • kafka-1
  • kafka-2
  • kafka-3

Some notes:

  • all heartbeats go to the kafka-2 broker
  • all fetches go to the kafka-1 broker
  • kafka-1 is KILLED
  • the disconnects in the logs are for kafka-2 and kafka-3
  • the last Request Fetch never receives a response
  • after the last fetch there are 10 minutes of silence, then the disconnects
  • we are using Confluent Kafka
  • the group coordinator is kafka-2

I would appreciate it if you could help me resolve this. Thank you.

Edit: This is my configuration in code:

this.kafka = new Kafka({
  clientId: 'xyz',
  brokers: options.kafkaBrokers,
  ssl: true,
})

this.consumer = this.kafka.consumer({ groupId: uuidv4() })
this.consumer.on('consumer.crash', event => {
   const error = event?.payload?.error
   this.crashHandler(error)
})

await this.consumer.connect()
await this.consumer.subscribe({ topic: this.options.topic, fromBeginning: false })
await this.consumer.run({
  autoCommit: false,
  partitionsConsumedConcurrently: 5,
  eachMessage: async ({ topic, partition, message }) => {
    // process message
  },
})
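For reference, a minimal sketch (not our production code) of how the other kafkajs instrumentation events and explicit retry settings could be attached so a disconnect is at least visible. The broker addresses, group id, log level and retry values below are placeholders, not recommendations:

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'xyz',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'], // placeholder addresses
  ssl: true,
  logLevel: logLevel.DEBUG, // surface the raw KafkaJS logs while debugging
  retry: { initialRetryTime: 300, retries: 8 }, // illustrative values
})

const consumer = kafka.consumer({ groupId: 'my-group' })

// Instrumentation events fire alongside the log lines shown above
consumer.on(consumer.events.CONNECT, e => console.log('connected', e.timestamp))
consumer.on(consumer.events.DISCONNECT, e => console.log('disconnected', e.timestamp))
consumer.on(consumer.events.STOP, e => console.log('consumer stopped', e.timestamp))
consumer.on(consumer.events.CRASH, e => console.error('consumer crashed', e.payload.error))

The problem in this thread is precisely that none of these events fire when the fetch hangs, which is what the rest of the discussion is about.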

dusan-dragon avatar Mar 11 '20 21:03 dusan-dragon

One more note here: kafka-1 is killed "violently", meaning it runs as an EC2 instance and the instance itself is killed. Mentioning this because when I tried to simulate it locally with a docker kill command, the consumer recovered normally.

ghost avatar Mar 12 '20 05:03 ghost

Hi, I'm trying to reproduce this issue also.

Q: When you kill an EC2 instance, isn't it a graceful shutdown? That's not the same as killing the process. Maybe I'm missing something.

WaleedAshraf avatar Mar 12 '20 12:03 WaleedAshraf

I was not able to reproduce this issue on AWS EC2 with Kafka 1.1.0 (not Confluent).

  • run 3 brokers on EC2
  • use kafkajs to produce and consume batches on one topic across all three brokers
  • kill one broker
  • I see disconnect logs on the consumer
  • both producer and consumer keep working with the other 2 brokers
  • bring back the killed broker
  • after some time, both producer and consumer connect to that broker again

WaleedAshraf avatar Mar 12 '20 15:03 WaleedAshraf

@WaleedAshraf describes the expected experience. The way that Kafka clients work is that they initially connect to one broker, and then discover the rest of the cluster. In case it gets disconnected from all brokers, it will try to reconnect to the brokers that you have configured the client with, one by one, until it finds one that it can connect to, after which it will re-discover the rest of the cluster.

The only thing I could think of would be if the topic you are consuming from only has a single partition and isn't replicated to any of the other brokers, so when that broker goes away there is no leader for that topic to connect to. But even then I would at the very least expect an error, not a silent failure.
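One way to check that hypothesis, as a sketch using the kafkajs admin client (the topic name and broker addresses are placeholders): if replicas contains only a single broker id, the partition is not replicated.

const { Kafka } = require('kafkajs')

async function checkReplication() {
  const kafka = new Kafka({ brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'], ssl: true })
  const admin = kafka.admin()
  await admin.connect()

  const { topics } = await admin.fetchTopicMetadata({ topics: ['my-topic'] })
  for (const topic of topics) {
    for (const partition of topic.partitions) {
      // replicas / isr list the broker ids holding copies of this partition
      console.log(topic.name, partition.partitionId, 'leader:', partition.leader, 'replicas:', partition.replicas, 'isr:', partition.isr)
    }
  }

  await admin.disconnect()
}

checkReplication().catch(console.error)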

Whether you do a "violent" or graceful shutdown doesn't matter that much. The only difference would be that it could take longer before the cluster realizes that the broker is gone, but in the end it should end up in the same state eventually.

If you could create a repo with a way to reproduce the issue, we could take a look at it, but without that it's hard to help you. The logs you are posting are also not raw KafkaJS logs, so there's a lot of information that is missing. I don't really see anything wrong in those. It just looks like it's connecting, heartbeating and fetching, as expected.

Nevon avatar Mar 14 '20 15:03 Nevon

Hi @Nevon, I have a wrapper around the logs and I pulled out only the titles, so if you need the body of a specific log entry let me know and I can pull it out.

I am trying to create a reproducible example, but it is a little bit harder than expected; so far every simple setup I tried behaves as expected. Only our Confluent Kafka setup reliably reproduces the issue I described.

Is there anything else that could help debug this until I have some kind of shareable repo?

ghost avatar Mar 14 '20 17:03 ghost

Not sure if this is related, but it sounds like it. I've also been tracking down some connection/timeout issues, and I noticed that if I shut off my network connection I get no crash, nothing; it just stops logging.

I reconnect immediately and the node app stays hung in this state without crashing or continuing, and with no feedback. Nothing is emitted on the consumer.crash event as suggested above. My init code is almost the same as @dusan-dragon's, except I'm using a single partition.

node: "v12.14.1" kafkajs: "1.12.0" kafka: "2.3.0"

{"level":"DEBUG","timestamp":"2020-04-17T06:35:30.680Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 partitions for 1 out of 1 topics","topics":["topic-name"],"activeTopicPartitions":[{"topic":"topic-name","partitions":[0]}],"pausedTopicPartitions":[]}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:30.680Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"<BROKER-0>:9092","clientId":"bash-mbp","correlationId":118,"expectResponse":true,"size":116}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:35.775Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"<BROKER-0>:9092","clientId":"bash-mbp","correlationId":118,"size":93,"data":"[filtered]"}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:35.776Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"<BROKER-0>:9092","clientId":"bash-mbp","correlationId":119,"expectResponse":true,"size":107}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:35.802Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"<BROKER-0>:9092","clientId":"bash-mbp","correlationId":119,"size":10,"data":{"throttleTime":0,"errorCode":0}}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:35.803Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 partitions for 1 out of 1 topics","topics":["topic-name"],"activeTopicPartitions":[{"topic":"topic-name","partitions":[0]}],"pausedTopicPartitions":[]}
{"level":"DEBUG","timestamp":"2020-04-17T06:35:35.804Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"<BROKER-0>:9092","clientId":"bash-mbp","correlationId":120,"expectResponse":true,"size":116}



<infinite sadness>

unfrgivn avatar Apr 17 '20 06:04 unfrgivn

@unfrgivn so far I ended up going with a heartbeat check. Because the consumer hangs and not even heartbeats go through, you can detect the condition that way:

startHeartbeatCheck() {
  setInterval(async () => {
    const now = new Date()
    // If no heartbeat has been recorded within the last check interval,
    // assume the consumer is stuck and restart it
    if (this.lastHeartbeat.getTime() < now.getTime() - HEARTBEAT_CHECK_INTERVAL) {
      this.errorLogger(new Error(`Last heartbeat was at ${this.lastHeartbeat}`), false)
      await this.restartConsumer()
    }
  }, HEARTBEAT_CHECK_INTERVAL)
}
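For completeness, a sketch of how lastHeartbeat could be kept up to date, assuming the consumer exposes the standard kafkajs instrumentation events (restartConsumer and errorLogger above are application code, not part of kafkajs):

// Record the time of the last successful heartbeat reported by kafkajs
this.lastHeartbeat = new Date()
this.consumer.on(this.consumer.events.HEARTBEAT, () => {
  this.lastHeartbeat = new Date()
})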

ghost avatar Apr 17 '20 09:04 ghost

I am trying to fix this now; I can reproduce the case locally. KafkaJS is doing the right thing, but it takes the OS 10 minutes to realize that the connection is dead and then propagate the correct signals to Node.js and then to our code.

[screenshot: example1]

The latest pre-release version (1.14.0-beta.7) does a good job of waiting for the connection to finish, but that is just a long time. I am working on a heartbeat mechanism for the connection so we can detect the issue much earlier.

tulios avatar Sep 17 '20 12:09 tulios

it takes the OS 10 minutes to realize that the connection is dead and then propagate the correct signals to Node.js and then to our code.

Read #361 for some background on this. It should be enough to call end + unref on the socket, but in my testing it wasn't. destroy may do it.
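For context, a plain Node.js sketch of the three socket calls being discussed (the hostname and port are placeholders, not kafkajs internals):

const net = require('net')

const socket = net.connect({ host: 'kafka-1', port: 9092 })

// end(): send a FIN and half-close the connection; a dead peer never answers,
// so the socket can linger until the OS gives up.
socket.end()

// unref(): allow the process to exit even if this socket is still open;
// it does not close anything by itself.
socket.unref()

// destroy(): tear the socket down immediately without waiting for the peer.
socket.destroy()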

Nevon avatar Sep 17 '20 13:09 Nevon

@Nevon I made it work in a simple way: since we have keep-alive enabled, the socket was receiving the correct signal. Still, it wasn't propagated correctly, because we would never remove the interrupted connection from the brokers list, which made the client attempt a reconnect there and left it stuck. I am wrapping up a PR that at least solves the problem I was able to reproduce.

The root cause is more complicated: Node.js doesn't allow us to set all the necessary keep-alive parameters, which forces the application to fall back on the OS configuration, which in my case is 11 minutes.
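A sketch of the limitation being described, assuming a plain net socket (the address is a placeholder): Node only exposes the keep-alive flag and the initial delay, while the probe interval and probe count come from the OS (for example net.ipv4.tcp_keepalive_intvl and net.ipv4.tcp_keepalive_probes on Linux).

const net = require('net')

const socket = net.connect({ host: 'kafka-1', port: 9092 })

// Enable TCP keep-alive and send the first probe after ~60 seconds of idling
socket.setKeepAlive(true, 60000)

// There is no Node.js API here to control how often the probes repeat or how
// many failed probes mark the connection as dead; those come from OS settings,
// which is why detecting a dead peer can take many minutes.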

tulios avatar Sep 17 '20 14:09 tulios

I am trying to fix this now; I can reproduce the case locally. KafkaJS is doing the right thing, but it takes the OS 10 minutes to realize that the connection is dead and then propagate the correct signals to Node.js and then to our code.

[screenshot: example1]

The latest pre-release version (1.14.0-beta.7) does a good job of waiting for the connection to finish, but that is just a long time. I am working on a heartbeat mechanism for the connection so we can detect the issue much earlier.

@tulios Hi there, this may not be relevant to the topic at hand, but I'm curious what log viewing software you're using?

luokuning avatar Dec 26 '21 09:12 luokuning