pulsar-client-node
Advice on increasing consumer concurrency & throughput
Hello, we need some advice on how to increase throughput in our Pulsar consumers. Here are some details:
- We run 1 consumer per pod in a kubernetes cluster
- We run in the `Shared` subscription mode; strict ordering does not matter for us.
- To keep debugging simple, we have not enabled batching.
- We've been using the `listener` pattern.
We've found that our messages are being processed sequentially, which leads to poor throughput, so we need to speed things up. What is the recommended way to do so? Below are the two options we are considering:
- Option 1 increases the number of consumers (per pod) and keeps the listener pattern.
- Option 2 uses the `receive()` pattern with multiple concurrent workers on a single consumer.
We'd like to understand what the community considers best practice and why. Thank you :)
Running multiple consumers per pod
import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';
// Local-development connection settings, written into the environment before
// anything reads them.
Object.assign(process.env, {
  ENVIRONMENT: 'development',
  PULSAR_SERVICE_URL: 'pulsar://localhost:6650',
});
// Random topic name per run (presumably so repeated runs do not share a
// backlog); the subscription name is derived from it.
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;
// Number of consumers / listener threads to run.
const CONCURRENCY = 5;
// Number of test messages to publish.
const SEND_NUMBER = 10;
/**
 * Handle one message delivered through a consumer `listener`: log its payload,
 * simulate one second of work, then acknowledge it.
 *
 * Processing and acknowledgement failures are caught here instead of being
 * returned as a rejected promise. The error is logged and the message is
 * negatively acknowledged, so it can be redelivered (after the subscription's
 * `nAckRedeliverTimeoutMs`) rather than only after the ack timeout.
 *
 * NOTE(review): pulsar-client-node appears to wait for the promise returned by
 * a listener before delivering the next message on that listener thread. A
 * rejected promise may never release that wait and otherwise surfaces as an
 * unhandled rejection. Confirm against the installed client version.
 *
 * @param message - Message delivered to the listener.
 * @param consumer - Consumer that delivered the message; used to (n)ack it.
 */
async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  try {
    console.log('Received message: ', message.getData().toString());
    // Stand-in for real work.
    await new Promise((resolve) => setTimeout(resolve, 1000));
    await consumer.acknowledge(message);
  } catch (err: unknown) {
    console.error(
      'Failed to process message: ',
      err instanceof Error ? err.message : err,
    );
    consumer.negativeAcknowledge(message);
  }
}
/**
 * Demo for "multiple consumers per pod".
 *
 * Subscribes CONCURRENCY listener-based consumers to one Shared subscription
 * and publishes SEND_NUMBER messages. After a fixed wait it closes the
 * producer, the consumers and the client, then exits.
 */
async function main() {
  const serviceUrl = process.env.PULSAR_SERVICE_URL;
  if (!serviceUrl) {
    throw new Error('PULSAR_SERVICE_URL is not set');
  }
  const client = new Pulsar.Client({
    serviceUrl,
    log: logconfig(),
    // Sized to CONCURRENCY so the consumers' listeners need not share a single
    // listener thread. NOTE(review): assumes the native client spreads
    // consumers' listeners across these threads; confirm.
    messageListenerThreads: CONCURRENCY,
  });
  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);

  // CONCURRENCY consumers on the same Shared subscription, so the broker can
  // hand messages to all of them. The subscriptions are created in parallel.
  const consumers: Pulsar.Consumer[] = await Promise.all(
    Array.from({ length: CONCURRENCY }, () =>
      client.subscribe({
        topic: PULSAR_TOPIC,
        subscription: PULSAR_SUBSCRIPTION,
        subscriptionType: 'Shared',
        ackTimeoutMs: 10_000,
        nAckRedeliverTimeoutMs: 2_000,
        receiverQueueSize: 10,
        batchIndexAckEnabled: false,
        listener: (message, consumer) => handleMessage(message, consumer),
      }),
    ),
  );

  // Publish the test messages.
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });
  for (let i = 0; i < SEND_NUMBER; i += 1) {
    await producer.send({ data: Buffer.from(`test-message-${i}`) });
  }

  // Wait 50 seconds for the consumers to process the messages.
  await new Promise((resolve) => setTimeout(resolve, 50_000));

  await producer.close();
  await Promise.all(consumers.map((consumer) => consumer.close()));
  await client.close();
  process.exit(0);
}
main().catch((err: unknown) => {
  console.error('Fatal error: ', err);
  process.exit(1);
});
/**
 * Build the `log` callback handed to `Pulsar.Client`.
 *
 * Each native-client log line goes to the console method matching its level
 * (DEBUG, INFO, WARN, ERROR). Any other level is dropped, and the file/line
 * arguments are ignored.
 */
