Multiple consumers in consumer group breaks all consumers in group?

Open ngalchemist opened this issue 1 year ago • 2 comments

Describe the bug I have a Node.js app running in a Docker container that creates multiple consumers on startup. Each of these consumers is for a different topic with a single partition, and all of the consumers use the same group ID. When the second consumer joins the group, I receive the logs below and neither consumer starts consuming messages. The consumer group in the example is 'log-sub-group' and the two topics are 'tracing.enterprise.connectionbus-update.v1' and 'tracing.enterprise.microflowengine.v1'.

{"level":"INFO","timestamp":"2023-08-04T21:58:33.820Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"log-sub-group"}
{"level":"ERROR","timestamp":"2023-08-04T21:58:38.836Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka:9093","clientId":"log-subscriber-consumer","error":"The group is rebalancing, so a rejoin is needed","correlationId":9,"size":10}
{"level":"WARN","timestamp":"2023-08-04T21:58:38.837Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"log-sub-group","memberId":"log-subscriber-consumer-f40e98b5-4519-4755-b89e-8b0d2976d9eb","error":"The group is rebalancing, so a rejoin is needed"}
{"level":"WARN","timestamp":"2023-08-04T21:58:38.841Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer group received unsubscribed topics","groupId":"log-sub-group","generationId":6,"memberId":"log-subscriber-consumer-022c92b6-65dd-4b29-8ac5-22c10e71404a","assignedTopics":["tracing.enterprise.connectionbus-update.v1"],"topicsSubscribed":["tracing.enterprise.microflowengine.v1"],"topicsNotSubscribed":["tracing.enterprise.connectionbus-update.v1"],"helpUrl":"https://kafka.js.org/docs/faq#why-am-i-receiving-messages-for-topics-i-m-not-subscribed-to"}
{"level":"INFO","timestamp":"2023-08-04T21:58:38.842Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"log-sub-group","memberId":"log-subscriber-consumer-022c92b6-65dd-4b29-8ac5-22c10e71404a","leaderId":"log-subscriber-consumer-f40e98b5-4519-4755-b89e-8b0d2976d9eb","isLeader":false,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":5022}
{"level":"INFO","timestamp":"2023-08-04T21:58:38.842Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"log-sub-group","memberId":"log-subscriber-consumer-f40e98b5-4519-4755-b89e-8b0d2976d9eb","leaderId":"log-subscriber-consumer-f40e98b5-4519-4755-b89e-8b0d2976d9eb","isLeader":true,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":5}

To Reproduce

import { Kafka } from 'kafkajs';

const kafka = new Kafka(); // Real config here
// One entry per consumer: different topics, same group ID
const consumerConfigs = [{ Topic: 'A', Group: 'my-group' }, { Topic: 'B', Group: 'my-group' }];
for (const config of consumerConfigs) {
   // Named `config` so it does not collide with the `consumer` instance below
   const consumer = kafka.consumer({ groupId: config.Group });
   await consumer.connect();
   await consumer.subscribe({ topics: [config.Topic] });
   await consumer.run({ eachMessage: async () => console.log('Message received!') });
}

Expected behavior The app should be able to run two consumers, each subscribed to a different topic, in the same group.

Observed behavior When running the code, I receive the logs shown in the description above and neither consumer receives any messages.

Environment:

  • OS: [linux/amd64]
  • KafkaJS version [2.2.4]
  • Kafka version [e.g. 3.4.0]
  • NodeJS version [e.g. 18.16.0]

Additional context I'm not sure whether KafkaJS doesn't support this or whether my implementation is incorrect. The docs here say that the warning "likely means that some members of your consumer group are subscribed to some topics, and some other members of the group are subscribed to a different set of topics", which makes me think that every consumer in a consumer group is supposed to subscribe to the same topics. But I haven't had to do this with the Java consumer implementations I've built.
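
If the subscription mismatch described in that warning is indeed the cause, one way to test the theory is to make sure every member of a group subscribes to the same topic list, for example by creating a single consumer per group that subscribes to all of that group's topics. The sketch below is untested against a real broker. `planConsumers` and `startConsumers` are hypothetical helper names, and `kafka` is assumed to be an already configured KafkaJS client passed in by the caller.

```javascript
// Hypothetical helper: collapse per-topic entries into one plan per group ID,
// so that each group gets a single, complete topic list.
function planConsumers(entries) {
  const topicsByGroup = new Map();
  for (const { Topic, Group } of entries) {
    if (!topicsByGroup.has(Group)) topicsByGroup.set(Group, []);
    topicsByGroup.get(Group).push(Topic);
  }
  return [...topicsByGroup].map(([groupId, topics]) => ({ groupId, topics }));
}

// Hypothetical helper: start one consumer per group, subscribed to every
// topic of that group. `kafka` is an existing KafkaJS client instance.
async function startConsumers(kafka, entries, eachMessage) {
  const started = [];
  for (const { groupId, topics } of planConsumers(entries)) {
    const consumer = kafka.consumer({ groupId });
    await consumer.connect();
    await consumer.subscribe({ topics });
    await consumer.run({ eachMessage });
    started.push(consumer);
  }
  return started;
}

// Same input shape as in the reproduction above.
const plan = planConsumers([
  { Topic: 'A', Group: 'my-group' },
  { Topic: 'B', Group: 'my-group' },
]);
console.log(plan);
```

With the two entries from the reproduction, this yields one consumer in 'my-group' subscribed to both 'A' and 'B'; the handler can branch on the `topic` field that KafkaJS passes to `eachMessage`. Another option, if the topics really need separate consumers, would be to give each consumer its own group ID.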

ngalchemist, Aug 04 '23 23:08