kafka-php
kafka-php copied to clipboard
run producer in cunsumer
Hii
Can i run producer into Consumer in kafka-php?
@SunnnyGohil for now you can only run the sync producer while consuming messages, since the control of the event loop is done inside of the classes
thanks for reply @lcobucci
when i run producer into consumer that time i got "Exception 'LogicException' with message 'Cannot run() recursively; event reactor already active" this error.
if you have solution than please help me
@SunnnyGohil the producer should be run in the synchronous mode: https://github.com/weiboad/kafka-php#synchronous-mode
@lcobucci i aleardy run producer in sync mode but i got error
Can you put your code here?
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.1.0');
$config->setTopics(['test']);
$config->setOffsetReset('earliest');
$config->setClientId(1);
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic,$part,$message) use (&$partition){
$part=(int)$partition;
$st=@(time()+microtime());
$data=(array)json_decode($message['message']['value']);
$data['data']=(array)$data['data'];
$GLOBALS['log']=$data;
$config_producer = \Kafka\ProducerConfig::getInstance();
$config_producer->setMetadataRefreshIntervalMs(10000);
$config_producer->setMetadataBrokerList('127.0.0.1:9092');
$config_producer->setBrokerVersion('0.10.1.0');
$config_producer->setRequiredAck(1);
$config_producer->setIsAsyn(FALSE);
$config_producer->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'make',
'partId'=>0,
'value' => json_encode($GLOBALS['log']),
'key' => 'click',
],
];
}
);
$producer->send(true);
});
this is my code
@SunnnyGohil setIsAsyn()
does not put the consumer in synchronous mode, if you compare your code with the one in the link I've sent you you'll notice that there are differences in:
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'make',
'partId'=>0,
'value' => json_encode($GLOBALS['log']),
'key' => 'click',
],
];
}
);
$producer->send(true);
thanks @lcobucci i changes in my code but i got there new exception "Exception 'Kafka\Exception\Protocol' with message 'unpack failed. string(raw) length is 0 , TO N"
@SunnnyGohil are you using the stable release? If so, could you try with dev-master
?
yes @lcobucci i am using stable release. my project is on it, so i can't change it. can you give me another solution?
i solv this by calling self API
@SunnnyGohil I run into the same problem. Can you, please, explain what does it mean:
i solv this by calling self API
My error message is
Argument 2 passed to Kafka\\Protocol\\Protocol::unpack() must be of the type string, null given, called in /var/www/ads-export/vendor/nmred/kafka-php/src/Producer/SyncProcess.php on line 135
From what I was able to figure out the problem is that if you configure ConsumerConfig
and ProducerConfig
using the same metadata broker, one ends up overwritten. In my particular case I am faced with situation when I have Kafka\Socket
inside Kafka\Producer\SyncProcess
. So when calling syncMeta()
method, $socket->read(4)
returns null
instead of string (Kafka\SocketSync
returns string in this case):
$dataLen = Protocol::unpack(Protocol::BIT_B32, $socket->read(4));
And as unpack()
method expects string, but no null
the script fail.
@lcobucci, can you, please, advise how I can overcome this problem.
Thank you in advance.
dose partId is work?
@SunnnyGohil can you please tell me how did you solve this issue?
I has this problem too. And have no idea how i can solve it
@dawood @djklim87 it was solved in aforementioned PR, but it never merged.
@Sevavietl yes i saw that, bad that PR did not get merged.
@djklim87 I was able to use producer from consumer using low level api provided. look at https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php
so, you need to use low level api which is used internally anyways to communicate. Let me know if you need more help.
run producer into consumer,can any body solve this issue.
@lijunchen22 refer to https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php
you would need to use low level api as mentioned in above file.
@dawood I try. but it doesn't work. Can you show me your code here?
@lijunchen22
<?
$data = [
'required_ack' => 1,
'timeout' => '1000',
'data' => [
[
'topic_name' => 'SOME_TOPIC',
'partitions' => [
[
'partition_id' => 0,
'messages' => $eventsToSend,
],
],
],
],
];
Protocol::init('1.0.0');
$requestData = Protocol::encode(Protocol::PRODUCE_REQUEST, $data);
$socket = new Socket($kafkaBrokerHost, $kafkaBrokerPort);
$socket->setOnReadable(function ($data): void {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = Protocol::decode(Protocol::PRODUCE_REQUEST, substr($data, 4));
echo json_encode($result);
});
$socket->connect();
$socket->write($requestData);
@dawood it work,thanks a million