function logconfig() {
  return (severity: any, _sourceFile: any, _sourceLine: any, text: any) => {
    if (severity === Pulsar.LogLevel.DEBUG) {
      console.debug(text);
    } else if (severity === Pulsar.LogLevel.INFO) {
      console.info(text);
    } else if (severity === Pulsar.LogLevel.WARN) {
      console.warn(text);
    } else if (severity === Pulsar.LogLevel.ERROR) {
      console.error(text);
    }
  };
}
Increasing concurrency per consumer
import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';
import logger from '../../utils/logger';
// Local-development connection settings, written into the environment before
// anything reads them.
process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
// Random topic name per run. `faker` comes from the '@faker-js/faker' import of
// this snippet; without that import this line does not compile.
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;
// Number of concurrent receive workers on the single consumer.
const CONCURRENCY = 5;
// Number of test messages to publish.
const SEND_NUMBER = 10;
/**
 * Log a received message, simulate one second of work, then acknowledge it.
 *
 * Errors (including a failed acknowledgement) propagate to the caller; the
 * `listen` workers in this snippet catch and log them.
 *
 * @param message - Message returned by `consumer.receive()`.
 * @param consumer - Consumer the message was received from.
 */
async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  const payload = message.getData().toString();
  console.log('Received message: ', payload);

  const simulatedWorkMs = 1000;
  await new Promise<void>((done) => {
    setTimeout(done, simulatedWorkMs);
  });

  await consumer.acknowledge(message);
}
/**
 * Demo for "increasing concurrency per consumer".
 *
 * One consumer on a Shared subscription is drained by CONCURRENCY concurrent
 * `receive()` workers (see `listen`), and SEND_NUMBER messages are published.
 * After a fixed wait the producer, consumer and client are closed and the
 * process exits.
 */
