General guidance for using Kafka and similar packages with FrankenPHP

Open kapyaar opened this issue 11 months ago • 4 comments

Hi,

I am new to FrankenPHP and am evaluating it for a project. I started with https://github.com/f00b4r/2023-10-frankenphp and got it working. Now I want to use some extensions such as Kafka, Redis, MySQL, etc. I made the following modifications:

  1. Dockerfile
# Install PHP extensions rdkafka
RUN install-php-extensions rdkafka
  2. Docker-compose.yml
services:
  app:
    image: frankapp
    build: .
    ports:
      - 80:80
    environment:
      - CADDY_DEBUG=debug
      - DEBUG=debug
      - FRANKENPHP_CONFIG=worker /srv/public/worker.php
      - SERVER_NAME=:80
    volumes:
      - ./:/srv
      - ./Caddyfile:/etc/caddy/Caddyfile
  kafka1:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka-3.7-controller-1
    ports:
      - 9093
    environment:
      # KRaft settings. No longer using zookeeper method.
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=ZmNDEyNT
      - KAFKA_CFG_PROCESS_ROLES=controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=CONTROLLER://kafka1:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
  kafka4:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka-3.7-broker-4
    ports:
      - 9092
    environment:
      # KRaft settings. No longer using zookeeper method.
      - KAFKA_CFG_NODE_ID=4
      - KAFKA_KRAFT_CLUSTER_ID=ZmNDEyNT
      - KAFKA_CFG_PROCESS_ROLES=broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://kafka4:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka4:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  3. /public/worker.php
<?php declare(strict_types = 1);
ignore_user_abort(true);
do {
	$running = frankenphp_handle_request(function () {
		echo "Hello FrankenPHP!!";
		//print_r($_GET);  // for all GET variables
		//phpinfo();
		if(isset($_POST['testData'])){		
			$payload=trim($_POST['testData']);	
			$conf = new RdKafka\Conf();
			$conf->set('metadata.broker.list', 'kafka4:9092');
			
			$conf->set('socket.timeout.ms', 60); 
			$conf->set('request.timeout.ms', 60); 
			
			if (function_exists('pcntl_sigprocmask')) {
				pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
				$conf->set('internal.termination.signal', SIGIO);
			} else {
				$conf->set('queue.buffering.max.ms', 1);
			}
			
			$producer = new RdKafka\Producer($conf);
			$topic="testData";
			$kafkaTopic = $producer->newTopic($topic);
			$kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
			$producer->poll(0);
			for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
				$result = $producer->flush(10000);
				if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
					echo "Message Published";
					break;
				}
			}
			
		}
	});

	gc_collect_cycles();
} while ($running);

However, when I test worker.php, I get the following error:

INFO[0001] Response body: Hello FrankenPHP!<br />
<b>Fatal error</b>:  Uncaught TypeError: RdKafka\Conf::set(): Argument #2 ($value) must be of type string, int given in /srv/public/worker.php:14
Stack trace:
#0 /srv/public/worker.php(14): RdKafka\Conf-&gt;set('socket.timeout....', 60)
#1 [internal function]: {closure}()
#2 /srv/public/worker.php(4): frankenphp_handle_request(Object(Closure))
#3 {main}
  thrown in <b>/srv/public/worker.php</b> on line <b>14</b><br />  source=console

I have another script, producer.php, in the same folder with the same publish code. If I send a POST request to it, it successfully publishes to Kafka. So it looks like the problem is specific to the worker? I have looked through all the available resources, including AI help, but none of the recommendations resolved this. Any help is much appreciated.

kapyaar avatar Feb 03 '25 16:02 kapyaar

Can you show your failing worker script? It looks like you pass an int instead of a string when calling RdKafka\Conf::set().

dunglas avatar Feb 03 '25 17:02 dunglas

@dunglas Thank you for the quick response. The failing worker script is already in the original post (worker.php). On the other hand, here is the producer.php script, which works just fine.

<?php 
if(isset($_POST['testData'])){		
	$payload=trim($_POST['testData']);	
	$conf = new RdKafka\Conf();
	$conf->set('metadata.broker.list', 'kafka4:9092');
	
	$conf->set('socket.timeout.ms', 60); // or socket.blocking.max.ms, depending on librdkafka version
	$conf->set('request.timeout.ms', 60); // request timeout
	
	if (function_exists('pcntl_sigprocmask')) {
		pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
		$conf->set('internal.termination.signal', SIGIO);
	} else {
		$conf->set('queue.buffering.max.ms', 1);
	}
	
	$producer = new RdKafka\Producer($conf);
	$topic="testData";
	$kafkaTopic = $producer->newTopic($topic);
	$kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
	$producer->poll(0);
	for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
		$result = $producer->flush(10000);
		if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
			echo "Message Published";
			break;
		}
	}
	
}
else{ // To test manually, open producer.php; this form will appear. Enter a string and submit. ?>

<!DOCTYPE html>
<head></head>
<body>
<br/><br/>
<form id= "img_upload2" action="#" method="post"  style="background-color:#ccc;padding:20px;"> 
			<textarea rows="30" cols="50" name="testData" ></textarea>
                        <input type='submit' value=' Go '/> 
</form>
</body>
</html>

<?php }
?>

kapyaar avatar Feb 03 '25 17:02 kapyaar

@dunglas Just wanted to check in on this. It would be great to know whether I am doing something wrong or whether there is something else to be addressed. Much appreciated.

kapyaar avatar Feb 10 '25 21:02 kapyaar

pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));

It is probably not wise to mess with signals in FrankenPHP, which is a Go program running PHP. If the signal arrives while the request is running, that is fine. But other threads running Go code might not work too well with this set on the process.

RdKafka\Conf::set(): Argument #2 ($value) must be of type string, int given in /srv/public/worker.php:14

worker.php declares strict_types = 1, whereas producer.php does not, which is why the same call works there. Either disable strict types so that PHP automatically casts your integer to a string, or pass a value of the correct type explicitly:

$conf->set('socket.timeout.ms', '60'); // or socket.blocking.max.ms, depending on librdkafka version

withinboredom avatar Feb 12 '25 14:02 withinboredom
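
To illustrate the strict-types point above, here is a minimal sketch that runs without the rdkafka extension. The `applySettings()` helper and the `$set` closure are hypothetical: `$set` is only a stand-in with the same `(string, string)` parameter types that the error message reports for `RdKafka\Conf::set()`, and the setting names are taken from the scripts in this thread.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-rdkafka): casts every setting to a
 * string before passing it to a setter that only accepts strings.
 */
function applySettings(callable $set, array $settings): void
{
    foreach ($settings as $name => $value) {
        $set((string) $name, (string) $value);
    }
}

// Stand-in for a setter declared as set(string $name, string $value),
// so the sketch runs without the extension installed.
$applied = [];
$set = function (string $name, string $value) use (&$applied): void {
    $applied[$name] = $value;
};

// With strict_types=1 in this file, an int is not silently cast to a string.
$rejected = false;
try {
    $set('socket.timeout.ms', 60);
} catch (TypeError $e) {
    $rejected = true;
    echo "Rejected int value\n";
}

// Casting in one place keeps strict types on and the call sites simple.
applySettings($set, [
    'metadata.broker.list' => 'kafka4:9092',
    'socket.timeout.ms'    => 60,
    'request.timeout.ms'   => 60,
]);

var_dump($applied['socket.timeout.ms']);
```

With the real extension, the same helper could presumably be called as `applySettings([$conf, 'set'], [...])`, where `$conf` is an `RdKafka\Conf` instance; that usage is an assumption and is untested here.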