phpkafka icon indicating copy to clipboard operation
phpkafka copied to clipboard

Getting SocketException while producing an event to Kafka Topic

Open karunk-hq opened this issue 1 year ago • 0 comments

  • What problem did you encounter? I'm trying to integrate Kafka producer with Laravel worker. After starting my worker, I noticed that publishing to Kafka is failing with following exception
AMQP: Failed to Process data with exception: longlang\phpkafka\Exception\SocketException: Could not write 659 bytes to stream in /home/shard/hiver/web/web/vendor/longlang/phpkafka/src/Socket/StreamSocket.php:123

On further analysis, I found out that before publishing event to topic, we firstly check if socket is writable or not

// wait for stream to become available for writing
$writable = $this->select([$this->socket], $timeout, false);

if (false === $writable) {
       $this->close();
        throw new SocketException('Could not write ' . $bytesToWrite . ' bytes to stream');
}

Any help here would be appreciated

  • Is the Kafka environment self-built or cloud service?

  • Please execute the following command to get environment information.

php -v & php --ri swoole & composer info | grep longlang/phpkafka

# Paste here

  • Provide the smallest reproducible code: My Kafka Client
class KafkaClientManager
{
    
    public static function getInstance()
    {
        $brokers = explode(',', config('kafka_broker_url'));

        $config = new ProducerConfig();
        $config->setBootstrapServer($brokers[0]);
        $config->setUpdateBrokers(true);
        $config->setAcks(-1);

        $producer = null;

        try {
            $producer = new Producer($config);

        } catch (ConnectionException $ce) {
            // when current broker cannot be connected to, try connecting to a different one

            $config->setBootstrapServer($brokers[1]);
            $producer = new Producer($config);
        }
        return $producer;
    }

    public static function sendMessage($topicName, $messageBody, $partitionKey = null)
    {
        $producer = app('kafka');
        try {
            $producer->send($topicName, $messageBody, $partitionKey);
        } catch (KafkaErrorException | FatalThrowableError $e) {
            // when current broker cannot be connected to, try connecting to a different one

            $producer = self::getInstance();
            $producer->send($topicName, $messageBody, $partitionKey);
        }

    }
}

karunk-hq avatar Jul 12 '23 10:07 karunk-hq