php-rdkafka

Batch processing

Open sauravaggarwal opened this issue 2 years ago • 5 comments

Description

Need to consume messages in batches, with automatic rebalancing.

sauravaggarwal • Apr 26 '22 10:04

Hi,

Thanks for the update. I'm using the configuration below to consume the messages, but each call returns only a single message.

    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'testing');
    $conf->set('metadata.broker.list', config('kafka'));
    $conf->set('sasl.username',config('kafka'));
    $conf->set('sasl.password',config('kafka'));
    $conf->set('sasl.mechanisms',config('kafka'));
    $conf->set('security.protocol',config('kafka'));
    $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                // The group rebalance assigned these partitions to this consumer.
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                // These partitions are being taken away (e.g. moved to another group member).
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;

            default:
                throw new \Exception($err);
        }
    });
    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['test']);
    while (true) {
        // consume() returns one message per call (or an error/timeout event).
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }

Also, when we run multiple consumers and pass all the partitions to both workers, both workers consume the same data. Is there any way to check whether a partition is already being consumed by one consumer, so that another consumer is blocked from consuming it? Do you have a reference link or an example showing how to implement this?

sauravaggarwal • Apr 28 '22 17:04

`consumeBatch` belongs to the legacy API (the low-level consumer), and in general its use is not advised. The high-level consumer hides a lot of the internal logic and does a very good job of handling multiple consumers for topics and partitions on its own, so you don't need to take care of that. Furthermore, consuming in batches is not really in the spirit of Kafka, which is all about streaming a constant flow of data. If you want to implement batching, you should do it in your own project. I saw your question here; the same goes for that library, but you are free to create your own consumer class based on AbstractKafkaConsumer and implement that functionality there.

nick-zh • Apr 28 '22 17:04
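Following the suggestion above to batch in userland, here is a minimal sketch (not part of php-rdkafka) that wraps the high-level consumer's one-message-per-call `consume()` in a loop returning up to `$maxSize` messages, or whatever arrived before a deadline. The helper names `collectBatch()` and `nextFromKafka()`, and the batch size and wait values in the usage comment, are hypothetical; the sketch assumes PHP 7.4 or later and a consumer configured as in the question.

```php
<?php

/**
 * Hypothetical userland helper (not part of php-rdkafka): collects up to
 * $maxSize items, stopping early once $maxWaitMs has elapsed.
 * $next(int $timeoutMs) must return one item, or null if none arrived in time.
 */
function collectBatch(callable $next, int $maxSize, int $maxWaitMs): array
{
    $batch = [];
    // hrtime(true) returns a monotonic timestamp in nanoseconds.
    $deadline = hrtime(true) + $maxWaitMs * 1_000_000;

    while (count($batch) < $maxSize) {
        $remainingMs = intdiv($deadline - hrtime(true), 1_000_000);
        if ($remainingMs <= 0) {
            break; // deadline reached: return a partial (possibly empty) batch
        }
        $item = $next($remainingMs);
        if ($item !== null) {
            $batch[] = $item;
        }
    }

    return $batch;
}

/**
 * Hypothetical adapter: turns RdKafka\KafkaConsumer::consume() into the
 * "item or null" callable expected by collectBatch().
 */
function nextFromKafka(\RdKafka\KafkaConsumer $consumer): callable
{
    return function (int $timeoutMs) use ($consumer) {
        $message = $consumer->consume($timeoutMs);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                return $message;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                return null; // nothing to add to the batch this time
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    };
}

// Usage with a consumer built as in the question (requires the rdkafka extension):
// $consumer = new \RdKafka\KafkaConsumer($conf);
// $consumer->subscribe(['test']);
// while (true) {
//     $batch = collectBatch(nextFromKafka($consumer), 100, 5000);
//     if ($batch !== []) {
//         // process the whole batch here
//     }
// }
```

Because batching happens on top of a subscribed high-level consumer, partition assignment and rebalancing stay with the consumer group. Setting `enable.auto.commit` to `false` and calling `$consumer->commit()` only after a batch has been fully processed is one way to avoid committing offsets for messages that were fetched but not yet handled; whether that trade-off suits a given workload is a design choice, not something this thread settles.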

@arnaud-lb, if you want to explore whether this could be implemented, there is an approach mentioned here, but I think doing this in userland should be just fine :smiley:

nick-zh • Apr 28 '22 17:04

Hi, we are using the low-level consumer for batch processing. Can we detect whether another running consumer is already consuming messages from a given partition? Basically, with multiple consumers we need to divide the load between them: if several low-level consumers read the same topic and partition, the data is processed redundantly. We need to avoid that.

sauravaggarwal • Apr 30 '22 05:04
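With the low-level consumer nothing coordinates which process reads which partition, which is why two workers given the same partitions read the same data. One userland workaround, sketched here under the assumption of a fixed, known number of workers each started with its own 0-based index (both hypothetical parameters), is to split partitions deterministically so that each partition belongs to exactly one worker. Unlike high-level consumers sharing a `group.id`, this does not rebalance: if a worker dies, its partitions go unconsumed until it is restarted.

```php
<?php

/**
 * Hypothetical helper: returns the partitions that worker $workerIndex
 * (0-based) of $workerCount should consume, using partition % workerCount.
 * Every partition ends up with exactly one worker.
 */
function partitionsForWorker(int $partitionCount, int $workerIndex, int $workerCount): array
{
    if ($workerCount < 1 || $workerIndex < 0 || $workerIndex >= $workerCount) {
        throw new \InvalidArgumentException('workerIndex must be in [0, workerCount)');
    }

    $mine = [];
    for ($partition = 0; $partition < $partitionCount; $partition++) {
        if ($partition % $workerCount === $workerIndex) {
            $mine[] = $partition;
        }
    }
    return $mine;
}

// Sketch of use with the low-level consumer (requires the rdkafka extension;
// $rk is an \RdKafka\Consumer, and $workerIndex would come from e.g. a CLI argument):
// $topic = $rk->newTopic('test');
// $queue = $rk->newQueue();
// foreach (partitionsForWorker(6, $workerIndex, 2) as $partition) {
//     $topic->consumeQueueStart($partition, RD_KAFKA_OFFSET_STORED, $queue);
// }
// $message = $queue->consume(1000);
```

For 6 partitions and 2 workers this gives worker 0 partitions 0, 2, 4 and worker 1 partitions 1, 3, 5. If automatic redistribution on worker failure matters, the high-level consumer described earlier in the thread already provides it.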