phpkafka
Random "Unexpected EOF while reading 4 bytes from stream (no data)"
What problem did you encounter?
From time to time, a longlang\phpkafka\Exception\SocketException
is thrown with the message "Unexpected EOF while reading 4 bytes from stream (no data)".
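For context on the message itself: Kafka frames every response with a 4-byte big-endian length, so an EOF "while reading 4 bytes" with no data most likely means the connection was closed before any part of a response arrived. The sketch below is not phpkafka's code. `readExactly`, `readFrame`, and `memoryStream` are hypothetical helpers, and in-memory streams stand in for the broker socket, purely to illustrate where such an error would surface.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: read exactly $length bytes from a blocking stream,
 * or throw if the stream ends first.
 */
function readExactly($stream, int $length): string
{
    $buffer = '';
    while (strlen($buffer) < $length) {
        $chunk = fread($stream, $length - strlen($buffer));
        if ($chunk === false || $chunk === '') {
            // Nothing more to read: the peer closed the connection.
            $detail = $buffer === '' ? ' (no data)' : '';
            throw new RuntimeException(
                "Unexpected EOF while reading {$length} bytes from stream{$detail}"
            );
        }
        $buffer .= $chunk;
    }
    return $buffer;
}

/**
 * Hypothetical helper: read one length-prefixed frame
 * (4-byte big-endian size, then that many payload bytes).
 */
function readFrame($stream): string
{
    $size = unpack('N', readExactly($stream, 4))[1];
    return readExactly($stream, $size);
}

/** Hypothetical stand-in for a socket: an in-memory stream preloaded with bytes. */
function memoryStream(string $bytes)
{
    $stream = fopen('php://memory', 'w+b');
    fwrite($stream, $bytes);
    rewind($stream);
    return $stream;
}

// A well-formed frame: size prefix 5, payload "hello".
$ok = readFrame(memoryStream(pack('N', 5) . 'hello'));

// A connection that was closed before sending anything.
$error = null;
try {
    readFrame(memoryStream(''));
} catch (RuntimeException $e) {
    $error = $e->getMessage();
}

echo $ok, PHP_EOL;
echo $error, PHP_EOL;
```

If that reading is right, the exception points to a connection that was already closed when the response was read (for example an idle connection dropped by the broker or a load balancer), rather than to a malformed response.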
Is the Kafka environment self-built or cloud service?
- AWS MSK - https://aws.amazon.com/msk/
- Kafka version: 2.8.1
Please execute the following command to get environment information.
PHP 8.1.24 (cli) (built: Sep 30 2023 03:38:17) (NTS)
Copyright (c) The PHP Group
Zend Engine v4.1.24, Copyright (c) Zend Technologies
swoole
Swoole => enabled
Author => Swoole Team <[email protected]>
Version => 5.1.0
Built => Oct 9 2023 13:58:13
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 3.1.3 19 Sep 2023
dtls => enabled
http2 => enabled
json => enabled
curl-native => enabled
zlib => 1.2.13
brotli => E16777225/D16777225
mutex_timedlock => enabled
pthread_barrier => enabled
mysqlnd => enabled
async_redis => enabled
Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_fiber_mock => Off => Off
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608
Provide the smallest reproducible code:
<?php
declare(strict_types=1);
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Sasl\ScramSha512Sasl;
use Swoole\Http\Server;
$config = new ProducerConfig();
$config->setBrokers($_ENV['KAFKA_BROKERS']);
$config->setBootstrapServers($_ENV['KAFKA_BROKERS']);
$config->setAcks(-1);
$config->setSsl(['open' => true]);
$config->setSasl([
    'type' => ScramSha512Sasl::class,
    'username' => $_ENV['KAFKA_USER'],
    'password' => $_ENV['KAFKA_PASSWORD'],
]);
$producer = new Producer($config);
$server = new Server('0.0.0.0', 9501);
$server->set(['task_worker_num' => 8]);
$server->on('start', function () {
    echo "Swoole http server is started\n";
});
$server->on('task', function (Server $server, int $taskId, $reactorId, $data) use ($producer) {
    if (!isset($data)) {
        return;
    }
    // Publish the task payload to the configured topic.
    $producer->send($_ENV['KAFKA_TOPIC'], $data);
});
Extra
I'm using it with a custom-made SCRAM-SHA-512 mechanism:
https://github.com/Jeckerson/phpkafka/blob/master/src/Sasl/ScramSha512Sasl.php (commit: https://github.com/Jeckerson/phpkafka/commit/c2badad88559dde68e9aefa0e2ed067aba401e50), related to https://github.com/swoole/phpkafka/issues/73
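For readers unfamiliar with SCRAM, here is a minimal sketch of the client proof that a SCRAM-SHA-512 mechanism computes, following RFC 5802 with SHA-512 as the hash. It is not the code from the linked fork. `scramClientProof` is a hypothetical name, the password, salt, and auth message are made-up example values, and SASLprep normalization of the password is omitted.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: compute the SCRAM ClientProof (RFC 5802) using SHA-512.
 *
 * $salt is the raw (base64-decoded) salt from the server-first message and
 * $authMessage is "client-first-bare,server-first,client-final-without-proof".
 */
function scramClientProof(string $password, string $salt, int $iterations, string $authMessage): string
{
    // SaltedPassword = Hi(password, salt, i), i.e. PBKDF2 with HMAC-SHA-512
    // and an output length equal to the digest size (length 0 = full digest).
    $saltedPassword = hash_pbkdf2('sha512', $password, $salt, $iterations, 0, true);

    // ClientKey = HMAC(SaltedPassword, "Client Key")
    $clientKey = hash_hmac('sha512', 'Client Key', $saltedPassword, true);

    // StoredKey = H(ClientKey)
    $storedKey = hash('sha512', $clientKey, true);

    // ClientSignature = HMAC(StoredKey, AuthMessage)
    $clientSignature = hash_hmac('sha512', $authMessage, $storedKey, true);

    // ClientProof = ClientKey XOR ClientSignature (byte-wise on binary strings)
    return $clientKey ^ $clientSignature;
}

// Made-up example values, only to show the call shape.
$authMessage = 'n=user,r=abc,r=abcdef,s=ZXhhbXBsZS1zYWx0,i=4096,c=biws,r=abcdef';
$proof = scramClientProof('secret', 'example-salt', 4096, $authMessage);

// The proof is sent base64-encoded as the "p=" attribute of the client-final message.
echo base64_encode($proof), PHP_EOL;
```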