[QUESTION] Consumer not working with stopAfterLastMessage enabled

Open eusonlito opened this issue 3 years ago • 0 comments

I have this code in a Laravel Command:

Kafka::createConsumer()
    ->subscribe($this->topic)
    ->withConsumerGroupId(config('kafka.consumer_group_id'))
    ->withHandler(fn (KafkaConsumerMessage $message) => $this->handler($handler, $message))
    ->stopAfterLastMessage()
    ->build()
    ->consume();

This command is executed in a cronjob every minute.

The problem is that it never reads any messages from Kafka; the pending queue offset always stays at the same position.

If I remove the stopAfterLastMessage option, the command runs without stopping and reads the messages without any problem.

Kafka::createConsumer()
    ->subscribe($this->topic)
    ->withConsumerGroupId(config('kafka.consumer_group_id'))
    ->withHandler(fn (KafkaConsumerMessage $message) => $this->handler($handler, $message))
    ->build()
    ->consume();

My config is:

<?php

return [
    /*
     | Your kafka brokers url.
     */
    'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),

    /*
     | Kafka consumers belonging to the same consumer group share a group id.
     | The consumers in a group then divide the topic partitions as fairly amongst themselves as possible by
     | establishing that each partition is only consumed by a single consumer from the group.
     | This config defines the consumer group id you want to use for your project.
     */
    'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),

    /*
     | After the consumer receives its assignment from the coordinator,
     | it must determine the initial position for each assigned partition.
     | When the group is first created, before any messages have been consumed, the position is set according to a configurable
     | offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.
     | You can choose between "latest", "earliest" or "none".
     */
    'offset_reset' => env('KAFKA_OFFSET_RESET', 'earliest'),

    /*
     | If you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the
     | interval set by auto.commit.interval.ms.
     */
    'auto_commit' => env('KAFKA_AUTO_COMMIT', true),

    'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),

    'partition' => env('KAFKA_PARTITION', -1),

    /*
     | Kafka supports 4 compression codecs: none, gzip, lz4 and snappy
     */
    'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),

    /*
     | Choose if debug is enabled or not.
     */
    'debug' => env('KAFKA_DEBUG', false),

    /*
     | Repository for batching messages together
     | Implement BatchRepositoryInterface to save batches in different storage
     */
    'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),
];

Any idea why it cannot read the pending queue messages when stopAfterLastMessage is enabled?

I can read the pending queue messages using the Kafka command ./bin/kafka-console-consumer.sh --topic topic --bootstrap-server localhost:9092

Thanks!

eusonlito, Sep 29 '22 10:09