Consumption rate of messages from one topic affects the rate of consumption from another topic
Environment Information
- OS: Ubuntu 24.10
- Node version: 20.16.0
- confluent-kafka-javascript version: 1.0.0
Summary
The following code produces messages to two topics and then consumes from both with partitionsConsumedConcurrently: 2. When consuming with eachBatch, only messages from the "slow topic" are consumed, which blocks consumption of the "fast topic". In other runs, some "fast topic" messages are consumed, but consumption is later blocked again by the "slow topic" batches.
The expected behavior is that all "fast topic" messages are consumed very quickly, and that messages waiting in another topic do not affect the consumption rate of the fast topic.
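As a rough illustration of the gap between the observed and the expected behavior, here is a toy model in TypeScript. It is a sketch only and does not reflect how confluent-kafka-javascript actually schedules batches; Batch, finishTimesSerial and finishTimesPerPartition are hypothetical names, each topic stands in for its single partition (as in the repro), and the 1 ms cost of a fast batch is an assumption. It compares one loop that handles every batch in turn with one independent timeline per partition, which is what the issue expects from partitionsConsumedConcurrently: 2.

```typescript
// Toy model only: this is NOT how @confluentinc/kafka-javascript schedules work.
// Each batch belongs to a topic (one partition per topic, as in the repro)
// and takes costMs to handle.
interface Batch {
  topic: string;
  costMs: number;
}

// Strategy A: a single loop awaits every handler in turn, so a slow batch
// delays whatever is queued behind it, regardless of topic.
// Returns, per topic, the time at which its last batch finished.
function finishTimesSerial(batches: Batch[]): Map<string, number> {
  const lastFinish = new Map<string, number>();
  let clock = 0;
  for (const b of batches) {
    clock += b.costMs;
    lastFinish.set(b.topic, clock);
  }
  return lastFinish;
}

// Strategy B: every partition has its own worker with its own clock, which is
// the behavior the issue expects from partitionsConsumedConcurrently: 2.
function finishTimesPerPartition(batches: Batch[]): Map<string, number> {
  const clocks = new Map<string, number>();
  for (const b of batches) {
    clocks.set(b.topic, (clocks.get(b.topic) ?? 0) + b.costMs);
  }
  return clocks;
}

// Assume batches arrive interleaved: slow (500 ms), fast (1 ms), slow, fast, ...
const batches: Batch[] = [];
for (let i = 0; i < 5; i++) {
  batches.push({ topic: "slow-topic", costMs: 500 });
  batches.push({ topic: "fast-topic", costMs: 1 });
}
console.log(finishTimesSerial(batches).get("fast-topic")); // 2505
console.log(finishTimesPerPartition(batches).get("fast-topic")); // 5
```

In the serial model the fast topic finishes only after every slow batch ahead of it, while in the per-partition model its finish time depends on its own batches alone.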
Reproduce
import { KafkaJS } from "@confluentinc/kafka-javascript";
import { config } from "../src/config";

const kafka = new KafkaJS.Kafka({
  kafkaJS: {
    brokers: config.kafkaBrokersAddress.split(","),
  },
});

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const fastTopicName = "fast-topic5";
const slowTopicName = "slow-topic5";
const consumerGroup = "cg5";

(async () => {
  // Create both topics, each with a single partition.
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({
    topics: [
      {
        topic: fastTopicName,
        numPartitions: 1,
      },
      {
        topic: slowTopicName,
        numPartitions: 1,
      },
    ],
  });
  console.log("Partitions added successfully!");
  await admin.disconnect();

  // Produce 5 rounds of 50 messages (~1 KB each) to each topic.
  const producer = kafka.producer();
  await producer.connect();
  for (let i = 0; i < 5; i++) {
    const messages = [];
    for (let q = 0; q < 50; q++) {
      messages.push({ headers: {}, value: (q + i * 1000).toString() + " " + "a".repeat(1000) });
    }
    await producer.send({
      topic: slowTopicName,
      messages: messages,
    });
    await producer.send({
      topic: fastTopicName,
      messages: messages,
    });
  }
  await producer.disconnect();

  const consumer = kafka.consumer({
    kafkaJS: {
      groupId: consumerGroup,
      maxWaitTimeInMs: 5000,
      fromBeginning: true,
    },
  });
  await consumer.connect();
  await consumer.subscribe({ topics: [slowTopicName, fastTopicName] });

  // Running count of consumed messages per topic.
  const messagesConsumed: Record<string, number> = { [fastTopicName]: 0, [slowTopicName]: 0 };
  await consumer.run({
    partitionsConsumedConcurrently: 2,
    eachBatch: async ({ batch }) => {
      console.log("handling batch", {
        partition: batch.partition,
        topic: batch.topic,
      });
      // Simulate slow processing for batches from the slow topic only.
      if (batch.topic === slowTopicName) {
        await sleep(500);
      }
      messagesConsumed[batch.topic] += batch.messages.length;
      console.log("finished batch", {
        time: new Date(),
        partition: batch.partition,
        consumed: JSON.stringify(messagesConsumed),
      });
    },
  });

  // Let the consumer run for 30 seconds, then stop.
  await new Promise((resolve) => setTimeout(resolve, 30000));
  await consumer.disconnect();
})();
Output
handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:55.045Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":1}'
}
handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:55.546Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":2}'
}
handling batch { partition: 0, topic: 'slow-topic1' }
finished batch {
  time: 2025-01-22T17:00:56.048Z,
  partition: 0,
  consumed: '{"fast-topic1":0,"slow-topic1":4}'
}
...
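To compare the two topics' consumption rates directly instead of reading raw counters, the messagesConsumed record in the repro could be swapped for a small per-topic tracker. This is a sketch under assumptions: RateTracker, record and ratePerSecond are hypothetical helpers, not part of the library. In the repro it would be created just before consumer.run and fed from eachBatch with tracker.record(batch.topic, batch.messages.length).

```typescript
// Hypothetical helper (not part of @confluentinc/kafka-javascript): tracks how many
// messages each topic has handled and when its latest batch finished.
interface TopicProgress {
  messages: number;
  lastMs: number;
}

class RateTracker {
  private readonly startMs: number;
  private readonly progress = new Map<string, TopicProgress>();

  // startMs marks the beginning of the measurement window (defaults to now).
  constructor(startMs: number = Date.now()) {
    this.startMs = startMs;
  }

  // Call once per finished batch with the number of messages it contained.
  record(topic: string, count: number, nowMs: number = Date.now()): void {
    const p = this.progress.get(topic);
    if (p === undefined) {
      this.progress.set(topic, { messages: count, lastMs: nowMs });
    } else {
      p.messages += count;
      p.lastMs = nowMs;
    }
  }

  // Average messages per second from startMs to the topic's latest batch,
  // or undefined if nothing has been recorded after startMs yet.
  ratePerSecond(topic: string): number | undefined {
    const p = this.progress.get(topic);
    if (p === undefined || p.lastMs <= this.startMs) return undefined;
    return (p.messages * 1000) / (p.lastMs - this.startMs);
  }
}

// Example with explicit timestamps (ms): 100 fast messages done by t=200,
// 2 slow messages done by t=1000.
const tracker = new RateTracker(0);
tracker.record("fast-topic", 50, 100);
tracker.record("fast-topic", 50, 200);
tracker.record("slow-topic", 1, 500);
tracker.record("slow-topic", 1, 1000);
console.log(tracker.ratePerSecond("fast-topic")); // 500
console.log(tracker.ratePerSecond("slow-topic")); // 2
```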
@omer-riv looks like you are blocking the main loop with await sleep(500).
Try something like the following to simulate slow and fast topics:
// Assumes producer, slowTopicName and fastTopicName from the repro above,
// with the producer still connected while the batches are sent.
function createMessages(batchIndex: number) {
  const messages = [];
  for (let q = 0; q < 50; q++) {
    messages.push({
      headers: {},
      value: `${q + batchIndex * 1000} ${"a".repeat(1000)}`,
    });
  }
  return messages;
}

// Sends 5 batches to topicName, one every intervalMs milliseconds.
function sendBatches(topicName: string, intervalMs: number) {
  let batchCount = 0;
  const timerId = setInterval(async () => {
    if (batchCount >= 5) {
      clearInterval(timerId);
      return;
    }
    // Claim the batch index before awaiting, so a slow send cannot let a later tick reuse it.
    const batchIndex = batchCount++;
    const messages = createMessages(batchIndex);
    try {
      await producer.send({ topic: topicName, messages });
      console.log(`Batch ${batchIndex + 1} sent to ${topicName}`);
    } catch (err) {
      console.error(`Error sending batch ${batchIndex + 1} to ${topicName}:`, err);
    }
  }, intervalMs);
}

sendBatches(slowTopicName, 2000);
sendBatches(fastTopicName, 500);
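One caveat if this snippet replaces the producing loop in the repro: sendBatches returns immediately, so the repro's await producer.disconnect() would run before the first interval fires. A promise-returning variant that can be awaited before disconnecting is sketched here. sendBatchesPaced, SendFn and Message are hypothetical names, and send is assumed to be something like (topic, messages) => producer.send({ topic, messages }). Unlike setInterval, it waits intervalMs before each batch and then for the send itself, so batches are spaced by intervalMs plus send latency.

```typescript
// Hypothetical message shape, matching what the repro pushes into producer.send().
type Message = { headers: Record<string, string>; value: string };

// Hypothetical stand-in for producer.send, e.g.
// (topic, messages) => producer.send({ topic, messages })
type SendFn = (topic: string, messages: Message[]) => Promise<unknown>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Same payload as createMessages above: 50 messages of ~1 KB each.
function createMessages(batchIndex: number): Message[] {
  const messages: Message[] = [];
  for (let q = 0; q < 50; q++) {
    messages.push({ headers: {}, value: `${q + batchIndex * 1000} ${"a".repeat(1000)}` });
  }
  return messages;
}

// Sends totalBatches batches to topicName, pausing intervalMs before each one,
// and resolves only after the last send has completed (or failed and been logged).
async function sendBatchesPaced(
  send: SendFn,
  topicName: string,
  intervalMs: number,
  totalBatches = 5,
): Promise<void> {
  for (let i = 0; i < totalBatches; i++) {
    await sleep(intervalMs);
    try {
      await send(topicName, createMessages(i));
      console.log(`Batch ${i + 1} sent to ${topicName}`);
    } catch (err) {
      console.error(`Error sending batch ${i + 1} to ${topicName}:`, err);
    }
  }
}
```

In the repro, the producing loop would then become await Promise.all([sendBatchesPaced(send, slowTopicName, 2000), sendBatchesPaced(send, fastTopicName, 500)]), followed by await producer.disconnect().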