phpkafka icon indicating copy to clipboard operation
phpkafka copied to clipboard

Random "Unexpected EOF while reading 4 bytes from stream (no data)"

Open Jeckerson opened this issue 1 year ago • 0 comments

What problem did you encounter?

From time to time longlang\phpkafka\Exception\SocketException exception is launched, with "Unexpected EOF while reading 4 bytes from stream (no data)" message.

Is the Kafka environment self-built or cloud service?

  • AWS MSK - https://aws.amazon.com/msk/
  • kafka version: 2.8.1

Please execute the following command to get environment information.

PHP 8.1.24 (cli) (built: Sep 30 2023 03:38:17) (NTS)
Copyright (c) The PHP Group
Zend Engine v4.1.24, Copyright (c) Zend Technologies

swoole

Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 5.1.0
Built => Oct  9 2023 13:58:13
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 3.1.3 19 Sep 2023
dtls => enabled
http2 => enabled
json => enabled
curl-native => enabled
zlib => 1.2.13
brotli => E16777225/D16777225
mutex_timedlock => enabled
pthread_barrier => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_fiber_mock => Off => Off
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608

Provide the smallest reproducible code:

<?php

declare(strict_types=1);

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Sasl\ScramSha512Sasl;
use Swoole\Http\Server;

$config = new ProducerConfig();
$config->setBrokers($_ENV['KAFKA_BROKERS']);
$config->setBootstrapServers($_ENV['KAFKA_BROKERS']);
$config->setAcks(-1);
$config->setSsl(['open' => true]);
$config->setSasl([
    'type' => ScramSha512Sasl::class,
    'username' => $_ENV['KAFKA_USER'],
    'password' => $_ENV['KAFKA_PASSWORD'],
]);
$producer = new Producer($config);

$server = new Server('0.0.0.0', 9501);
$server->set(['task_worker_num' => 8]);
$server->on('start', function () {
    echo "Swoole http server is started\n";
});
$server->on('task', function(Server $server, int $taskId, $reactorId, $data) use ($producer) {
    if (!isset($data)) {
        return;
    }

    $producer->send($_ENV['KAFKA_TOPIC'], $message);
});

Extra

I'm using with custom made SCRAM-SHA-512 mechanism - https://github.com/Jeckerson/phpkafka/blob/master/src/Sasl/ScramSha512Sasl.php (https://github.com/Jeckerson/phpkafka/commit/c2badad88559dde68e9aefa0e2ed067aba401e50) (related https://github.com/swoole/phpkafka/issues/73)

Jeckerson avatar Oct 09 '23 15:10 Jeckerson