messenger-kafka
Add a way to log statistics
Hi, I couldn't find any way to log statistics. I tried setting statistics.interval.ms and stats_cb in the transport options, but both attempts failed with errors.
Using
statistics.interval.ms: '100'
stats_cb: '@App\Messenger\StatsCallBack'
Results in:
In PhpDumper.php line 1561:
[Symfony\Component\DependencyInjection\Exception\InvalidArgumentException]
You cannot dump a container with parameters that contain references to other services (reference to service "App\Messenger\StatsCallBack" found in "/0/stats_cb").
Exception trace:
at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1561
Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1553
Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1439
Symfony\Component\DependencyInjection\Dumper\PhpDumper->addDefaultParametersMethod() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:221
Symfony\Component\DependencyInjection\Dumper\PhpDumper->dump() at /srv/app/vendor/symfony/http-kernel/Kernel.php:780
Symfony\Component\HttpKernel\Kernel->dumpContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:606
Symfony\Component\HttpKernel\Kernel->initializeContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:136
Symfony\Component\HttpKernel\Kernel->boot() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:169
Symfony\Bundle\FrameworkBundle\Console\Application->registerCommands() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:75
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /srv/app/bin/console:39
Using
statistics.interval.ms: '100'
stats_cb: 'App\Messenger\StatsCallBack'
Results in:
In KafkaTransportFactory.php line 68:
[RdKafka\Exception (-1)]
Property "stats_cb" must be set through dedicated .._set_..() function
Exception trace:
at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
RdKafka\Conf->set() at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
Koco\Kafka\Messenger\KafkaTransportFactory->createTransport() at /srv/app/vendor/symfony/messenger/Transport/TransportFactory.php:36
Symfony\Component\Messenger\Transport\TransportFactory->createTransport() at /srv/app/var/cache/test/ContainerT9GqRDx/getMessenger_Transport_ProductupdateService.php:13
require() at /srv/app/var/cache/test/ContainerT9GqRDx/srcApp_KernelTestDebugContainer.php:787
ContainerT9GqRDx\srcApp_KernelTestDebugContainer->load() at /srv/app/vendor/symfony/dependency-injection/Container.php:448
Symfony\Component\DependencyInjection\Container->getService() at /srv/app/vendor/symfony/dependency-injection/Argument/ServiceLocator.php:42
Symfony\Component\DependencyInjection\Argument\ServiceLocator->get() at /srv/app/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:153
Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /srv/app/vendor/symfony/console/Command/Command.php:255
Symfony\Component\Console\Command\Command->run() at /srv/app/vendor/symfony/console/Application.php:1027
Symfony\Component\Console\Application->doRunCommand() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:97
Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /srv/app/vendor/symfony/console/Application.php:273
Symfony\Component\Console\Application->doRun() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:83
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /srv/app/bin/console:39
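The second error suggests that librdkafka callbacks cannot be passed through RdKafka\Conf::set(), which is what KafkaTransportFactory.php line 68 calls according to the trace. In php-rdkafka the dedicated setter is RdKafka\Conf::setStatsCb(). A minimal sketch of what the factory would presumably need to do, assuming direct access to the Conf object and a loaded rdkafka extension (the logging closure is only a placeholder):

```php
<?php
// Sketch only: requires the rdkafka PHP extension.
$conf = new RdKafka\Conf();

// Plain string properties go through set(), as the factory already does.
$conf->set('statistics.interval.ms', '100');

// Callbacks need their dedicated setter instead of set('stats_cb', ...).
$conf->setStatsCb(static function ($kafka, $json, $json_len): void {
    // Placeholder: forward the raw statistics JSON to the error log.
    error_log(sprintf('Kafka stats (%d bytes): %s', $json_len, $json));
});
```

Any callable should work here, so an invokable service such as the class below could be passed in place of the closure, provided the factory receives it as a real object rather than as a string option.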
Here is my logger:
<?php

namespace App\Messenger;

use Psr\Log\LoggerInterface;

class StatsCallBack
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke($kafka, $json, $json_len): void
    {
        /**
         * Allow logging statistics
         * @see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md#partitions
         * @see https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring
         */
        foreach ($kafka->getAssignment() as $partition) {
            $this->logger->info(
                'Kafka: Stats on {topic}, {partition}',
                [
                    'json_len' => $json_len,
                    'json' => $json,
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                ]
            );
        }
    }
}
I need this to keep an eye on my consumer lag, so that I can auto-scale consumers when the lag increases.
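For the lag monitoring itself, the JSON handed to the callback can be reduced to a per-topic lag figure. The sketch below assumes the payload layout described in librdkafka's STATISTICS.md (topics, then partitions, then consumer_lag). The function name extractConsumerLag and the sample payload are made up for illustration and are not part of messenger-kafka or php-rdkafka:

```php
<?php
declare(strict_types=1);

/**
 * Sum consumer lag per topic from a librdkafka statistics JSON payload.
 * Hypothetical helper, for illustration only.
 *
 * @return array<string, int> topic name => total lag across its partitions
 */
function extractConsumerLag(string $json): array
{
    $stats = json_decode($json, true);
    if (!is_array($stats)) {
        return []; // not valid JSON, nothing to report
    }

    $lagByTopic = [];
    foreach ($stats['topics'] ?? [] as $topicName => $topic) {
        foreach ($topic['partitions'] ?? [] as $partitionId => $partition) {
            $lag = (int) ($partition['consumer_lag'] ?? -1);
            // Skip librdkafka's internal "-1" partition; negative lag
            // values are treated here as "unknown".
            if ((int) $partitionId < 0 || $lag < 0) {
                continue;
            }
            $lagByTopic[$topicName] = ($lagByTopic[$topicName] ?? 0) + $lag;
        }
    }

    return $lagByTopic;
}

// Made-up sample payload, trimmed to the fields used above.
$sample = '{"topics":{"orders":{"partitions":{'
    . '"0":{"consumer_lag":5},"1":{"consumer_lag":7},"-1":{"consumer_lag":-1}}}}}';

$lag = extractConsumerLag($sample);
```

Inside StatsCallBack::__invoke(), the result could be logged or exported as a metric instead of logging the full JSON once per assigned partition.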
Versions
rdkafka
rdkafka support => enabled
version => 4.0.3
build date => May 3 2022 14:54:39
librdkafka version (runtime) => 1.5.0
librdkafka version (build) => 1.5.0.255
koco/messenger-kafka: v0.17.0
I just added a PR. Please let me know if I'm heading in the right direction. I have no clue how to test it, but I'd be happy to add tests if you let me know what the testing strategy is for this.