laravel-kafka icon indicating copy to clipboard operation
laravel-kafka copied to clipboard

[QUESTION] Example Using Custom Commiter

Open tharyono opened this issue 3 years ago • 2 comments

Hi, thanks for creating this package. I'm using this and works great so far. But I'm having difficulty to find some example for using custom commiter in this documentation https://junges.dev/documentation/laravel-kafka/v1.7/advanced-usage/4-custom-committers

Can you help to give code example for using this method?

tharyono avatar Aug 26 '22 08:08 tharyono

Hey @tharyono what's your use case for custom commiter exactly?

mateusjunges avatar Sep 03 '22 21:09 mateusjunges

Hi @mateusjunges I want to do manual commit after I make sure all processes in my consumer has been proceed successfully. Because right now, if something wrong happen (eg. error happen when insert data to database), the message is consumed, and I cannot reprocess it again.

tharyono avatar Sep 05 '22 08:09 tharyono

I kinda had the same requirements not to commit the offset if an exception was thrown in my handler did this using a custom CommiterFactory added during building the consumer. I was a bit confused about that existing $success parameter, which wasn't used in the existing committer implementations.

$consumer = Kafka::createConsumer()
    ->usingCommitterFactory(new AppCommitterFactory())
    ->build();

example factory implementation:

<?php

declare(strict_types=1);

namespace App\Kafka;

use Junges\Kafka\Commit\Contracts\Committer as CommitterContract;
use Junges\Kafka\Commit\Contracts\CommitterFactory as CommitterFactoryContract;
use Junges\Kafka\Commit\NativeSleeper;
use Junges\Kafka\Commit\RetryableCommitter;
use Junges\Kafka\Config\Config;
use RdKafka\KafkaConsumer;

class AppCommitterFactory implements CommitterFactoryContract
{
    public function make(KafkaConsumer $kafkaConsumer, Config $config): CommitterContract
    {
        return new RetryableCommitter(
            new SuccessCommitter(
                $kafkaConsumer
            ),
            new NativeSleeper(),
            $config->getMaxCommitRetries()
        );
    }
}

example committer implementation:

<?php

declare(strict_types=1);

namespace App\Kafka;

use Junges\Kafka\Commit\Contracts\Committer as CommitterContract;
use RdKafka\KafkaConsumer;
use RdKafka\Message;

class SuccessCommitter implements CommitterContract
{
    public function __construct(
        private KafkaConsumer $consumer,
    ) {
    }

    public function commitMessage(Message $message, bool $success): void
    {
        /**
         * only commit offset for messages that are handled without an exception being thrown:
         * @see \Junges\Kafka\Consumers\Consumer::executeMessage
         **/
        if ($success) {
            $this->consumer->commit($message);
        }
    }

    public function commitDlq(Message $message): void
    {
        $this->consumer->commit($message);
    }
}

deegital avatar Nov 03 '22 20:11 deegital

Hey @tharyono I added a example to the docs. Pls let me know if it helps :slightly_smiling_face:

https://junges.dev/documentation/laravel-kafka/v1.9/advanced-usage/4-custom-committers

mateusjunges avatar Nov 08 '22 19:11 mateusjunges