General guidance for using Kafka and similar packages with FrankenPHP
Hi,
I am new to FrankenPHP and am evaluating it for a project. I started with https://github.com/f00b4r/2023-10-frankenphp and got it working. Now I want to use some extensions such as Kafka, Redis, and MySQL. I made the following modifications:
- Dockerfile
# Install PHP extensions rdkafka
RUN install-php-extensions rdkafka
- Docker-compose.yml
services:
  app:
    image: frankapp
    build: .
    ports:
      - 80:80
    environment:
      - CADDY_DEBUG=debug
      - DEBUG=debug
      - FRANKENPHP_CONFIG=worker /srv/public/worker.php
      - SERVER_NAME=:80
    volumes:
      - ./:/srv
      - ./Caddyfile:/etc/caddy/Caddyfile
  kafka1:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka-3.7-controller-1
    ports:
      - 9093
    environment:
      # KRaft settings. No longer using zookeeper method.
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=ZmNDEyNT
      - KAFKA_CFG_PROCESS_ROLES=controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=CONTROLLER://kafka1:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
  kafka4:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka-3.7-broker-4
    ports:
      - 9092
    environment:
      # KRaft settings. No longer using zookeeper method.
      - KAFKA_CFG_NODE_ID=4
      - KAFKA_KRAFT_CLUSTER_ID=ZmNDEyNT
      - KAFKA_CFG_PROCESS_ROLES=broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka4:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka4:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
- /public/worker.php
<?php declare(strict_types = 1);
ignore_user_abort(true);
do {
    $running = frankenphp_handle_request(function () {
        echo "Hello FrankenPHP!!";
        //print_r($_GET); // for all GET variables
        //phpinfo();
        if (isset($_POST['testData'])) {
            $payload = trim($_POST['testData']);
            $conf = new RdKafka\Conf();
            $conf->set('metadata.broker.list', 'kafka4:9092');
            $conf->set('socket.timeout.ms', 60);
            $conf->set('request.timeout.ms', 60);
            if (function_exists('pcntl_sigprocmask')) {
                pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
                $conf->set('internal.termination.signal', SIGIO);
            } else {
                $conf->set('queue.buffering.max.ms', 1);
            }
            $producer = new RdKafka\Producer($conf);
            $topic = "testData";
            $kafkaTopic = $producer->newTopic($topic);
            $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
            $producer->poll(0);
            for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
                $result = $producer->flush(10000);
                if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                    echo "Message Published";
                    break;
                }
            }
        }
    });
    gc_collect_cycles();
} while ($running);
However, when I test worker.php, I get the following error.
INFO[0001] Response body: Hello FrankenPHP!<br />
<b>Fatal error</b>: Uncaught TypeError: RdKafka\Conf::set(): Argument #2 ($value) must be of type string, int given in /srv/public/worker.php:14
Stack trace:
#0 /srv/public/worker.php(14): RdKafka\Conf->set('socket.timeout....', 60)
#1 [internal function]: {closure}()
#2 /srv/public/worker.php(4): frankenphp_handle_request(Object(Closure))
#3 {main}
thrown in <b>/srv/public/worker.php</b> on line <b>14</b><br /> source=console
I have another script, producer.php, in the same folder with the same publish code, and if I send it a POST request, it successfully publishes to Kafka. So it looks like the worker part is causing the issue? I have looked through all the resources I could find, including AI help, but none of the recommendations resolved this. Any help is much appreciated.
Can you show your failing worker script? It looks like you pass an int instead of a string when calling RdKafka\Conf::set().
@dunglas thank you for the quick response. The failing worker script is already in the original post (worker.php). On the other hand, here is the producer.php script, which works just fine.
<?php
if (isset($_POST['testData'])) {
    $payload = trim($_POST['testData']);
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'kafka4:9092');
    $conf->set('socket.timeout.ms', 60); // or socket.blocking.max.ms, depending on librdkafka version
    $conf->set('request.timeout.ms', 60); // request timeout
    if (function_exists('pcntl_sigprocmask')) {
        pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
        $conf->set('internal.termination.signal', SIGIO);
    } else {
        $conf->set('queue.buffering.max.ms', 1);
    }
    $producer = new RdKafka\Producer($conf);
    $topic = "testData";
    $kafkaTopic = $producer->newTopic($topic);
    $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
    $producer->poll(0);
    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            echo "Message Published";
            break;
        }
    }
} else { // To test manually, open producer.php: this form will appear; enter a string and submit. ?>
<!DOCTYPE html>
<html>
<head></head>
<body>
<br/><br/>
<form id="img_upload2" action="#" method="post" style="background-color:#ccc;padding:20px;">
<textarea rows="30" cols="50" name="testData"></textarea>
<input type='submit' value=' Go '/>
</form>
</body>
</html>
<?php }
?>
@dunglas Just wanted to check in on this. It would be great to know whether I am doing something wrong or whether there is something else that needs to be addressed. Much appreciated.
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
It is probably not wise to mess with signals in FrankenPHP, which is a Go program running PHP. If the signal arrives while the request is running, that is fine. But other threads running Go code might not work too well with this mask set on the process.
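To illustrate the point about signals, here is a minimal sketch of a producer configuration that skips pcntl_sigprocmask() and internal.termination.signal entirely and always takes the else branch of the original script (queue.buffering.max.ms). The helper name producerSettings() is hypothetical, and whether dropping the termination signal is acceptable for your librdkafka version is an assumption worth checking against its configuration reference. All values are written as strings.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-rdkafka): producer settings
 * that involve no signal masking. Values mirror the original script.
 */
function producerSettings(string $brokers): array
{
    return [
        'metadata.broker.list'   => $brokers,
        'socket.timeout.ms'      => '60',
        'request.timeout.ms'     => '60',
        // Taken from the original else branch; used unconditionally here.
        'queue.buffering.max.ms' => '1',
    ];
}

// Only touch RdKafka when the extension is actually loaded.
if (extension_loaded('rdkafka')) {
    $conf = new RdKafka\Conf();
    foreach (producerSettings('kafka4:9092') as $name => $value) {
        $conf->set($name, $value);
    }
    $producer = new RdKafka\Producer($conf);
}
```

Inside the worker, the same loop would replace the block that currently calls pcntl_sigprocmask().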
RdKafka\Conf::set(): Argument #2 ($value) must be of type string, int given in /srv/public/worker.php:14
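Your worker.php starts with declare(strict_types = 1), so PHP will not cast the integer to a string for you; producer.php has no such declaration, which is why the same call works there. Either remove the strict types declaration so PHP casts the integer automatically, or make sure you pass values of the correct type: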
$conf->set('socket.timeout.ms', '60'); // or socket.blocking.max.ms, depending on librdkafka version
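The same applies to the request.timeout.ms call on the next line of the worker. To show the difference without needing the rdkafka extension, here is a small sketch that uses a hypothetical stand-in function, setOption(), whose second parameter is declared as string just like the one in the error message above. It is only an illustration of how strict_types affects the call, not the extension's actual code.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a method that declares $value as string.
function setOption(string $name, string $value): string
{
    return $name . '=' . $value;
}

$timeoutMs = 60;

try {
    // With strict_types=1 in the calling file, the int is not coerced.
    setOption('socket.timeout.ms', $timeoutMs);
    $strictResult = 'accepted';
} catch (TypeError $e) {
    $strictResult = 'TypeError';
}

// Casting at the call site works in both strict and coercive mode.
$castResult = setOption('socket.timeout.ms', (string) $timeoutMs);

echo $strictResult, PHP_EOL; // prints "TypeError"
echo $castResult, PHP_EOL;   // prints "socket.timeout.ms=60"
```

Without the declare line at the top, the first call would be accepted too, which matches what producer.php does.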