kafka-php

Consumer cannot fetch any data

Open: cwl168 opened this issue 6 years ago • 1 comment

Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start Request ClientId: kafka-php ApiKey: GroupCoordinatorRequest ApiVersion: 0
Start Request ClientId: kafka-php ApiKey: JoinGroupRequest ApiVersion: 1
Join group start, params:{"group_id":"test","session_timeout":30000,"rebalance_timeout":30000,"member_id":"","data":[{"protocol_name":"range","version":0,"subscription":["sale-stat"],"user_data":""}]}
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0
Start sync metadata request
Start sync metadata request params:["sale-stat"]
Start Request ClientId: kafka-php ApiKey: MetadataRequest ApiVersion: 0

I wrote this following the demo, but the consumer does not receive any data. Code:

<?php
require '../vendor/autoload.php';

use Monolog\Logger;
use Monolog\Handler\StreamHandler;

date_default_timezone_set('PRC');

// Create the logger
$logger = new Logger('my_logger');
// Log to stdout. The demo names a StdoutHandler; this assumes stock Monolog,
// where StreamHandler on php://stdout is the handler that does this.
$logger->pushHandler(new StreamHandler('php://stdout'));

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
//$config->setOffsetReset('earliest');

$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
// The callback is invoked for every message fetched from the subscribed topics
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});

cwl168, Aug 09 '19 02:08
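
A possible lead, not a confirmed diagnosis: the log in the report shows the client joining the group with the subscription "sale-stat", while the posted script calls setTopics(['test']), so the log may come from a different run or a different script than the one shown. The plain-PHP sketch below only illustrates how to read that difference out of the "Join group start" log line. It does not talk to Kafka, and the helper name subscribedTopics() is hypothetical, not part of kafka-php.

```php
<?php
// Sketch: compare the topics a script configures with the topics the client
// actually subscribed to, as recorded in the "Join group start" log line.
// Both values are copied from the report above. subscribedTopics() is a
// hypothetical helper written for this example, not a kafka-php function.

/**
 * Extract the subscribed topics from a "Join group start, params:{...}" line.
 *
 * @return string[] topics found in the line, or [] if it cannot be parsed
 */
function subscribedTopics(string $logLine): array
{
    $marker = 'params:';
    $pos = strpos($logLine, $marker);
    if ($pos === false) {
        return [];
    }

    // Everything after "params:" is expected to be a JSON object.
    $params = json_decode(substr($logLine, $pos + strlen($marker)), true);
    if (!is_array($params) || !isset($params['data']) || !is_array($params['data'])) {
        return [];
    }

    $topics = [];
    foreach ($params['data'] as $protocol) {
        foreach ((array) ($protocol['subscription'] ?? []) as $topic) {
            $topics[] = $topic;
        }
    }

    return array_values(array_unique($topics));
}

$logLine = 'Join group start, params:{"group_id":"test","session_timeout":30000,'
    . '"rebalance_timeout":30000,"member_id":"","data":[{"protocol_name":"range",'
    . '"version":0,"subscription":["sale-stat"],"user_data":""}]}';

$configuredTopics = ['test'];                  // from $config->setTopics(['test'])
$loggedTopics = subscribedTopics($logLine);    // from the JoinGroup log line

// Topics the script asks for that never appear in the logged subscription.
$missing = array_values(array_diff($configuredTopics, $loggedTopics));
if ($missing !== []) {
    echo 'Configured but not subscribed: ' . implode(', ', $missing) . PHP_EOL;
    echo 'Subscribed according to the log: ' . implode(', ', $loggedTopics) . PHP_EOL;
}
```

If the two lists differ, as they do here, it is worth confirming which script and topic produced the log before looking further at broker or offset settings.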

Has this been fixed yet?

Alerander, Mar 02 '20 03:03