kafkajs

Issue in connecting consumer

Open ankit-rawani opened this issue 1 year ago • 8 comments

Describe the bug: I'm trying to run a script (provided below) that should connect to a Kafka instance and subscribe to a few topics. Instead, it logs errors about being unable to connect to the broker, then fails to connect the consumer, and finally reports a connection timeout error.

const { Kafka } = require('kafkajs');
const dotenv = require('dotenv');

dotenv.config();

const connect = async () => {
  const kafka = new Kafka({
    brokers: [process.env.KAFKA_BROKER],
    sasl: {
      mechanism: 'scram-sha-256',
      username: process.env.KAFKA_USER_NAME,
      password: process.env.KAFKA_PASSWORD,
    },
    ssl: true,
  });

  const producer = kafka.producer();
  await producer.connect();

  const consumer = kafka.consumer({ groupId: 'health-consumer' });
  await consumer.connect();

  consumer.on(consumer.events.DISCONNECT, async () => {
    console.log('Consumer disconnected, reconnecting...');
    await consumer.connect();
  });

  return {
    producer, consumer,
  };
};

const run = async () => {
  const { consumer } = await connect();
  await consumer.subscribe({
    topics: [
      process.env.KAFKA_CONCOX_HEALTH_TOPIC,
      process.env.KAFKA_CONCOX_LOCATION_TOPIC,
      process.env.KAFKA_CONCOX_LOCATION_TOPIC_NO_ODOMETER,
    ],
  });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      console.log(topic, ': ', message);
    },
  });
};

run().catch((e) => console.log(e));

Expected behavior: The consumer should connect and then subscribe to the topics.

Observed behavior: I'm getting these errors.

{"level":"WARN","timestamp":"2023-03-21T10:17:40.239Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.260Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.263Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":314}

{"level":"ERROR","timestamp":"2023-03-21T10:17:45.650Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:45.652Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":271}
{"level":"INFO","timestamp":"2023-03-21T10:17:50.657Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.667Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-1-kafka.upstash.io:9093","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.669Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSConnectionError: Connection timeout","groupId":"health-consumer","stack":"KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.670Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.672Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 300ms","retryTime":300,"groupId":"health-consumer"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.974Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.674Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.677Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":309}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.988Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.989Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":648}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.639Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.641Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1162}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.804Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.806Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2018}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.825Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.827Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4136}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.963Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.965Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":8316}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Connection timeout","groupId":"health-consumer","retryCount":5,"stack":"KafkaJSNonRetriableError\n  Caused by: KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Failed to execute listener: Connection timeout","eventName":"consumer.disconnect","stack":"KafkaJSNonRetriableError\n  Caused by: KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 8316ms","retryCount":5,"retryTime":8316,"groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.967Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.968Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":349}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":568}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.888Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.890Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1242}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.133Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.134Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2188}
{"level":"INFO","timestamp":"2023-03-21T10:18:14.284Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.323Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.325Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4980}

To Reproduce: I'm running the script above, which just connects the consumer, subscribes to a few topics, and logs each topic and message to the console.

Environment:

  • OS: Linux Mint 19.3
  • KafkaJS version: 2.2.3
  • Kafka version: hosted instance on Upstash
  • NodeJS version: 16.15.0

ankit-rawani • Mar 21 '23 10:03

Hi @ankit-rawani, your Kafka instance seems to be running fine. Can you make sure that your environment can connect to the brokers? I can reach them with the following commands.

❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9092
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9092 [tcp/XmlIpcRegSvc] succeeded!
~ on ☁️  (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9093
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9093 [tcp/*] succeeded!
~ on ☁️  (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9094
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9094 [tcp/*] succeeded!
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9095
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9095 [tcp/*] succeeded!
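
The same reachability check can be run from Node itself, which is useful when `nc` is not available or when the Node process runs in a different network environment than the shell. This is a minimal sketch using only the built-in `net` module; `checkPort` is a hypothetical helper, not part of KafkaJS, and it only tests that a TCP connection opens (it does not attempt the TLS or SASL handshake the client performs afterwards).

```javascript
const net = require('node:net');

// Resolve to true if a TCP connection to host:port opens within timeoutMs,
// and to false on error or timeout. Hypothetical helper for illustration.
function checkPort(host, port, timeoutMs = 3000) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    const timer = setTimeout(() => finish(false), timeoutMs);

    function finish(reachable) {
      clearTimeout(timer);
      socket.destroy();
      resolve(reachable);
    }

    socket.once('connect', () => finish(true));
    socket.once('error', () => finish(false));
  });
}

// Usage (placeholder host; substitute the broker endpoint from your console):
// checkPort('your-broker-host', 9092).then((ok) =>
//   console.log(ok ? 'reachable' : 'unreachable'));
```

If this resolves to `true` while the client still reports `Connection timeout`, the port is reachable and the slow part is more likely the connection setup time than a blocked network path.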

sancar • Mar 28 '23 07:03

Hi @ankit-rawani, are you still experiencing the problem? Were you able to check whether your environment can reach Kafka, as I suggested?

sancar • Mar 30 '23 15:03

Hi @sancar, I am having the same issue. I tried your commands to check whether I can connect to my brokers, and the connection succeeds for all of them. Let me know what I should do next.

rishibakshidev • Aug 28 '23 08:08

@rishibakshidev Can you share the Kafka cluster ID and/or the endpoint from the console so that I can check? Also, can you try creating a second cluster and see whether it works? And please share the logs you see, as sometimes the problem is hidden in small differences.

sancar • Aug 30 '23 09:08

@rishibakshidev If it is Upstash related, you can ask your questions on our Discord (https://upstash.com/discord) or through the chat support on our site (https://upstash.com/). As you would guess, we do not regularly check issues in other repos.

@tulios We can close this issue if you prefer.

sancar • Aug 30 '23 11:08

I have the same problem when I connect to a cluster on Upstash. :/

izakdvlpr • Oct 05 '23 11:10

Try adding connectionTimeout: 3000 to the client configuration and see whether that resolves the issue.

const kafkaConnect = new Kafka({ ...props, connectionTimeout: 3000, });
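
For context, `connectionTimeout` is the time in milliseconds the client waits for a connection to be established, and KafkaJS documents its default as 1000. That is consistent with the original log, where each `Connection timeout` appears about one second after the attempt starts. The sketch below applies the suggestion to the configuration from the original post. `buildKafkaConfig` is a hypothetical helper, and the environment variable names are the ones used in the reporter's script; whether 3000 ms is enough depends on your network.

```javascript
// Build the client options from environment-style variables. Hypothetical
// helper; the option names are the ones used elsewhere in this thread.
function buildKafkaConfig(env, connectionTimeout = 3000) {
  return {
    brokers: [env.KAFKA_BROKER],
    ssl: true,
    sasl: {
      mechanism: 'scram-sha-256',
      username: env.KAFKA_USER_NAME,
      password: env.KAFKA_PASSWORD,
    },
    // Milliseconds to wait for a connection; the documented default is 1000.
    connectionTimeout,
  };
}

// Usage, assuming kafkajs is installed:
// const { Kafka } = require('kafkajs');
// const kafka = new Kafka(buildKafkaConfig(process.env));
```

Keeping the timeout as a parameter makes it easy to raise it further if connections from a distant region still time out.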

rishibakshidev • Oct 06 '23 01:10

@rishibakshidev Thanks, I was having the same issue, and adding connectionTimeout somehow fixed it. Thanks, dude!

kushagra-nt • May 18 '24 07:05