node-rdkafka icon indicating copy to clipboard operation
node-rdkafka copied to clipboard

Best consume method for handling back-pressure

Open Crispy1975 opened this issue 3 years ago • 8 comments

Hi. I've been using node-rdkafka for a little while and overall I am very pleased with how it works, great lib! I do have a question however that I am not 100% on the answer to. I have a consumer process that needs to be able to handle back-pressure effectively as I am doing ETL into a slower database cluster, so I don't want the consumer to be overwhelmed and go OOM. However, I am getting OOM issues periodically... I was looking through the library code on the JS side and saw this comment: https://github.com/Blizzard/node-rdkafka/blob/master/lib/kafka-consumer.js#L382

This suggests that using the consume() method is going to result in OOMs in situations like mine. I have set librdkafka settings such as queued.min.messages so it only grabs a smaller number of messages in the background. I've also built an internal queue in my process with a pause/resume mechanism to allow for more control. For the most part previous OOMs have much reduced, however as mentioned I still do get them. Below is a skeleton version of what I have running, comments on any issues or reasons for the OOM would be greatly appreciated.

let isPaused = false;
let consumerRunCheck: ReturnType<typeof setTimeout>;

try {
    // Kafka consumer tracking
    let processingQueue: Array<Kafka.Message> = []; // Internal message queue
    const queueStatus = new EventEmitter;    // Init internal batch queue emitter
    let lastBatchTime: number = Date.now();

    const onRebalance = async function onRebalance(err: Kafka.LibrdKafkaError, assignments: Array<Kafka.Assignment>): Promise<void> {
        try {
            if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
                await consumer.assign(assignments);

            } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
                processingQueue = [];
                await consumer.unassign();

            } else {
                console.error(`Re-balance error: ${err.message}` });
            }

        } catch (err) {
            handleErrors(err);
        }
    }

    // Create the Kafka consumer
    const consumer: Kafka.KafkaConsumer = new Kafka.KafkaConsumer({
        ...consumerConfig,
        'rebalance_cb': onRebalance
    }, topicConfig);

    // Connect to the Kafka broker(s)
    consumer.connect();

    consumer.on('ready', () => {
        consumer.subscribe(transformConfig.topics);
        consumer.setDefaultConsumeTimeout(transformConfig.defaultConsumeTimeout);
        consumer.consume();
    });

    // Data event detected, push to the internal processing queue
    consumer.on('data', (data: Kafka.Message) => {
        processingQueue.push(data);
        // Check to see if internal queue is full and trigger batch processing
        if (isPaused === false && processingQueue.length >= transformConfig.internalQueueMax) {
            consumer.pause(consumer.assignments());
            isPaused = true;
            queueStatus.emit('batchReady');
        }
    });

    // Batch read for processing event detected
    queueStatus.on('batchReady', () => {
        // Process the batch of messages
        return messageProcessing(processingQueue).catch((err) => {
            handleErrors(err);

        }).finally(() => {
            // Free the internal queue memory for the next batch
            processingQueue.length = 0;
            // Set the last batch time
            lastBatchTime = Date.now();
            // Commit offsets
            consumer.commit();
            // Resume consuming messages
            consumer.resume(consumer.assignments());
            isPaused = false;
        });
    });

    // Check to see if we need to restart things
    consumerRunCheck = setInterval(() => {
        console.log(`Internal processing queue has ${processingQueue.length} messages waiting (isPaused: ${isPaused}).`);
        // Calculate the last time we saw a batch
        const lastBatchDiff: number = Math.floor((Date.now() - lastBatchTime)/1000);
        if (isPaused && lastBatchDiff >= 60) {
            console.log(`Consumer appears to be stuck, unpausing.`);
            // Resume consuming messages
            consumer.resume(consumer.assignments());
            isPaused = false;
        }
    }, 60000);

} catch (err) {
    console.error(err.message);
    process.exit(1);
}

The consumer settings are as follows:

export const consumerConfig: ConsumerGlobalConfig = {
    'client.id': `my-client`,
    'group.instance.id': `my-client-instance-${Date.now()}`,
    'metadata.broker.list': brokers,
    'group.id': `my-events`,
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,
    'enable.auto.commit': false,
    'queued.min.messages': 10,
    'queued.max.messages.kbytes': 65536,
    'fetch.message.max.bytes': 1048576,
    'fetch.max.bytes': 1048576
};

I am considering switching from the style listed above to making use of the callback on the consume() method but before that it would be interesting to see if there is something obvious I am doing (or not) with the above code.

Crispy1975 avatar Mar 20 '21 18:03 Crispy1975

Hi,

