kafka-php

run producer in consumer

Open SunnnyGohil opened this issue 6 years ago • 22 comments

Hi,

Can I run a producer inside a consumer in kafka-php?

SunnnyGohil avatar Apr 23 '18 10:04 SunnnyGohil

@SunnnyGohil for now you can only run the sync producer while consuming messages, since the control of the event loop is done inside of the classes

lcobucci avatar Apr 23 '18 12:04 lcobucci

Thanks for the reply, @lcobucci.

When I run a producer inside the consumer, I get this error: "Exception 'LogicException' with message 'Cannot run() recursively; event reactor already active".

If you have a solution, please help me.

SunnnyGohil avatar Apr 23 '18 12:04 SunnnyGohil

@SunnnyGohil the producer should be run in the synchronous mode: https://github.com/weiboad/kafka-php#synchronous-mode

lcobucci avatar Apr 23 '18 13:04 lcobucci

@lcobucci I already run the producer in sync mode, but I still get the error.

SunnnyGohil avatar Apr 23 '18 13:04 SunnnyGohil

Can you put your code here?

lcobucci avatar Apr 23 '18 13:04 lcobucci

require '../vendor/autoload.php';
date_default_timezone_set('PRC');

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.1.0');
$config->setTopics(['test']);
$config->setOffsetReset('earliest');
$config->setClientId(1);
$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) use (&$partition) {
    $part = (int) $partition;
    $st = @(time() + microtime());
    $data = (array) json_decode($message['message']['value']);
    $data['data'] = (array) $data['data'];
    $GLOBALS['log'] = $data;

    $config_producer = \Kafka\ProducerConfig::getInstance();
    $config_producer->setMetadataRefreshIntervalMs(10000);
    $config_producer->setMetadataBrokerList('127.0.0.1:9092');
    $config_producer->setBrokerVersion('0.10.1.0');
    $config_producer->setRequiredAck(1);
    $config_producer->setIsAsyn(FALSE);
    $config_producer->setProduceInterval(500);
    $producer = new \Kafka\Producer(
        function () {
            return [
                [
                    'topic' => 'make',
                    'partId' => 0,
                    'value' => json_encode($GLOBALS['log']),
                    'key' => 'click',
                ],
            ];
        }
    );
    $producer->send(true);
});

This is my code.

SunnnyGohil avatar Apr 23 '18 13:04 SunnnyGohil

@SunnnyGohil setIsAsyn() does not put the producer in synchronous mode. If you compare your code with the example in the link I sent, you'll notice that it differs in these parts:

$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'make',
                'partId'=>0,
                'value' => json_encode($GLOBALS['log']),
                'key' => 'click',
            ],
        ];
    }
);

$producer->send(true);
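
For comparison, the synchronous example in the linked README appears to construct the producer without a callback and to pass the message list directly to send(). Below is a minimal sketch of that call shape. RecordingProducer and forwardMessage() are hypothetical names used only so the snippet runs without a broker; with the library installed, the stand-in would be replaced by a \Kafka\Producer created with no constructor argument.

```php
<?php
declare(strict_types=1);

/**
 * Stand-in for \Kafka\Producer so the sketch runs without a broker.
 * It only records what send() receives; it is NOT the library class.
 */
final class RecordingProducer
{
    public $sent = [];

    public function send(array $messages): array
    {
        $this->sent[] = $messages;
        return $messages;
    }
}

/**
 * Hypothetical helper: forward one consumed payload to another topic.
 * The message list goes straight to send(); there is no callback and
 * no send(true).
 */
function forwardMessage($producer, string $topic, array $payload): void
{
    $producer->send([
        [
            'topic' => $topic,
            'value' => json_encode($payload),
            'key'   => 'click',
        ],
    ]);
}

// Assumption: with kafka-php this line would be `new \Kafka\Producer()`.
$producer = new RecordingProducer();
forwardMessage($producer, 'make', ['data' => ['id' => 1]]);
echo $producer->sent[0][0]['value'], PHP_EOL; // prints {"data":{"id":1}}
```

Passing the payload as an argument also removes the need for $GLOBALS['log'].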

lcobucci avatar Apr 23 '18 16:04 lcobucci

Thanks @lcobucci. I changed my code, but now I get a new exception: "Exception 'Kafka\Exception\Protocol' with message 'unpack failed. string(raw) length is 0 , TO N"

SunnnyGohil avatar Apr 23 '18 17:04 SunnnyGohil

@SunnnyGohil are you using the stable release? If so, could you try with dev-master?

lcobucci avatar Apr 23 '18 20:04 lcobucci

Yes @lcobucci, I am using the stable release. My project depends on it, so I can't change it. Can you suggest another solution?

SunnnyGohil avatar Apr 24 '18 05:04 SunnnyGohil

I solved this by calling self API

SunnnyGohil avatar Apr 27 '18 09:04 SunnnyGohil

@SunnnyGohil I ran into the same problem. Could you please explain what this means:

I solved this by calling self API

My error message is

Argument 2 passed to Kafka\\Protocol\\Protocol::unpack() must be of the type string, null given, called in /var/www/ads-export/vendor/nmred/kafka-php/src/Producer/SyncProcess.php on line 135

From what I was able to figure out, the problem is that if you configure ConsumerConfig and ProducerConfig with the same metadata broker, one ends up overwritten. In my case, I end up with a Kafka\Socket inside Kafka\Producer\SyncProcess. So when syncMeta() is called, $socket->read(4) returns null instead of a string (Kafka\SocketSync returns a string in this case):

$dataLen       = Protocol::unpack(Protocol::BIT_B32, $socket->read(4));

Since unpack() expects a string, not null, the script fails.
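
The failure can be illustrated in isolation with PHP's built-in unpack(). This is a minimal sketch, assuming Protocol::BIT_B32 corresponds to the 'N' format (unsigned 32-bit, big-endian), which the "TO N" in the earlier exception message suggests. readLengthPrefix() is a hypothetical helper, not part of kafka-php; it only shows how a null or short read could be rejected before unpacking.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decode a 4-byte big-endian length prefix.
 * Rejects null and short reads instead of passing them to unpack().
 */
function readLengthPrefix(?string $raw): int
{
    if ($raw === null || strlen($raw) !== 4) {
        throw new UnexpectedValueException(
            'Expected 4 bytes for the length prefix, got '
            . ($raw === null ? 'null' : strlen($raw) . ' byte(s)')
        );
    }

    // 'N' = unsigned 32-bit integer, big-endian; the result is at index 1.
    $parts = unpack('N', $raw);

    return $parts[1];
}

echo readLengthPrefix(pack('N', 1024)), PHP_EOL; // prints 1024

try {
    readLengthPrefix(null); // what a non-blocking read may hand back
} catch (UnexpectedValueException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

A guard like this only makes the error clearer; it does not fix the underlying mix-up between the two socket classes.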

@lcobucci, could you please advise how I can overcome this problem?

Thank you in advance.

Sevavietl avatar May 22 '18 15:05 Sevavietl

Does partId work?

kevinmiao avatar Jul 03 '18 04:07 kevinmiao

@SunnnyGohil can you please tell me how you solved this issue?

dawood avatar Oct 20 '18 12:10 dawood

I have this problem too, and I have no idea how to solve it.

djklim87 avatar Nov 13 '18 14:11 djklim87

@dawood @djklim87 it was solved in the aforementioned PR, but that PR was never merged.

Sevavietl avatar Nov 13 '18 15:11 Sevavietl

@Sevavietl yes, I saw that. Too bad the PR did not get merged.

@djklim87 I was able to use a producer from inside a consumer by using the low-level API the library provides. Look at https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

So you need to use the low-level API, which the library uses internally to communicate anyway. Let me know if you need more help.

dawood avatar Nov 14 '18 04:11 dawood

I need to run a producer inside a consumer. Can anybody solve this issue?

lijunchen22 avatar Mar 11 '20 03:03 lijunchen22

@lijunchen22 refer to https://github.com/weiboad/kafka-php/blob/master/example/protocol/Produce.php

You would need to use the low-level API as shown in that file.

dawood avatar Mar 11 '20 06:03 dawood

@dawood I tried, but it doesn't work. Can you show me your code here?

lijunchen22 avatar Mar 11 '20 09:03 lijunchen22

@lijunchen22

<?php
// Imports assumed to match the linked example/protocol/Produce.php.
use Kafka\Protocol;
use Kafka\Socket;

// $eventsToSend, $kafkaBrokerHost and $kafkaBrokerPort are defined elsewhere in the consumer.
$data = [
    'required_ack' => 1,
    'timeout'      => '1000',
    'data'         => [
        [
            'topic_name' => 'SOME_TOPIC',
            'partitions' => [
                [
                    'partition_id' => 0,
                    'messages'     => $eventsToSend,
                ],
            ],
        ],
    ],
];
Protocol::init('1.0.0');
$requestData = Protocol::encode(Protocol::PRODUCE_REQUEST, $data);
$socket = new Socket($kafkaBrokerHost, $kafkaBrokerPort);
$socket->setOnReadable(function ($data): void {
    // First 4 bytes: presumably the correlation id; the remainder is the produce response.
    $coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
    $result = Protocol::decode(Protocol::PRODUCE_REQUEST, substr($data, 4));
    echo json_encode($result);
});
$socket->connect();
$socket->write($requestData);
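
If you forward to more than one topic or partition, the request array can be built by a small helper. This is only a sketch: buildProduceRequest() is a hypothetical name, and it assumes 'messages' takes a plain list of string values. The result is what you would pass to Protocol::encode() in the code above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the array passed to
 * Protocol::encode(Protocol::PRODUCE_REQUEST, ...), using the same
 * keys as the snippet above.
 */
function buildProduceRequest(string $topic, int $partition, array $messages): array
{
    return [
        'required_ack' => 1,
        'timeout'      => '1000',
        'data'         => [
            [
                'topic_name' => $topic,
                'partitions' => [
                    [
                        'partition_id' => $partition,
                        // Re-index so the list always has keys 0..n-1.
                        'messages'     => array_values($messages),
                    ],
                ],
            ],
        ],
    ];
}

// Example: one JSON-encoded event for partition 0 of SOME_TOPIC.
$data = buildProduceRequest('SOME_TOPIC', 0, [json_encode(['event' => 'click'])]);
echo json_encode($data), PHP_EOL;
```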

dawood avatar Mar 11 '20 09:03 dawood

@dawood it works, thanks a million!

lijunchen22 avatar Mar 11 '20 09:03 lijunchen22