phpkafka icon indicating copy to clipboard operation
phpkafka copied to clipboard

Support for RedPanda?

Open slushpuppy opened this issue 4 years ago • 0 comments

  • What problem did you encounter?

PHP Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /src/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385 Stack trace: #0 /src/vendor/longlang/phpkafka/src/Util/KafkaUtil.php(69): longlang\phpkafka\Protocol\ErrorCode::check() #1 /src/vendor/longlang/phpkafka/src/Group/GroupManager.php(111): longlang\phpkafka\Util\KafkaUtil::retry() #2 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(162): longlang\phpkafka\Group\GroupManager->syncGroup() #3 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(128): longlang\phpkafka\Consumer\Consumer->rejoin() #4 /src/Lib/AbstractServiceEventWorker.php(53): longlang\phpkafka\Consumer\Consumer->__construct() #5 /src/App/cli/serviceEventWorker.php(19): Lib\AbstractServiceEventWorker->start()

  • Is the Kafka environment self-built or cloud service?

apt installation

  • Please execute the following command to get environment information.

php -v & php --ri swoole & composer info | grep longlang/phpkafka php8.0 -v & php8.0 --ri openswoole [1] 533 PHP 8.0.13 (cli) (built: Nov 22 2021 09:50:24) ( NTS ) Copyright (c) The PHP Group Zend Engine v4.0.13, Copyright (c) Zend Technologies with Zend OPcache v8.0.13, Copyright (c), by Zend Technologies with Xdebug v3.0.4, Copyright (c) 2002-2021, by Derick Rethans

openswoole

Open Swoole => enabled Author => Open Swoole Group & Contributors [email protected] Version => 4.8.1 Built => Dec 6 2021 17:59:05 coroutine => enabled with boost asm context epoll => enabled eventfd => enabled signalfd => enabled cpu_affinity => enabled spinlock => enabled rwlock => enabled sockets => enabled openssl => OpenSSL 1.1.1 11 Sep 2018 dtls => enabled http2 => enabled json => enabled curl-native => enabled pcre => enabled zlib => 1.2.11 mutex_timedlock => enabled pthread_barrier => enabled futex => enabled mysqlnd => enabled async_redis => enabled

Directive => Local Value => Master Value swoole.enable_coroutine => On => On swoole.enable_library => On => On swoole.enable_preemptive_scheduler => Off => Off swoole.display_errors => On => On swoole.use_shortname => On => On swoole.unixsock_buffer_size => 8388608 => 8388608

"longlang/phpkafka": "^1.2",

# Paste here

  • Provide the smallest reproducible code:
// Your code
         $config = new ConsumerConfig();
      $addr = Config::getBroker();
      $config->setBroker($addr->toIPPortFormat());
      $config->setTopic($this->getTopic()); // topic
      $config->setClientId($this->sanitizeClassName(static::class)); // client ID. Use
      $config->setInterval(0.1);
      $config->setSsl(Config::getSSLConfig());
      $config->setGroupId($this->sanitizeClassName($this->getGroupId()));
      $config->setGroupInstanceId($this->sanitizeClassName(static::class));
      $this->config = $config;


......

        $this->consumer = new Consumer($this->config, [$this,'consume']);

        $this->consumer->start();

I am able to connect and consume topics using other php kafka libraries such as simple-kafka-client

 $builder = KafkaConsumerBuilder::create();

        $consumer = $builder->withAdditionalConfig(
            [
                // start at the very beginning of the topic when reading for the first time
                'auto.offset.reset' => 'earliest',

                // will be visible in broker logs
                'client.id' => 'php-kafka-lib-high-level-consumer',

                // SSL settings
                'security.protocol' => 'ssl',
                'ssl.ca.location' => Config::CA_CERT,
                'ssl.certificate.location' => Config::CLIENT_CERT,
                'ssl.key.location' => Config::CLIENT_KEY,

                // SASL settings
                //'sasl.mechanisms' => '',
                //'ssl.endpoint.identification.algorithm' => 'https',
                //'sasl.username' => '',
                //'sasl.password' => '',

                // Add additional output if you need to debug a problem
                // 'log_level' => (string) LOG_DEBUG,
                // 'debug' => 'all'
            ]
        )
            ->withAdditionalBroker(Config::getBroker()->toIPPortFormat())
            ->withConsumerGroup('php-kafka-lib-high-level-consumer')
            ->withSubscription('php-kafka-lib-test-topic')
            ->build();

        $consumer->subscribe();

        while (true) {
            try {
                $message = $consumer->consume(10000);
            } catch (KafkaConsumerTimeoutException|KafkaConsumerEndOfPartitionException $e) {
                echo 'Didn\'t receive any messages, waiting for more...' . PHP_EOL;
                continue;
            } catch (KafkaConsumerConsumeException $e) {
                echo $e->getMessage() . PHP_EOL;
                continue;
            }

            echo sprintf(
                    'Read message with key:%s payload:%s topic:%s partition:%d offset:%d headers:%s',
                    $message->getKey(),
                    $message->getBody(),
                    $message->getTopicName(),
                    $message->getPartition(),
                    $message->getOffset(),
                    implode(',', $message->getHeaders())
                ) . PHP_EOL;

            $consumer->commit($message);
        }

slushpuppy avatar Dec 09 '21 05:12 slushpuppy