In our company wa are using the consumer(non-following mode) and work fine to handle back-pressure (with the async module or with out it).

  consumerOnReady(consumer) {
          logger.debug(`[Consumer] - consumerEvents -  consumer ready.`);
          logger.info(`[Consumer] - consumerEvents - Subscribing to Topic: '${kafka.topic.split(",")}'`);
          consumer.subscribe(kafka.topic.split(","));
          consumer.consume(this.maxPollRecords, this.onData);
          this.start(consumer);
          consumerInstance = consumer;
      }
    onDataCallback(consumer, err, msg) {
        try {
            if (err) {
                if (err.message === failuresMessage.CONSUMER_NOT_CONNECTED) process.exit(1);
                else logger.error(`[Consumer] - onDataCallback - error: ${err}`);
            }
            if (msg && msg.length) {
                logger.debug(`[Consumer] - onDataCallback - poll returns: ${msg.length} Records`);
                msg.forEach(record => this.notifyStartProcessing(record));
                this.q.push(msg);
            }
            if (this.q.length() > queue.asyncMaxQueueSize) {
                consumer.pause(consumer.assignments());
                this.paused = true;
                logger.debug(`[Consumer] - onDataCallback - consumer paused`);
            } else {
                if (this.paused) {
                    logger.debug(`[Consumer] - onDataCallback - consumer resumed`);
                    this.paused = false;
                    consumer.resume(consumer.assignments());
                }
                consumer.consume(this.maxPollRecords, this.onData);
            }
        } catch (error) {
            logger.error(`[Consumer] - onDataCallback - handle error: ${error}`);
        }
    }
    queueOnDrain(consumer) {
        logger.debug(`[Consumer] - consumerEvents - queue drain`);
        logger.debug(`[Consumer] - consumerEvents - is paused ${this.paused}`);
        if (this.paused) {
            this.paused = false;
            consumer.resume(consumer.assignments());
            logger.debug(`[Consumer] - consumerEvents - consumer resumed`);
        }
        consumer.consume(this.maxPollRecords, this.onData);
    }

    /**
     * Kafka consumser event listner, push on the queue all event received from kafka
     * The queue is used to handle backpressure, if the length of the queue is greatter than the asyncMaxQueueSize the consumer is paused,
     * will be resumed only when the queue is drained (all events are processed successfully or errored).
     */
    consumerEvents() {
        const consumer = new rdKafka.KafkaConsumer(ConsumerConfig.globalConfig(this), ConsumerConfig.topicConfig());
        consumer.setDefaultConsumeTimeout(kafka.consumerDefaultTimeout);
        consumer.connect({ timeout: kafka.connectionTimeout }, (err) => this.consumerOnConnect(err));
        consumer.on('ready', () => this.consumerOnReady(consumer));
        consumer.on('warning', warn => logger.warn(`[Consumer] - consumerEvents - warning ${JSON.stringify(warn)}`));
        consumer.on('event.log', log => logger.warn(`[Consumer] - consumerEvents - event.log ${JSON.stringify(log)}`)); //logging debug messages, if debug is enabled
        consumer.on('event.error', err => logger.error(`[Consumer] - consumerEvents - event.error ${JSON.stringify(err)}`)); //logging all errors
        consumer.on('disconnected', (arg) => logger.info(`[Consumer] - consumerEvents - consumer disconnected:  ${JSON.stringify(arg)}`));
        this.onData = (err, msg) => this.onDataCallback(consumer, err, msg);
        this.q.drain(() => this.queueOnDrain(consumer));
        this.q.error((err, task) => logger.error(`[Consumer] - consumerEvents - async queue error: ${err}`, { task, stackTrace: err.stack }));
    }

syahiaoui avatar Mar 24 '21 18:03 syahiaoui

Thanks for the example @syahiaoui - I suspect I have a leak somewhere else after the consumer code... I will dig into that.

Crispy1975 avatar Mar 26 '21 10:03 Crispy1975

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

stale[bot] avatar Jun 26 '21 02:06 stale[bot]

@syahiaoui I think if you start using streams api instead of the traditional consumer, you don't need to handle this back pressure manually. Please correct me if I am wrong.

sathyarajagopal avatar Jul 07 '21 18:07 sathyarajagopal

@sathyarajagopal The consumer's stream API extends the native class of Readable, but in the past, there was a problem of not stopping reading messages when the internal buffer has reached the threshold of highWaterMark. (I don't know if this has been fixed)

syahiaoui avatar Aug 10 '21 13:08 syahiaoui

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

stale[bot] avatar Jan 08 '22 22:01 stale[bot]

@syahiaoui do you have the full code to the great example you provided of the consumer(non-following mode)? Thanks again for this, it's very elegant!

iklotzko avatar Feb 09 '22 16:02 iklotzko

@syahiaoui I was able to get it working, thanks, it's a very clear solution for our problem!

iklotzko avatar Feb 09 '22 18:02 iklotzko