phpkafka
phpkafka copied to clipboard
Getting SocketException while producing an event to Kafka Topic
- What problem did you encounter? I'm trying to integrate Kafka producer with Laravel worker. After starting my worker, I noticed that publishing to Kafka is failing with following exception
AMQP: Failed to Process data with exception: longlang\phpkafka\Exception\SocketException: Could not write 659 bytes to stream in /home/shard/hiver/web/web/vendor/longlang/phpkafka/src/Socket/StreamSocket.php:123
On further analysis, I found out that before publishing event to topic, we firstly check if socket is writable or not
// wait for stream to become available for writing
$writable = $this->select([$this->socket], $timeout, false);
if (false === $writable) {
$this->close();
throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream');
}
Any help here would be appreciated
-
Is the Kafka environment self-built or cloud service?
-
Please execute the following command to get environment information.
php -v & php --ri swoole & composer info | grep longlang/phpkafka
# Paste here
- Provide the smallest reproducible code: My Kafka Client
class KafkaClientManager
{
public static function getInstance()
{
$brokers = explode(',', config('kafka_broker_url'));
$config = new ProducerConfig();
$config->setBootstrapServer($brokers[0]);
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = null;
try {
$producer = new Producer($config);
} catch (ConnectionException $ce) {
// when current broker cannot be connected to, try connecting to a different one
$config->setBootstrapServer($brokers[1]);
$producer = new Producer($config);
}
return $producer;
}
public static function sendMessage($topicName, $messageBody, $partitionKey = null)
{
$producer = app('kafka');
try {
$producer->send($topicName, $messageBody, $partitionKey);
} catch (KafkaErrorException | FatalThrowableError $e) {
// when current broker cannot be connected to, try connecting to a different one
$producer = self::getInstance();
$producer->send($topicName, $messageBody, $partitionKey);
}
}
}