Awesome-GitHub-Repo
Awesome-GitHub-Repo copied to clipboard
【项目自荐】Apache Pulsar Client for PHP
-
项目地址:https://github.com/ikilobyte/pulsar-client-php
-
类别:PHP
-
项目标题:PHP Native Client library for Apache Pulsar
-
项目描述:由于Pulsar官方未提供PHP客户端,PHP开发者使用Pulsar时很不方便,所以开发了这个纯PHP实现的客户端,方便PHP开发者使用
-
示例代码:依据Pulsar.proto实现
Producer
<?php
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;
use Pulsar\MessageOptions;
require_once __DIR__ . '/vendor/autoload.php';

/*
 * Producer example.
 *
 * Configures a producer for one topic, connects to the broker, sends ten
 * messages synchronously and then ten delayed messages, and finally closes
 * the producer. The close happens in a `finally` block so the connection is
 * released even when one of the sends throws.
 */

$options = new ProducerOptions();

// Set an authentication provider only when the broker requires one.
// Only JWT authentication is currently supported.
$options->setAuthentication(new Jwt('token'));

$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setCompression(Compression::ZLIB);

$producer = new Producer('pulsar://localhost:6650', $options);

// Connect outside the try block: if connecting fails there is nothing to close.
$producer->connect();

try {
    // Synchronous sends: each call returns the ID of the published message.
    for ($i = 0; $i < 10; $i++) {
        $messageID = $producer->send(sprintf('hello %d', $i));
        echo 'messageID ' . $messageID . "\n";
    }

    // Sending messages asynchronously
    //for ($i = 0; $i < 10; $i++) {
    //    $producer->sendAsync(sprintf('hello-async %d', $i), function (string $messageID) {
    //        echo 'messageID ' . $messageID . "\n";
    //    });
    //}
    //
    //// Add this line when sending asynchronously
    //$producer->wait();

    // Sending delayed messages: message $i is delayed by $i * 5 seconds.
    for ($i = 0; $i < 10; $i++) {
        $producer->send(sprintf('hello-delay %d', $i), [
            MessageOptions::DELAY_SECONDS => $i * 5, // Seconds
        ]);
    }
} finally {
    // Always close, whether the sends succeeded or raised.
    $producer->close();
}
Consumer
<?php
use Pulsar\Authentication\Jwt;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
require_once __DIR__ . '/vendor/autoload.php';

/*
 * Consumer example.
 *
 * Subscribes to one topic with a shared subscription, then receives, prints
 * and acknowledges messages in an endless loop. The loop has no exit
 * condition, so control only leaves it when an exception propagates; the
 * `finally` block makes sure the consumer is closed in that case instead of
 * leaving the close call unreachable after the loop.
 */

$options = new ConsumerOptions();

// Set an authentication provider only when the broker requires one.
// Only JWT authentication is currently supported.
$options->setAuthentication(new Jwt('token'));

$options->setConnectTimeout(3);
$options->setTopic('persistent://public/default/demo');
$options->setSubscription('logic');
$options->setSubscriptionType(SubscriptionType::Shared);

// Number of seconds before a nack'ed message is redelivered; the default is 1 minute.
$options->setNackRedeliveryDelay(20);

$consumer = new Consumer('pulsar://localhost:6650', $options);

// Connect outside the try block: if connecting fails there is nothing to close.
$consumer->connect();

try {
    while (true) {
        $message = $consumer->receive();

        echo sprintf(
            'Got message 【%s】messageID[%s] topic[%s] publishTime[%s]',
            $message->getPayload(),
            $message->getMessageId(),
            $message->getTopic(),
            $message->getPublishTime()
        ) . "\n";

        // ...

        // Acknowledge the message once it has been processed successfully.
        $consumer->ack($message);

        // When processing fails, nack the message instead of acking it;
        // it will be redelivered after the configured nack redelivery delay.
        // $consumer->nack($message);
    }
} finally {
    // Reached when receive()/ack() (or the processing code) throws.
    $consumer->close();
}