laravel-kafka
[QUESTION] Example Using Custom Committer
Hi, thanks for creating this package. I'm using it and it works great so far, but I'm having difficulty finding an example of using a custom committer in the documentation: https://junges.dev/documentation/laravel-kafka/v1.7/advanced-usage/4-custom-committers
Could you give a code example for using this method?
Hey @tharyono, what exactly is your use case for a custom committer?
Hi @mateusjunges, I want to commit manually once I'm sure all the processing in my consumer has completed successfully. Right now, if something goes wrong (e.g. an error occurs while inserting data into the database), the message is still consumed and I cannot reprocess it.
I had a similar requirement: don't commit the offset if an exception was thrown in my handler. I did this with a custom CommitterFactory, added while building the consumer. I was a bit confused by the existing $success parameter, which wasn't used in the existing committer implementations.
$consumer = Kafka::createConsumer()
    ->usingCommitterFactory(new AppCommitterFactory())
    ->build();
Example factory implementation:
<?php

declare(strict_types=1);

namespace App\Kafka;

use Junges\Kafka\Commit\Contracts\Committer as CommitterContract;
use Junges\Kafka\Commit\Contracts\CommitterFactory as CommitterFactoryContract;
use Junges\Kafka\Commit\NativeSleeper;
use Junges\Kafka\Commit\RetryableCommitter;
use Junges\Kafka\Config\Config;
use RdKafka\KafkaConsumer;

class AppCommitterFactory implements CommitterFactoryContract
{
    public function make(KafkaConsumer $kafkaConsumer, Config $config): CommitterContract
    {
        // Wrap the custom committer so that failed commits are retried.
        return new RetryableCommitter(
            new SuccessCommitter(
                $kafkaConsumer
            ),
            new NativeSleeper(),
            $config->getMaxCommitRetries()
        );
    }
}
Example committer implementation:
<?php

declare(strict_types=1);

namespace App\Kafka;

use Junges\Kafka\Commit\Contracts\Committer as CommitterContract;
use RdKafka\KafkaConsumer;
use RdKafka\Message;

class SuccessCommitter implements CommitterContract
{
    public function __construct(
        private KafkaConsumer $consumer,
    ) {
    }

    public function commitMessage(Message $message, bool $success): void
    {
        /**
         * Only commit the offset for messages that were handled without an exception being thrown.
         *
         * @see \Junges\Kafka\Consumers\Consumer::executeMessage
         */
        if ($success) {
            $this->consumer->commit($message);
        }
    }

    public function commitDlq(Message $message): void
    {
        $this->consumer->commit($message);
    }
}
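To make the effect of the $success flag easier to see, here is a minimal standalone sketch that runs without Kafka or the package. FakeConsumer, SuccessOnlyCommitter and processMessage are hypothetical stand-ins written for illustration, not part of laravel-kafka or php-rdkafka, and the loop only approximates how the consumer passes the handler outcome to the committer.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for RdKafka\KafkaConsumer: it only records committed offsets.
final class FakeConsumer
{
    /** @var int[] */
    public array $committed = [];

    public function commit(int $offset): void
    {
        $this->committed[] = $offset;
    }
}

// Same rule as SuccessCommitter above: commit only when the handler succeeded.
final class SuccessOnlyCommitter
{
    public function __construct(private FakeConsumer $consumer)
    {
    }

    public function commitMessage(int $offset, bool $success): void
    {
        if ($success) {
            $this->consumer->commit($offset);
        }
    }
}

// Simplified (assumed) consume step: run the handler, then report the outcome.
function processMessage(int $offset, callable $handler, SuccessOnlyCommitter $committer): void
{
    try {
        $handler($offset);
        $success = true;
    } catch (Throwable $e) {
        $success = false;
    }

    $committer->commitMessage($offset, $success);
}

$consumer = new FakeConsumer();
$committer = new SuccessOnlyCommitter($consumer);

// The handler fails for offset 2, simulating a failed database insert.
$handler = function (int $offset): void {
    if ($offset === 2) {
        throw new RuntimeException('database insert failed');
    }
};

foreach ([1, 2, 3] as $offset) {
    processMessage($offset, $handler, $committer);
}

echo implode(',', $consumer->committed), PHP_EOL; // prints "1,3"
```

One thing to keep in mind with this approach: a Kafka commit stores a position per partition, so committing a later message in the same partition also moves the committed offset past an earlier message whose commit was skipped. If failed messages must be reprocessed, it may be worth stopping the consumer or routing them to a dead letter queue instead of continuing.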
Hey @tharyono, I added an example to the docs. Please let me know if it helps :slightly_smiling_face:
https://junges.dev/documentation/laravel-kafka/v1.9/advanced-usage/4-custom-committers