Add a way to log statistics

Open jry25 opened this issue 2 years ago • 1 comments

Hi, I couldn't find a way to log statistics.

I tried setting statistics.interval.ms and stats_cb, but both attempts failed with errors.

Using a service reference:

        statistics.interval.ms: '100'
        stats_cb: '@App\Messenger\StatsCallBack'

Results in:

In PhpDumper.php line 1561:
                                                                                                                                                                      
  [Symfony\Component\DependencyInjection\Exception\InvalidArgumentException]                                                                                          
  You cannot dump a container with parameters that contain references to other services (reference to service "App\Messenger\StatsCallBack" found in "/0/stats_cb").  
                                                                                                                                                                      

Exception trace:
  at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1561
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1553
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1439
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->addDefaultParametersMethod() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:221
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->dump() at /srv/app/vendor/symfony/http-kernel/Kernel.php:780
 Symfony\Component\HttpKernel\Kernel->dumpContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:606
 Symfony\Component\HttpKernel\Kernel->initializeContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:136
 Symfony\Component\HttpKernel\Kernel->boot() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:169
 Symfony\Bundle\FrameworkBundle\Console\Application->registerCommands() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:75
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /srv/app/bin/console:39

Using the class name as a plain string:

        statistics.interval.ms: '100'
        stats_cb: 'App\Messenger\StatsCallBack'

Results in:

In KafkaTransportFactory.php line 68:
                                                                          
  [RdKafka\Exception (-1)]                                                
  Property "stats_cb" must be set through dedicated .._set_..() function  
                                                                          

Exception trace:
  at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
 RdKafka\Conf->set() at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
 Koco\Kafka\Messenger\KafkaTransportFactory->createTransport() at /srv/app/vendor/symfony/messenger/Transport/TransportFactory.php:36
 Symfony\Component\Messenger\Transport\TransportFactory->createTransport() at /srv/app/var/cache/test/ContainerT9GqRDx/getMessenger_Transport_ProductupdateService.php:13
 require() at /srv/app/var/cache/test/ContainerT9GqRDx/srcApp_KernelTestDebugContainer.php:787
 ContainerT9GqRDx\srcApp_KernelTestDebugContainer->load() at /srv/app/vendor/symfony/dependency-injection/Container.php:448
 Symfony\Component\DependencyInjection\Container->getService() at /srv/app/vendor/symfony/dependency-injection/Argument/ServiceLocator.php:42
 Symfony\Component\DependencyInjection\Argument\ServiceLocator->get() at /srv/app/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:153
 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /srv/app/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /srv/app/vendor/symfony/console/Application.php:1027
 Symfony\Component\Console\Application->doRunCommand() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:97
 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /srv/app/vendor/symfony/console/Application.php:273
 Symfony\Component\Console\Application->doRun() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:83
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /srv/app/bin/console:39
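
The second error suggests that stats_cb cannot go through RdKafka\Conf::set() like an ordinary string property; php-rdkafka exposes a dedicated RdKafka\Conf::setStatsCb() method for it. Here is a minimal sketch of how a factory could route the options, assuming a hypothetical helper splitKafkaOptions() (not part of koco/messenger-kafka) and assuming the callable has already been resolved from the container:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of koco/messenger-kafka): separates plain
 * librdkafka properties from callbacks that need a dedicated setter.
 *
 * @param array<string, mixed> $options
 * @return array{0: array<string, string>, 1: array<string, callable>}
 */
function splitKafkaOptions(array $options): array
{
    // Option name => RdKafka\Conf method; only stats_cb is handled in this sketch.
    $setters = ['stats_cb' => 'setStatsCb'];

    $properties = [];
    $callbacks = [];
    foreach ($options as $name => $value) {
        if (isset($setters[$name])) {
            if (!is_callable($value)) {
                throw new InvalidArgumentException(sprintf('Option "%s" must be a callable.', $name));
            }
            // Callbacks are indexed by the setter method that accepts them.
            $callbacks[$setters[$name]] = $value;
        } else {
            $properties[$name] = (string) $value;
        }
    }

    return [$properties, $callbacks];
}

[$properties, $callbacks] = splitKafkaOptions([
    'statistics.interval.ms' => '100',
    'stats_cb' => static function ($kafka, $json, $json_len): void {
        // A real callback would log or export $json here.
    },
]);

// Applying the result requires the rdkafka extension.
if (class_exists(\RdKafka\Conf::class)) {
    $conf = new \RdKafka\Conf();
    foreach ($properties as $name => $value) {
        $conf->set($name, $value);
    }
    foreach ($callbacks as $method => $callback) {
        $conf->{$method}($callback); // e.g. $conf->setStatsCb($callback)
    }
}
```

This does not address the first error: how the callback service would be injected into the transport factory is left open here.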

Here is my logger:

<?php

namespace App\Messenger;

use Psr\Log\LoggerInterface;

class StatsCallBack
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke($kafka, $json, $json_len): void
    {
        /**
         * Allow logging statistics
         * @see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md#partitions
         * @see https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring
         */
        foreach ($kafka->getAssignment() as $partition) {
            $this->logger->info(
                'Kafka: Stats on {topic}, {partition}',
                [
                    'json_len' => $json_len,
                    'json' => $json,
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                ]
            );
        }
    }
}

I need this to monitor my consumer lag so that I can auto-scale when the lag increases.
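
For the lag use case, the statistics JSON described in librdkafka's STATISTICS.md nests a consumer_lag field under topics, then the topic name, then partitions, then the partition id. Below is a rough sketch of pulling those values out of the payload. extractConsumerLag() is a hypothetical helper name, and the sample payload is trimmed by hand rather than captured from a real broker:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: reads per-partition consumer lag from a librdkafka
 * statistics payload.
 *
 * @return array<string, array<int, int>> lag indexed by topic, then partition id
 */
function extractConsumerLag(string $json): array
{
    $stats = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    $lag = [];
    foreach ($stats['topics'] ?? [] as $topic => $topicStats) {
        foreach ($topicStats['partitions'] ?? [] as $id => $partitionStats) {
            // Partition "-1" is librdkafka's internal unassigned partition;
            // a consumer_lag of -1 means the lag is not known yet.
            $value = $partitionStats['consumer_lag'] ?? -1;
            if ((int) $id < 0 || $value < 0) {
                continue;
            }
            $lag[$topic][(int) $id] = (int) $value;
        }
    }

    return $lag;
}

// Hand-written sample payload, reduced to the fields used above.
$sample = '{"topics":{"products":{"partitions":{"0":{"consumer_lag":42},"1":{"consumer_lag":-1},"-1":{"consumer_lag":-1}}}}}';
$lagByPartition = extractConsumerLag($sample);
```

Inside StatsCallBack::__invoke(), the $json argument could be passed to such a helper and each value logged or exported as a metric for the autoscaler.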

Versions

rdkafka

        rdkafka support => enabled
        version => 4.0.3
        build date => May 3 2022 14:54:39
        librdkafka version (runtime) => 1.5.0
        librdkafka version (build) => 1.5.0.255

koco/messenger-kafka: v0.17.0

jry25, May 06 '22 15:05

I just opened a PR. Please let me know if I'm heading in the right direction. I'm not sure how to test it; I'd be happy to add tests if you let me know what the test strategy is.

jry25, May 06 '22 19:05