kafkajs
kafkajs copied to clipboard
Issue in connecting consumer
Describe the bug I'm trying to run a script (provided below) which basically should connect to kafka instance and subscribe to a few topics. But it shows few errors about not being able to connect to the broker and then it shows issues connecting the consumer and then connection timeout error.
const { Kafka } = require('kafkajs');
const dotenv = require('dotenv');
dotenv.config();
const connect = async () => {
const kafka = new Kafka({
brokers: [process.env.KAFKA_BROKER],
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USER_NAME,
password: process.env.KAFKA_PASSWORD,
},
ssl: true,
});
const producer = kafka.producer();
await producer.connect();
const consumer = kafka.consumer({ groupId: 'health-consumer' });
await consumer.connect();
consumer.on(consumer.events.DISCONNECT, async () => {
console.log('Consumer disconnected, reconnecting...');
await consumer.connect();
});
return {
producer, consumer,
};
};
const run = async () => {
const { consumer } = await connect();
await consumer.subscribe({
topics: [
process.env.KAFKA_CONCOX_HEALTH_TOPIC,
process.env.KAFKA_CONCOX_LOCATION_TOPIC,
process.env.KAFKA_CONCOX_LOCATION_TOPIC_NO_ODOMETER,
],
});
await consumer.run({
eachMessage: async ({ topic, message }) => {
console.log(topic, ': ', message);
},
});
};
run().catch((e) => console.log(e));
Expected behavior The consumer should connect and then subscribe to the topics.
Observed behavior I'm getting this error.
{"level":"WARN","timestamp":"2023-03-21T10:17:40.239Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.260Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.263Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":314}
{"level":"ERROR","timestamp":"2023-03-21T10:17:45.650Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:45.652Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":271}
{"level":"INFO","timestamp":"2023-03-21T10:17:50.657Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.667Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-1-kafka.upstash.io:9093","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.669Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSConnectionError: Connection timeout","groupId":"health-consumer","stack":"KafkaJSConnectionError: Connection timeout\n at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n at listOnTimeout (node:internal/timers:559:17)\n at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.670Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.672Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 300ms","retryTime":300,"groupId":"health-consumer"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.974Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.674Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.677Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":309}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.988Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.989Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":648}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.639Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.641Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1162}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.804Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.806Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2018}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.825Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.827Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4136}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.963Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.965Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":8316}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Connection timeout","groupId":"health-consumer","retryCount":5,"stack":"KafkaJSNonRetriableError\n Caused by: KafkaJSConnectionError: Connection timeout\n at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n at listOnTimeout (node:internal/timers:559:17)\n at processTimers (node:internal/timers:502:7)"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Failed to execute listener: Connection timeout","eventName":"consumer.disconnect","stack":"KafkaJSNonRetriableError\n Caused by: KafkaJSConnectionError: Connection timeout\n at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n at listOnTimeout (node:internal/timers:559:17)\n at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 8316ms","retryCount":5,"retryTime":8316,"groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.967Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.968Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":349}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":568}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.888Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.890Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1242}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.133Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.134Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2188}
{"level":"INFO","timestamp":"2023-03-21T10:18:14.284Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.323Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.325Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4980}
To Reproduce I'm running this script that is just trying to connect the consumer, subscribe to a few channels and console log the topic and messages.
Environment:
- OS: Linux Mint 19.3
- KafkaJS version 2.2.3
- Kafka version - Hosted Instance on Upstash
- NodeJS version 16.15.0
Hi @ankit-rawani, your kafka instance seems to be running fine. Can you make sure that your environment can connect to brokers ? I can see that I have connection to there with following commands.
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9092
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9092 [tcp/XmlIpcRegSvc] succeeded!
~ on ☁️ (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9093
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9093 [tcp/*] succeeded!
~ on ☁️ (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9094
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9094 [tcp/*] succeeded!
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9095
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9095 [tcp/*] succeeded!
Hi @ankit-rawani , are you still experiencing the problem ? Were you able to check if your environment can reach to kafka as I suggested ?
Hi @sancar I am having same issue and I tried your command to check if I can connect to my borkers and its successfully getting connect with all the brokers. Let me know what I should do further.
@rishibakshidev Can you share the kafka-cluster id and/or the endpoint from the console so that I can check ? Also can you try creating a second cluster and see it works ? And please also share the logs that you see as sometimes the problem is hidden in small differences ?
@rishibakshidev if it is Upstash related, you can ask your questions https://upstash.com/discord or you can use our chat support in https://upstash.com/ As you would guess we are not checking the issues in other repos regularly.
@tulios We can close this issue if you prefer.
I have the same problem when I connect to a cluster on upstash. :/
Add connectionTimeout:3000
while connecting and see if you are able to resolve the issue.
const kafkaConnect = new Kafka({ ...props, connectionTimeout: 3000, });
@rishibakshidev Thanks, I was having same issue but adding connectionTimeout somehow fixed it, Thanks dude!