In a Swoole service, the Kafka producer throws an error every so often

Open insomniazz opened this issue 3 years ago • 5 comments

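  • What problem did you run into? In a Swoole service, the Kafka producer throws an error every so often.

  • Is the Kafka environment self-hosted or a cloud service? Self-hosted.

  • Please run the following command to collect environment information.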

php -v & php --ri swoole & composer info | grep longlang/phpkafka

PHP 7.4.25 (cli) (built: Oct 19 2021 15:18:10) ( NTS )
Copyright (c) The PHP Group
Zend Engine v3.4.0, Copyright (c) Zend Technologies
    with Zend OPcache v7.4.25, Copyright (c), by Zend Technologies

swoole

Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 4.6.1
Built => Jan 11 2021 12:30:02
coroutine => enabled with boost asm context
trace_log => enabled
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 1.0.2k-fips  26 Jan 2017
http2 => enabled
json => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.7
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608

  • Minimal reproducible code:
// Your code
declare(strict_types=1); // strict typing mode
require_once dirname(__DIR__).'/vendor/autoload.php';
ini_set('memory_limit', '2048M');
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;

$http = new Swoole\Http\Server("0.0.0.0", 9503);
$http->set([
    'tcp_fastopen' => true,
    'max_wait_time' => 1,
    'enable_reuse_port'=>false, // port reuse is disabled
    'log_file'=>dirname(__DIR__)."/log/swoole.log"
]);

$config = new ProducerConfig();
$config->setBootstrapServer('172.18.50.74:9092,172.18.50.75:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$http->on('request', function ($request, $response) use ($producer) {
    try {
        $topic = 'play_history_pre3';
        var_dump($request->server['request_uri']);
        $sendData = json_encode($request->post);
        $producer->send($topic,$sendData);
        unset($sendData,$topic);
        $response->end('success');
    } catch (\Throwable $e) {
        var_dump($e->getMessage());
        $response->end(json_encode(array(
            'code' => '999',
            'message' => 'Failed to record'
        )));
    }
});

$http->start();

  • Error message
PHP Fatal error:  Uncaught longlang\phpkafka\Exception\SocketException: Could not recv data from stream,  [0] in /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Socket/SwooleSocket.php:129
Stack trace:
#0 /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SwooleClient.php(135): longlang\phpkafka\Socket\SwooleSocket->recv(4, -1)
#1 {main}
  thrown in /data/www/kafka/swoole/vendor/longlang/phpkafka/src/Socket/SwooleSocket.php on line 129

insomniazz • Jul 25 '22 03:07

This happens because Kafka reclaims the idle connection while Swoole has not yet destroyed or closed it on the client side. phpkafka does not support connections.max.idle.ms. As a workaround, you can override longlang\phpkafka\Client\SwooleClient and add a check to the exception handling in startRecvCo, so that when this error occurs the dead connection is closed:

private function startRecvCo(): void
{
    $this->coRecvRunning = true;
    $this->recvCoId = true;
    $this->recvCoId = Coroutine::create(function () {
        while ($this->coRecvRunning) {
            try {
                $data = $this->socket->recv(4, -1);
                if ('' === $data) {
                    break;
                }
                $length = Int32::unpack($data);
                $data = $this->socket->recv($length);
                $correlationId = Int32::unpack($data);
                if (isset($this->recvChannels[$correlationId])) {
                    $this->recvChannels[$correlationId]->push($data);
                }
            } catch (Exception $e) {
                if ($e instanceof SocketException && !$this->connected) {
                    return;
                }
                // Added check: the socket failed while we still consider it connected,
                // so close the dead connection and leave the receive loop.
                if ($e instanceof SocketException && $this->connected) {
                    $this->socket->close();
                    break;
                }
                $callback = $this->getConfig()->getExceptionCallback();
                if ($callback) {
                    $callback($e);
                } else {
                    throw $e;
                }
            }
        }
    });
}
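For readers unfamiliar with the receive loop above: it reads a 4-byte size prefix, then that many bytes of payload, and treats the first 4 bytes of the payload as the correlation id used to route the response. The sketch below reproduces that framing on an in-memory string, which may help when debugging truncated or misaligned reads. `readFrame` is a hypothetical helper, not part of phpkafka, and it uses PHP's `unpack('N')` (unsigned big-endian) in place of the library's `Int32::unpack`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper for illustration only (not part of phpkafka).
 * Splits one length-prefixed frame off the front of $buffer.
 *
 * Returns [correlationId, payload, remainingBytes], or null when the
 * buffer does not yet hold a complete frame (or the size is below the
 * 4 bytes needed for a correlation id).
 */
function readFrame(string $buffer): ?array
{
    if (strlen($buffer) < 4) {
        return null; // size prefix not fully received yet
    }

    // 4-byte big-endian size of the payload that follows.
    $length = unpack('N', $buffer)[1];
    if ($length < 4 || strlen($buffer) < 4 + $length) {
        return null;
    }

    $payload = substr($buffer, 4, $length);

    // The payload starts with the 4-byte correlation id.
    $correlationId = unpack('N', $payload)[1];

    return [$correlationId, $payload, substr($buffer, 4 + $length)];
}
```

A reader that loses sync with this framing, for example by reusing a half-read socket, will interpret arbitrary bytes as sizes and ids, so it is usually safer to discard a connection after a failed read than to keep reading from it.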

dygin • Aug 03 '22 04:08

When can this exception be fixed?

ushell • Aug 17 '22 08:08

@Yurunsoft I ran into the same problem.

In src/Client/SwooleClient.php, at line 145:

https://github.com/swoole/phpkafka/blob/f15474647d5a3ed70f5480d46c7a37cfe6e6a973/src/Client/SwooleClient.php#L145-L154

Change it to:

// other code omitted
} catch (Exception $e) {
    if ($e instanceof SocketException && !$this->connected) {
        return;
    }
    if ($e instanceof SocketException && $this->connected) {
        $this->socket->close();
        break;
    }
    $callback = $this->getConfig()->getExceptionCallback();
    if ($callback) {
        $callback($e);
    } else {
        throw $e;
    }
}

After this change the error no longer appeared.

peibinzhu • Aug 24 '22 03:08

connections.max.idle.ms

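So that is the cause. I had noticed that after the first message is sent, one coroutine stays blocked in recv() indefinitely. This package does not time out or recycle long-lived connections.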
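One way to work around the missing idle handling without patching the library might be to close the producer yourself before the broker's idle limit is reached. The sketch below is a hypothetical helper, not part of phpkafka: `IdleRecycler`, `get()` and `reapIfIdle()` are made-up names, and it only assumes that the wrapped object may expose a `close()` method. It hands out one client and drops it once it has gone unused for `$maxIdleSeconds`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of phpkafka): keeps a single client and
 * discards it after it has been idle for too long, so a fresh one is
 * built on the next use.
 */
final class IdleRecycler
{
    /** @var callable builds a new client */
    private $factory;
    /** @var callable returns the current time in seconds as a float */
    private $clock;
    private float $maxIdleSeconds;
    private ?object $client = null;
    private float $lastUsed = 0.0;

    public function __construct(callable $factory, float $maxIdleSeconds, ?callable $clock = null)
    {
        $this->factory = $factory;
        $this->maxIdleSeconds = $maxIdleSeconds;
        $this->clock = $clock ?? static function (): float {
            return microtime(true);
        };
    }

    /** Returns the current client, creating one if none is alive. */
    public function get(): object
    {
        $this->reapIfIdle();
        if ($this->client === null) {
            $this->client = ($this->factory)();
        }
        $this->lastUsed = (float) ($this->clock)();

        return $this->client;
    }

    /** Closes and forgets the client once it has been idle long enough. */
    public function reapIfIdle(): void
    {
        if ($this->client === null) {
            return;
        }
        if ((float) ($this->clock)() - $this->lastUsed < $this->maxIdleSeconds) {
            return;
        }
        $stale = $this->client;
        $this->client = null;
        // Assumption: the client may expose close(); skip it otherwise.
        if (method_exists($stale, 'close')) {
            try {
                $stale->close();
            } catch (\Throwable $e) {
                // The connection may already be gone; nothing more to do.
            }
        }
    }
}
```

In a Swoole server this could be created once per worker, with `reapIfIdle()` called from a periodic `Swoole\Timer::tick` and `$recycler->get()->send(...)` used in the request handler. The threshold should be well below the broker's connections.max.idle.ms (600000 ms by default in Kafka). Note that the sketch records the time of `get()`, not the end of the send, so a very long send combined with a short threshold could be closed mid-flight. Whether closing from a timer stops the receive coroutine cleanly depends on the library's `close()` behaviour, which is not verified here.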

shuifa • Oct 21 '22 12:10

After applying the change to SwooleClient.php suggested above by peibinzhu, a new error appeared:

"error message": "[10753] Unknown",
"error file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php",
"error line": 385,
"php trace": [
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SyncClient.php", "line": 196, "function": "check", "class": "longlang\phpkafka\Protocol\ErrorCode", "type": "::" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SyncClient.php", "line": 103, "function": "updateApiVersions", "class": "longlang\phpkafka\Client\SyncClient", "type": "->" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Client/SwooleClient.php", "line": 52, "function": "connect", "class": "longlang\phpkafka\Client\SyncClient", "type": "->" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Broker.php", "line": 175, "function": "connect", "class": "longlang\phpkafka\Client\SwooleClient", "type": "->" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Broker.php", "line": 157, "function": "getClientByBrokerId", "class": "longlang\phpkafka\Broker", "type": "->" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Producer/Producer.php", "line": 160, "function": "getClient", "class": "longlang\phpkafka\Broker", "type": "->" },
    { "file": "/data/www/kafka/swoole/vendor/longlang/phpkafka/src/Producer/Producer.php", "line": 55, "function": "sendBatch", "class": "longlang\phpkafka\Producer\Producer", "type": "->" },
    { "file": "/data/www/kafka/swoole/producer/kafkaProducer.php", "line": 76, "function": "send", "class": "longlang\phpkafka\Producer\Producer", "type": "->" }
]

insomniazz • Dec 16 '22 02:12