消费者取不到数据
Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start Request ClientId: kafka-php ApiKey: GroupCoordinatorRequest ApiVersion: 0 Start Request ClientId: kafka-php ApiKey: JoinGroupRequest ApiVersion: 1 Join group start, params:{"group_id":"test","session_timeout":30000,"rebalance_timeout":30000,"member_id":"","data":[{"protocol_name":"range","version":0,"subscription":["sale-stat"],"user_data":""}]} Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
按照 demo 写的消费者取不到数据，代码如下：
<?php
// Demo consumer: joins a Kafka consumer group and dumps every message it receives.

// Resolve the autoloader relative to this file so the script can be started
// from any working directory, not only from the directory it lives in.
require __DIR__ . '/../vendor/autoload.php';

// 'PRC' is only a backward-compatible alias; 'Asia/Shanghai' is the canonical identifier.
date_default_timezone_set('Asia/Shanghai');

use Monolog\Logger;
use Monolog\Handler\StdoutHandler;

// Create the logger and send its records to standard output.
$logger = new Logger('my_logger');
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
// NOTE(review): the log posted with this script shows a subscription to
// "sale-stat", not "test" — confirm this is the topic that actually holds the data.
$config->setTopics(['test']);
// Start from the oldest available offset when the group has no committed
// offset yet; otherwise messages produced before the consumer first joined
// are never delivered.
$config->setOffsetReset('earliest');

$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);

// The callback is invoked with the topic, the partition and the message.
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});
Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start Request ClientId: kafka-php ApiKey: GroupCoordinatorRequest ApiVersion: 0 Start Request ClientId: kafka-php ApiKey: JoinGroupRequest ApiVersion: 1 Join group start, params:{"group_id":"test","session_timeout":30000,"rebalance_timeout":30000,"member_id":"","data":[{"protocol_name":"range","version":0,"subscription":["sale-stat"],"user_data":""}]} Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0 Start sync metadata request Start sync metadata request params:["sale-stat"] Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
按照 demo 写的消费者取不到数据，代码如下：
<?php
// Demo consumer: joins a Kafka consumer group and dumps every message it receives.
// Kept on separate lines: collapsed onto one line, the first `//` comment
// would swallow every statement that follows it.

// Resolve the autoloader relative to this file so the script can be started
// from any working directory, not only from the directory it lives in.
require __DIR__ . '/../vendor/autoload.php';

// 'PRC' is only a backward-compatible alias; 'Asia/Shanghai' is the canonical identifier.
date_default_timezone_set('Asia/Shanghai');

use Monolog\Logger;
use Monolog\Handler\StdoutHandler;

// Create the logger and send its records to standard output.
$logger = new Logger('my_logger');
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
// NOTE(review): the log posted with this script shows a subscription to
// "sale-stat", not "test" — confirm this is the topic that actually holds the data.
$config->setTopics(['test']);
// Start from the oldest available offset when the group has no committed
// offset yet; otherwise messages produced before the consumer first joined
// are never delivered.
$config->setOffsetReset('earliest');

$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);

// The callback is invoked with the topic, the partition and the message.
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});
Has this been fixed yet?