
KafkaJS claims a broker does not host a topic-partition, even though it does

Open mgirard772 opened this issue 4 years ago • 43 comments

Describe the bug
I receive lots of error messages like the following:

{
    "level": "ERROR",
    "timestamp": "2020-07-15T16:48:34.740Z",
    "logger": "kafkajs",
    "message": "[Connection] Response Metadata(key: 3, version: 5)",
    "broker": "aws_msk_host_1:9092",
    "clientId": "ti-qa",
    "error": "This server does not host this topic-partition",
    "correlationId": 16,
    "size": 2120
}

However, a check of the topic metadata (using this topic as an example) says otherwise:

  "learningpaths" with 6 partition(s)
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr: 
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr: 
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr: 
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr: 
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr: 
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr: 

This happens across multiple topics.

Code

const awslog = require('lib/awslog');
const config = require('config');

const { Kafka } = require('kafkajs');

const BROKERS =
  config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;

const USE_KAFKA = config.env !== 'test' && BROKERS !== null;

const kafka = USE_KAFKA
  ? new Kafka({
      clientId: `ti-${config.env}`,
      brokers: BROKERS,
      retry: {
        initialRetryTime: 1000,
        retries: 9
      }
    })
  : null;

const producer = USE_KAFKA
  ? kafka.producer({
      metadataMaxAge: 60000
    })
  : null;

function push(name, records) {
  if (USE_KAFKA && records && records.length) {
    Promise.all(
      records.map(record =>
        // `Promise.resolve` here prevents invalid messages from throwing,
        // just in case others in the same batch are valid.
        Promise.resolve(keysToLowerCase(record)).then(
          value => ({ value, key: record.id || record.requestId }),
          err => {
            config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
              message: err.message,
              paths: err.paths,
              record,
              topic: name.toLowerCase(),
              brokers: BROKERS
            });

            return null;
          }
        )
      )
    )
      .then(encodedMessages => {
        const validMessages = encodedMessages.filter(message => message);
        if (validMessages.length) {
          return producer.send({
            topic: name.toLowerCase(),
            messages: validMessages,
            acks: 1
          });
        }
      })
      .catch(e => {
        awslog.error(null, new Error('Failed to send record to Kafka'), {
          message: e.message,
          topic: name.toLowerCase(),
          messages: records,
          brokers: BROKERS,
          paths: e.paths
        });
      });
  }
}

function flush() {
  if (USE_KAFKA) {
    return producer.disconnect();
  } else {
    return Promise.resolve(true);
  }
}

module.exports = {
  push,
  flush
};

function keysToLowerCase(obj) {
  const newObj = {};
  const keys = Object.keys(obj);
  for (const key of keys) {
    newObj[key.toLowerCase()] = obj[key];
  }

  return JSON.stringify(newObj);
}

Expected behavior
Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.

Observed behavior
The KafkaJS producer throws the above error, claiming This server does not host this topic-partition, when the broker clearly does host it. It's possible there's another underlying issue, but the logic surfaces this error instead.

Environment:

  • OS: Ubuntu 14.04.6 LTS
  • KafkaJS version 1.12.0
  • Kafka version 2.2.1 (Amazon MSK)
  • NodeJS version 10.20.1

Additional context
Any pointers on what might be wrong with my code, or with the library, would be helpful.

mgirard772 avatar Jul 15 '20 17:07 mgirard772

Kafka version 2.2.1 (Amazon MSK)

vs

"broker": "localhost:9092",

This doesn't add up, I think. Are you sure you're looking at the right broker?

ankon avatar Jul 15 '20 18:07 ankon

Kafka version 2.2.1 (Amazon MSK)

vs

"broker": "localhost:9092",

This doesn't add up, I think. Are you sure you're looking at the right broker?

I omitted the actual host for security

mgirard772 avatar Jul 15 '20 18:07 mgirard772

Are you seeing this error consistently or intermittently? If it's constant, that would point to ACLs not being configured correctly, so that your producer doesn't have the right to access the topic and as such wouldn't be able to produce to it.

Ubuntu 14.04.6 LTS

As an aside, you should really upgrade that. 14.04 has been EOL for over a year now. 😅

Nevon avatar Jul 16 '20 16:07 Nevon

Everything seems to work fine up to a point, then an onslaught of these errors starts coming (and doesn't stop), even though the metadata says everything is fine. The kicker is that the errors are for topics we've confirmed exist and are writable by the producer. Something changes somewhere down the line.

Things I've tried so far, but to no avail:

  1. Ensure no whitespace makes it into the topic name
  2. Upgrade to the latest Kafka version available on MSK (2.4.1)
  3. Reducing metadata age
  4. Only requiring acks from the leader

Things I'm still looking into:

  1. Potential issues with the provided key somehow resulting in an incorrect partition number. The key is a UUID, so it shouldn't be an issue, but who knows.
  2. Using the JavaCompatiblePartitioner (see the sketch after this list)
  3. Ditching MSK and using another Kafka implementation
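
For item 2, a minimal sketch of what switching partitioners could look like, assuming kafkajs 1.x's documented Partitioners export; the broker address is a placeholder and the metadataMaxAge value is reused from the snippet in the issue description:

const { Kafka, Partitioners } = require('kafkajs');

// Placeholder client; broker address is an example only.
const kafka = new Kafka({ clientId: 'ti-qa', brokers: ['aws_msk_host_1:9092'] });

// Partition keys the same way the official Java client would.
const producer = kafka.producer({
  createPartitioner: Partitioners.JavaCompatiblePartitioner,
  metadataMaxAge: 60000
});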

Thanks for bringing up the Ubuntu version, I'll have to mention it to our SRE team.

Any thoughts on what I might be missing? I'd really prefer to stick with our current implementation, but I've been trying to debug this for 2 weeks now and starting to get pretty frustrated.

mgirard772 avatar Jul 16 '20 16:07 mgirard772

The key and the partitioner should be innocent in this. Regardless of which partition a message is assigned to, the mismatch is between the broker address and the resolved IP, or the metadata response and reality.

Under the hood, KafkaJS uses net/tls.connect in order to connect to the brokers. DNS resolution is done via dns.lookup by default (this is all happening in Node, it's not something that we explicitly configure), which in turn just delegates to the OS. If your OS was caching DNS entries, that could mean that we connect to the wrong broker if the IPs change. This would explain why this suddenly happens at some point and doesn't stop. What's unlikely about it though is that clearly a broker is responding, just not the right one. You could verify this theory by implementing a custom socket factory (search in the docs for this. There's an example in there) which uses dns.resolve instead of dns.lookup - this is an option you can pass into net/tls.connect. dns.resolve does not use the same mechanism as lookup, and should bypass whatever caching the OS may have in place. At the very least you could log the hostname and the resolved IP to see if this is changing over time.
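
A minimal sketch of such a socket factory, assuming kafkajs's documented socketFactory option and Node's lookup override on net/tls.connect; the broker address is a placeholder, and the logging is just to watch the hostname-to-IP mapping over time:

const dns = require('dns');
const net = require('net');
const tls = require('tls');
const { Kafka } = require('kafkajs');

// Resolve via the DNS server directly (dns.resolve4) instead of the OS
// resolver (dns.lookup), adapting to the lookup callback signature.
const resolveLookup = (hostname, options, callback) =>
  dns.resolve4(hostname, (err, addresses) => {
    console.log('resolved', hostname, '->', addresses);
    callback(err, addresses && addresses[0], 4);
  });

const socketFactory = ({ host, port, ssl, onConnect }) =>
  ssl
    ? tls.connect({ host, port, lookup: resolveLookup, ...ssl }, onConnect)
    : net.connect({ host, port, lookup: resolveLookup }, onConnect);

const kafka = new Kafka({
  clientId: 'ti-qa',
  brokers: ['aws_msk_host_1:9092'], // placeholder broker from the log above
  socketFactory
});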

If that's not the issue, then it would have to be something with the metadata response being incorrect. We have seen some odd stuff from MSK in the past, but I can't remember if it was related.

Another thing you could do, just to rule it out, is to try using the beta version of KafkaJS instead of 1.12.0. It's possible there was some related bug fixed in there, but I would have to go through the commits to be sure.

Nevon avatar Jul 16 '20 18:07 Nevon

Does the broker list have to be in a particular order? For example, does broker 1 have to be the first in the list, or are hostnames/IPs matched appropriately based on what's in the metadata? The metadata responses I've seen simply give numbers.

I've noticed that the AWS Boto3 API in Python gives the broker hostnames out of order, 3,2,1 or 3,1,2, while the AWS console gives them as 1,2,3.

mgirard772 avatar Jul 16 '20 18:07 mgirard772

No, order is irrelevant. It's just a mapping between node ids and hostnames.

Nevon avatar Jul 16 '20 20:07 Nevon

Ditching MSK and using another Kafka implementation

We have seen some odd stuff from MSK in the past, but I can't remember if it was related.

Mostly as a data point: We're running on MSK (right now Kafka 2.4.1), and have been since the preview. Sometimes there are glitches/oddnesses, but in general MSK does work and we haven't seen this particular problem.

ankon avatar Jul 16 '20 21:07 ankon

Maybe this is also related to this issue: #803

I experienced the same problems and errors as described by the author here.

oemergenc avatar Jul 31 '20 11:07 oemergenc

Interesting.

We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way kafkajs communicates with the brokers that's causing it to throw these errors unnecessarily.

mgirard772 avatar Jul 31 '20 11:07 mgirard772

I have a similar issue using NestJS with Kafka on Confluent Cloud: when the topic doesn't exist, it fails even though I have allowAutoTopicCreation set to true in both the Consumer and the Producer:

[NestWinston] Error 2021-6-18 16:06:17 [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2021-06-18T20:06:17.495Z","logger":"kafkajs","broker":"pkc-4nym6.us-east-1.aws.confluent.cloud:9092","clientId":"nestjs-consumer-client","error":"This server does not host this topic-partition","correlationId":24,"size":2569} - {"stack":[""]}

Client Config

{
	transport: Transport.KAFKA,
	options: {
	  client: {
	    brokers,
	    sasl,
	    ssl,
	  },
	  consumer: {
	    groupId: 'consumer',
	    allowAutoTopicCreation: true,
	  }
	},
 }

titobundy avatar Jun 18 '21 20:06 titobundy

@titobundy check if the auto.create.topics.enable config is set to true; I had that issue on AWS MSK

momirov avatar Jun 24 '21 12:06 momirov

I use the Kafka Confluent Cloud free version. It seems that in this version Confluent Control Center doesn't allow access to all broker configuration options, but the docs indicate that it is set to true by default.


titobundy avatar Jun 24 '21 21:06 titobundy

When you use the createTopics method, you should try adding { topic: 'topic_name', numPartitions: 1, replicationFactor: 3 }
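
A minimal sketch of that suggestion, assuming the kafkajs admin API; the broker address, topic name, and sizing values are placeholders:

const { Kafka } = require('kafkajs');

// Placeholder client; broker address and topic name are examples only.
const kafka = new Kafka({ clientId: 'admin-example', brokers: ['localhost:9092'] });

const run = async () => {
  const admin = kafka.admin();
  await admin.connect();

  // Create the topic with an explicit partition count and replication factor.
  await admin.createTopics({
    topics: [{ topic: 'topic_name', numPartitions: 1, replicationFactor: 3 }]
  });

  await admin.disconnect();
};

run().catch(console.error);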

vquanglqd avatar Jul 24 '21 09:07 vquanglqd

I get the error message when I try to run

    const [createError, createResult] = await to( admin.createTopics({
        validateOnly: true,
        waitForLeaders: true,
        timeout: 5000,
        topics: [
            {
                topic: '123',
            }
        ],
    }) );

joerg-walter-de avatar Oct 27 '21 13:10 joerg-walter-de

@joerg-walter-de thanks, waitForLeaders: true solved the type: 'NOT_CONTROLLER' error in some cases for me :+1:

helio-frota avatar Nov 08 '21 13:11 helio-frota

That is so strange. I'm trying to run the kafkaJS integration tests against a different Kafka (not the Kafka configured by the kafkaJS integration tests). With waitForLeaders: true I can get the src/admin/__tests__/createTopics.spec.js tests working, but I get a NOT_CONTROLLER error on the src/admin/__tests__/deleteTopics.spec.js tests. When I change to waitForLeaders: false, I get the same error on the createTopics.spec.js tests.

helio-frota avatar Nov 08 '21 14:11 helio-frota

Running 2.0.1 here and still seeing this bug against Confluent Cloud. It doesn't matter how many partitions the topic has; I've tried with 1, 3 and 6. Is there a workaround for this yet?

With the following code:

const consumeMessage = async ({ topic, partition, message, heartbeat }) => {
  console.log('Received message', topic, partition, message, heartbeat)
  return processEntry(message.key.toString(), JSON.parse(message.value.toString()))
}

const startConsumer = async () => {
  const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID })
  await consumer.connect()
  await consumer.subscribe({ topics: [process.env.KAFKA_TOPIC] })
  return await consumer.run({ eachMessage: consumeMessage })
}

I never see my console.log. I only see the following logged, every time a message is published on that topic:

Got request 5e39bdb0-c9ec-11ec-8a73-cd9f4c2cfe13 { ts: 1653574734217 }
{"level":"ERROR","timestamp":"2022-06-01T07:54:50.826Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":3,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.109Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":4,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.582Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":5,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:52.364Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":6,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:54.193Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":7,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:57.138Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":8,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:55:00.023Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: This server does not host this topic-partition","groupId":"alex-consumer","stack":"KafkaJSNonRetriableError: This server does not host this topic-partition\n    at /Users/alex/Projects/revend/gtmadapter-node/node_modules/kafkajs/src/retry/index.js:55:18\n    at runMicrotasks (<anonymous>)\n    at processTicksAndRejections (node:internal/process/task_queues:96:5)"}
{"level":"INFO","timestamp":"2022-06-01T07:55:00.091Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"alex-consumer"}

Are there any settings on the topic or broker that I need to change in Confluent Cloud in order to make it compatible with this library?

Thanks in advance!

alexmreis avatar Jun 01 '22 08:06 alexmreis

+1 facing the same issue

adamatti avatar Jul 15 '22 18:07 adamatti

I had the same issue. There is a bug in refreshMetadata: it does not properly handle removal of a topic that happens outside of kafkajs.

The problem is in the refreshMetadata code, which tries to refresh metadata for each topic in the this.targetTopics Set (from admin/cluster/index.js).

Easy way to reproduce this bug:

  1. create topic1 and topic2
  2. get offsets for topic1 and topic2 with kafkajs
  3. manually delete topic1 without use of kafkajs
  4. try to get offset for topic2 with kafkajs. It will fail with "This server does not host this topic-partition" exception even if topic2 exists and we only deleted topic1.

So if one of the topics in this.targetTopics was externally removed from Kafka (without using kafkajs), you will not be able to refresh metadata for any other topic, and the client will not recover from this error.
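
A minimal sketch of those reproduction steps, assuming the kafkajs admin API; the broker address and topic names are placeholders, and step 3 uses the stock Kafka CLI outside of kafkajs:

const { Kafka } = require('kafkajs');

// Placeholder client; broker address and topic names are examples only.
const kafka = new Kafka({ clientId: 'repro', brokers: ['localhost:9092'] });

const run = async () => {
  const admin = kafka.admin();
  await admin.connect();

  // 1. create topic1 and topic2
  await admin.createTopics({
    topics: [
      { topic: 'topic1', numPartitions: 1 },
      { topic: 'topic2', numPartitions: 1 }
    ]
  });

  // 2. get offsets for both topics, which adds them to this.targetTopics
  await admin.fetchTopicOffsets('topic1');
  await admin.fetchTopicOffsets('topic2');

  // 3. delete topic1 outside of kafkajs, e.g. with the stock CLI:
  //    kafka-topics.sh --bootstrap-server <broker> --delete --topic topic1

  // 4. this now fails with "This server does not host this topic-partition",
  //    even though topic2 still exists
  await admin.fetchTopicOffsets('topic2');
};

run().catch(console.error);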

vadiml avatar Jul 16 '22 16:07 vadiml

"error":"This server does not host this topic-partition"

I am experiencing this issue with NestJS and the subscribeToResponseOf() function even though I know the topic exists... I have searched everywhere for a resolution and tried many code changes.

Any guidance from anyone on how to fix this issue? Is it simply a Kafka BUG??

natenrb9 avatar Jul 27 '22 15:07 natenrb9

Update from my side: my issue was that we had the same broker for prod and dev, and the app instances were using the same group id (e.g. the app dev instance connecting to dev_ topics, the app prod instance connecting to prod_ topics). It all started working fine when we added the environment to the group id. Hope it helps someone.
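
A minimal sketch of that fix, assuming the environment name is available on config.env as in the issue author's code; the broker address and consumer group name are placeholders:

const { Kafka } = require('kafkajs');
const config = require('config');

// Placeholder client; broker address is an example only.
const kafka = new Kafka({ clientId: `ti-${config.env}`, brokers: ['aws_msk_host_1:9092'] });

// Namespace the consumer group per environment so dev and prod instances
// sharing a broker never join the same group.
const consumer = kafka.consumer({ groupId: `${config.env}-my-consumer-group` });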

adamatti avatar Jul 27 '22 17:07 adamatti

We have also faced a similar issue. Steps to reproduce:

  1. Try to publish a message to a topic that does NOT EXIST
  2. Try to publish a message to a topic that EXISTS

Both publishes fail with the error below:

"This server does not host this topic-partition" "originalError":{"name":"KafkaJSProtocolError","retriable":true,"type":"UNKNOWN_TOPIC_OR_PARTITION","code":3}

The library should not fail to publish data to an EXISTING topic just because some other part of the code attempted to publish to a NON-EXISTING topic.
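
A minimal sketch of the two publishes above, assuming the kafkajs producer API; the broker address and topic names are placeholders:

const { Kafka } = require('kafkajs');

// Placeholder client; broker address and topic names are examples only.
const kafka = new Kafka({ clientId: 'repro', brokers: ['localhost:9092'] });
const producer = kafka.producer();

const run = async () => {
  await producer.connect();

  // Publishing to a topic that does not exist fails with UNKNOWN_TOPIC_OR_PARTITION...
  await producer
    .send({ topic: 'topic-that-does-not-exist', messages: [{ value: 'a' }] })
    .catch(err => console.error('send to missing topic failed:', err.message));

  // ...and, per the report above, a publish to a topic that DOES exist then
  // fails with the same error.
  await producer
    .send({ topic: 'topic-that-exists', messages: [{ value: 'b' }] })
    .catch(err => console.error('send to existing topic failed:', err.message));

  await producer.disconnect();
};

run().catch(console.error);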

shivakumara-rapido avatar Aug 03 '22 12:08 shivakumara-rapido

I'm seeing the same issue as @vadiml. We use Kafka topics as webhook queues, along with a permanent topic for create/update/delete of the webhooks. When a webhook is deleted, if we delete the webhook topic, there's a race between the queue readers unsubscribing due to the webhook delete message and hitting this issue. To be clear, we do use kafkajs to delete the topic, but it's happening in another process. It would also be a big help if the error had more context, like which topic-partition is not found.

trevorr avatar Aug 11 '22 17:08 trevorr

+1 I'm having the same experience when trying to delete topics

prescindivel avatar Sep 14 '22 15:09 prescindivel

Oddly, I'm having the same issue with confluentinc/cp-kafka:7.0.4, but if I try (and fail) to delete the topic before disconnecting, when I reconnect and list topics the topic... was somehow created (and accepts messages).

There seems to be some sort of latency issue, or the promise resolves before the actual creation process is complete, because any subsequent logic that expects the topics to be there fails, even with long (multi-second) sleeps beforehand.

If I let the process end the connection and restart, however, the topic is there, successfully created, the next time I run admin.listTopics().

olivierlacan avatar Oct 22 '22 19:10 olivierlacan

I'm having the same issue. I'm running MSK Serverless Kafka in AWS. When destroying our AWS CloudFormation stack, we have a lambda that gets called to delete all known topics. These deletion requests run in parallel. Usually one out of five will fail with this exception. Rerunning the deletion makes the topic get deleted properly. I suspect that, at least in my case, the problem is related to the parallel deletion.

mtkopone avatar Oct 31 '22 13:10 mtkopone

Having the same issue running NestJS with Confluent Kafka; it works if I run it in a sample file but has issues when running inside NestJS.

tekpriest avatar Nov 09 '22 12:11 tekpriest

@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.

Jurajzovinec avatar Nov 22 '22 13:11 Jurajzovinec

@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.

Yeah, running locally doesn't cause errors, but I have been having issues running Kafka on Docker. Does anyone have a reference?

tekpriest avatar Nov 22 '22 13:11 tekpriest