cookie-cutter
cookie-cutter copied to clipboard
Two kafka sources with different consumer group names
We have a service that is reading from two kafka topics on the same kafka cluster using two different consumer groups. This is needed because topics are encoded differently. It's configured like so:
const kafkaJsonConfig: IKafkaBrokerConfiguration & IKafkaSubscriptionConfiguration = {
broker: "commonBroker",
group: "consumerGroup_json",
topics: "topic1",
encoder: ProtoJsonMessageEncoder(),
};
const kafkaProtoConfig: IKafkaBrokerConfiguration & IKafkaSubscriptionConfiguration = {
broker: "commonBroker",
group: "consumerGroup_proto",
topics: "topic2",
encoder: ProtoMessageEncoder(),
};
...
.input()
.add(kafkaSource(kafkaJsonConfig))
.add(kafkaSource(kafkaProtoConfig))
.done()
...
This works well most of the time, but when kafka cluster goes down and then recovers, this service looses connection to the cluster and never recovers like other more traditional services. The errors we see are like this:
{"level":"ERROR","timestamp":"2020-09-27T13:12:11.194Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 2)","broker":"[broker]","clientId":"[client ID]","error":"The coordinator is loading and hence can't process requests for this group","correlationId":0,"size":24}
{"level":"ERROR","timestamp":"2020-09-27T13:12:11.194Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSProtocolError: The coordinator is loading and hence can't process requests for this group","groupId":"consumerGroup_json","stack":"KafkaJSProtocolError: The coordinator is loading and hence can't process requests for this group
at createErrorFromCode (/usr/src/node_modules/kafkajs/src/protocol/error.js:537:10)
at Object.parse (/usr/src/node_modules/kafkajs/src/protocol/requests/joinGroup/v0/response.js:37:11)
at Connection.send (/usr/src/node_modules/kafkajs/src/network/connection.js:311:35)
at runMicrotasks (<anonymous>)
at processTicksAndRejections (internal/process/task_queues.js:93:5)
at async Broker.joinGroup (/usr/src/node_modules/kafkajs/src/broker/index.js:351:12)
at async ConsumerGroup.join (/usr/src/node_modules/kafkajs/src/consumer/consumerGroup.js:93:23)
at async /usr/src/node_modules/kafkajs/src/consumer/runner.js:51:9
at async Runner.start (/usr/src/node_modules/kafkajs/src/consumer/runner.js:105:7)
at async start (/usr/src/node_modules/kafkajs/src/consumer/index.js:230:7)"}