async function main() {
  const serviceUrl = process.env.PULSAR_SERVICE_URL;
  if (!serviceUrl) {
    throw new Error('PULSAR_SERVICE_URL is not set');
  }
  const client = new Pulsar.Client({
    serviceUrl,
    log: logconfig(),
  });
  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);

  const consumer = await client.subscribe({
    topic: PULSAR_TOPIC,
    subscription: PULSAR_SUBSCRIPTION,
    subscriptionType: 'Shared',
    ackTimeoutMs: 10_000,
    nAckRedeliverTimeoutMs: 2_000,
    receiverQueueSize: 10,
    batchIndexAckEnabled: false,
  });

  // Start the workers WITHOUT awaiting them. `listen` only settles once all of
  // its workers stop, so awaiting it here would block forever and no message
  // would ever be published.
  void listen(
    consumer,
    async (receiver, message) => handleMessage(message, receiver),
    CONCURRENCY,
  ).catch((err: unknown) => {
    console.error('Message workers stopped unexpectedly: ', err);
  });

  // Publish the test messages.
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });
  for (let i = 0; i < SEND_NUMBER; i += 1) {
    await producer.send({ data: Buffer.from(`test-message-${i}`) });
  }

  // Wait 50 seconds for the workers to process the messages.
  await new Promise((resolve) => setTimeout(resolve, 50_000));

  await producer.close();
  await consumer.close();
  await client.close();
  // Exit explicitly: the `listen` workers keep retrying and may otherwise keep
  // the event loop alive.
  process.exit(0);
}
main().catch((err: unknown) => {
  console.error('Fatal error: ', err);
  process.exit(1);
});
/**
 * Receive messages from a Pulsar consumer and process them concurrently.
 *
 * Starts `concurrency` independent workers. Each worker loops: it `receive()`s
 * one message and awaits `listener` before receiving the next. At most
 * `concurrency` messages from this consumer are therefore in flight at a time.
 *
 * When `listener` throws or rejects, the error is logged and the message is
 * negatively acknowledged. The broker can then redeliver it (presumably after
 * the subscription's `nAckRedeliverTimeoutMs`) instead of waiting for an ack
 * timeout.
 *
 * When `receive()` fails:
 * - after `signal` was aborted, the worker stops quietly;
 * - otherwise the error is logged and the worker retries after a short pause,
 *   so a persistently failing consumer does not spin in a hot loop.
 *
 * The returned promise resolves once every worker has stopped. That happens
 * after `signal` is aborted and each worker's pending `receive()` settles, e.g.
 * because the consumer was closed. Without a `signal` the workers never stop,
 * so callers should not `await` the result in that case.
 *
 * @param consumer - Pulsar consumer to receive messages from.
 * @param listener - Message handler; responsible for acknowledging the message.
 * @param concurrency - Maximum number of messages to process at a time.
 * @param signal - Optional stop signal. Abort it before closing the consumer so
 *   the resulting `receive()` rejection ends the workers instead of being
 *   logged and retried.
 */
export async function listen(
  consumer: Pulsar.Consumer,
  listener: (
    consumer: Pulsar.Consumer,
    message: Pulsar.Message,
  ) => Promise<void>,
  concurrency = 1,
  signal?: AbortSignal,
): Promise<void> {
  // Pause between failed receive() attempts.
  const receiveRetryDelayMs = 1000;

  // `err` may be any thrown value, not only an Error.
  const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : String(err);

  // Returns the next message, or undefined if receive() failed.
  const receiveNext = async (): Promise<Pulsar.Message | undefined> => {
    try {
      return await consumer.receive();
    } catch (err: unknown) {
      if (!signal?.aborted) {
        logger.error(`Message processing error: ${describeError(err)}`);
        await new Promise((resolve) => setTimeout(resolve, receiveRetryDelayMs));
      }
      return undefined;
    }
  };

  const worker = async (): Promise<void> => {
    while (!signal?.aborted) {
      const message = await receiveNext();
      if (message === undefined) {
        continue;
      }
      // A message already received is always processed, even if `signal` was
      // aborted while waiting for it, so it is not silently dropped.
      try {
        await listener(consumer, message);
      } catch (err: unknown) {
        logger.error(`Message processing error: ${describeError(err)}`);
        consumer.negativeAcknowledge(message);
      }
    }
  };

  const workers: Promise<void>[] = [];
  for (let i = 0; i < concurrency; i += 1) {
    workers.push(worker());
  }
  await Promise.all(workers);
}
/**
 * Create the logger callback passed as `log` when constructing the client.
 *
 * The callback forwards the message text to `console.debug`, `console.info`,
 * `console.warn` or `console.error` according to the Pulsar log level. Any
 * other level is ignored, as are the file and line arguments.
 */
function logconfig() {
  return (level: any, _file: any, _line: any, message: any) => {
    if (level === Pulsar.LogLevel.DEBUG) {
      console.debug(message);
      return;
    }
    if (level === Pulsar.LogLevel.INFO) {
      console.info(message);
      return;
    }
    if (level === Pulsar.LogLevel.WARN) {
      console.warn(message);
      return;
    }
    if (level === Pulsar.LogLevel.ERROR) {
      console.error(message);
    }
  };
}