php-rdkafka

Persistent connection would be nice to have

Open the100rabh opened this issue 8 years ago • 35 comments

Each time a PHP script is executed, a new Kafka instance is created. It would be nice if there were some way of keeping the connection persistent so that metadata does not have to be fetched for every request. This is especially helpful for web services running on php-fpm and nginx, where the metadata fetch adds latency.

Instead of creating a new topic object each time, it would reuse the same topic object across script invocations.

the100rabh avatar Mar 21 '16 05:03 the100rabh

Is there any plan to implement this? I think it's the most important feature for PHP running in an FPM environment. Otherwise each request has to trigger a bunch of requests to the Kafka brokers, which really makes it slow.

bennfocus avatar Dec 30 '16 01:12 bennfocus

@arnaud-lb Can we have this ? How can I help ?

djibomar avatar Jun 20 '17 18:06 djibomar

I would gladly accept help on this.

Basically, we would create an API like this:

$instanceName = "somename";
if (!$rk = \RdKafka\Producer::getInstance($instanceName)) {
    $rk = new \RdKafka\Producer($conf, $instanceName);
}

\RdKafka\Producer::getInstance() would search for an existing persistent Producer instance with the given name, and return it.

\RdKafka\Producer($conf, $name) would register itself if a name is passed as second argument.

Internally, only the rdkafka handle would be persistent, not the actual PHP object. getInstance() would search the persistent rdkafka handle, and create a new Producer PHP object with the rdkafka handle.

Producer() would not destroy the rdkafka handle if it's a persistent handle.
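A minimal sketch of that lifecycle in plain C, without PHP or librdkafka headers. Everything in it is hypothetical: `handle_t` stands in for the rdkafka handle, `producer_t` for the short-lived PHP object, and the function names are illustrative, not extension functions. It does not handle duplicate names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the long-lived rdkafka handle. */
typedef struct handle {
    char name[64];   /* instance name; empty when not persistent */
    int  persistent; /* 1 if the registry owns this handle */
} handle_t;

/* Hypothetical stand-in for the short-lived, per-request PHP object. */
typedef struct producer {
    handle_t *handle;
} producer_t;

#define MAX_INSTANCES 8
static handle_t *registry[MAX_INSTANCES]; /* outlives individual requests */

/* Like the proposed getInstance(): wrap an existing handle in a fresh object. */
producer_t *producer_get_instance(const char *name)
{
    for (size_t i = 0; i < MAX_INSTANCES; i++) {
        if (registry[i] != NULL && strcmp(registry[i]->name, name) == 0) {
            producer_t *p = malloc(sizeof *p);
            if (p != NULL) {
                p->handle = registry[i];
            }
            return p;
        }
    }
    return NULL; /* no persistent handle registered under this name */
}

/* Like the proposed constructor: register the handle only if a name is given. */
producer_t *producer_new(const char *name)
{
    producer_t *p = malloc(sizeof *p);
    handle_t *h = calloc(1, sizeof *h);
    if (p == NULL || h == NULL) {
        free(p);
        free(h);
        return NULL;
    }
    if (name != NULL) {
        size_t i = 0;
        while (i < MAX_INSTANCES && registry[i] != NULL) {
            i++;
        }
        if (i < MAX_INSTANCES) {
            strncpy(h->name, name, sizeof h->name - 1);
            h->persistent = 1;
            registry[i] = h;
        }
    }
    p->handle = h;
    return p;
}

/* Object destructor: returns 1 if the handle was destroyed, 0 if it was kept. */
int producer_free(producer_t *p)
{
    assert(p != NULL && p->handle != NULL);
    int destroy = !p->handle->persistent;
    if (destroy) {
        free(p->handle);
    }
    free(p);
    return destroy;
}
```

The point of the split is that the object can be freed at the end of every request while the handle, and with it the broker connections and cached metadata, stays in the registry.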

arnaud-lb avatar Jun 21 '17 08:06 arnaud-lb

Hi @arnaud-lb! Since we need the persistent producer we decided to implement something ourselves. I already have something started (still WIP, only tested on PHP 7.1) here: https://github.com/awons/php-rdkafka/tree/issue-42/persistent-producer

I am not a C developer and this is my first time working on a PHP extension, so hopefully this is not too bad ;) I managed to implement a persistent kafka instance. It works quite well (tested within our own integration tests using the extension) but still has one main issue: callbacks. When a request ends, all callbacks are cleared and kafka, obviously, segfaults when it tries to call an arbitrary memory location.

I wanted to ask what you think about the following solution: for persistent producers we create a hash table with all possible callbacks, each doing nothing. Those callbacks are then registered in kafka. Then on getInstance() we also need to provide new callbacks, like getInstance('name', function(){}). This way kafka always sees a pointer to the same global callback and we only replace pointers within this callback. After the request those temporary pointers are nullified, and if we forget to re-register them kafka will still call the global function, which does nothing.

I am still not sure how to handle the situation where different requests need to register different callbacks for the same instance, but I will try to figure something out.

What do you think about this idea?

awons avatar Mar 02 '18 15:03 awons

@awons: I'll take a look this week!

arnaud-lb avatar Mar 12 '18 07:03 arnaud-lb

Awesome!

I like the general idea.

About callbacks: You are right, we could do it that way. Maybe we could use the opaque setting to store this data (see rd_kafka_conf_set_opaque() / rd_kafka_opaque()). C would just forward calls to the user callbacks referenced in the opaque struct. We could add methods to change these callbacks after getInstance().
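A rough sketch of the forwarding idea in plain C, so it compiles without librdkafka. In the extension the `opaque` pointer would presumably come from rd_kafka_opaque(); here it is passed in directly. The struct layout, the callback signature and all function names are assumptions made for illustration only.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical signature of a per-request (user) delivery callback. */
typedef void (*user_cb_t)(int err, void *request_ctx);

/* Hypothetical opaque struct: lives as long as the persistent handle. */
typedef struct callbacks {
    user_cb_t dr_cb;       /* NULL between requests */
    void     *request_ctx; /* per-request data handed back to dr_cb */
} callbacks_t;

/*
 * Trampoline: the only function pointer the library ever sees. Its address
 * never changes, so it stays valid after the request that set dr_cb is gone.
 * Returns 1 if a user callback was invoked, 0 if the event was dropped.
 */
int dr_trampoline(int err, void *opaque)
{
    callbacks_t *cbs = opaque;
    assert(cbs != NULL);
    if (cbs->dr_cb == NULL) {
        return 0; /* nothing registered for this request: do nothing */
    }
    cbs->dr_cb(err, cbs->request_ctx);
    return 1;
}

/* Called after getInstance(): point the trampoline at this request's callback. */
void callbacks_bind(callbacks_t *cbs, user_cb_t cb, void *request_ctx)
{
    cbs->dr_cb = cb;
    cbs->request_ctx = request_ctx;
}

/* Called at request shutdown: drop pointers that are about to dangle. */
void callbacks_reset(callbacks_t *cbs)
{
    cbs->dr_cb = NULL;
    cbs->request_ctx = NULL;
}

/* Example user callback: counts failed deliveries in an int owned by the request. */
void count_errors(int err, void *request_ctx)
{
    int *errors = request_ctx;
    if (err != 0) {
        (*errors)++;
    }
}
```

With this shape, forgetting to re-register a callback in a later request degrades to a dropped event instead of a call through a stale pointer.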

About the instance table: Zend hash tables can be created persistent (see zend_hash_init()).

arnaud-lb avatar Mar 19 '18 15:03 arnaud-lb

Totally agree with the approach to callbacks. There is just one more thing I cannot grasp. We need a way to figure out exactly which callback needs to be called. One instance can only register one callback. Hence one event can, in theory, be propagated to n different requests simultaneously. We would then need a way to register per-request callbacks somehow (maybe with a request ID/thread ID?). Not sure yet how to approach this.

As for zend_hash_init: I know it can be used for persistent allocation, but the issue I found is that every element in that table needs to be a zval. And PHP 5 cannot store arbitrary pointers in zvals. PHP 7 can, but then we either have to keep two different implementations or drop support for PHP < 7. At least this is what I figured. Maybe we could treat pointers as integers and store them like that... It is kind of a hack but maybe it could work.

awons avatar Mar 25 '18 09:03 awons

Hence one event can, in theory, be propagated to n different requests simultaneously

Are you sharing the rdkafka instance between multiple threads ? I believe that it could become very difficult to manage. From the point of view of the user, we will receive events that are not related to the current request. From the point of view of the extension, there is a lot of work required to make sure that callbacks are called in the right thread.

It would be simpler to not share rdkafka instances between multiple threads. Sharing the instance between subsequent requests on the same thread would be enough I think (this is how it would behave on non-threaded environments, like php-fpm, too).

PHP internals have a notion of per-thread globals that you can use to achieve that. This is not used in php-rdkafka currently, but you can add support for per-thread globals by applying something like this: https://gist.github.com/arnaud-lb/ed0a6501f17eee05e3f814b6ae2dccd6. Basically, ZEND_BEGIN_MODULE_GLOBALS() declares a struct that will be thread-specific (each thread has its own instance). You can access a field with the RDKAFKA_G() macro, e.g. RDKAFKA_G(some_value)=1.
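To illustrate the per-thread globals idea without pulling in PHP headers, here is a plain-C analogue. It is only a sketch: `SKETCH_ZTS`, `SKETCH_G()` and the struct fields are made-up names, and C11 `_Thread_local` merely stands in for whatever mechanism PHP's thread-safe builds actually use.

```c
#include <stddef.h>

/* Plain-C analogue of a module-globals struct; field names are hypothetical. */
typedef struct rdkafka_globals {
    int   some_value;
    void *instances; /* would hold this thread's table of persistent handles */
} rdkafka_globals_t;

#ifdef SKETCH_ZTS
/* Thread-safe build: every thread gets its own copy of the struct. */
static _Thread_local rdkafka_globals_t rdkafka_globals;
#else
/* Non-thread-safe build: one interpreter per process, a plain static suffices. */
static rdkafka_globals_t rdkafka_globals;
#endif

/* Accessor in the style of RDKAFKA_G(): callers do not care which build this is. */
#define SKETCH_G(field) (rdkafka_globals.field)

/* Simulates one request handled by the current thread or process. */
int handle_request(void)
{
    SKETCH_G(some_value) += 1; /* survives until the thread or process exits */
    return SKETCH_G(some_value);
}
```

Calling `handle_request()` twice from the same thread returns 1 and then 2, which is the "persists across requests, not shared across threads" behaviour described above.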

As for zend_hash_init. I know it can be used for persistent allocation but the issue I found is that every element in that table needs to be a zval

Actually you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c.

arnaud-lb avatar Mar 26 '18 10:03 arnaud-lb

Sorry for late response. Was too busy with another project.

Are you sharing the rdkafka instance between multiple threads ?.

That was my initial idea - to have this feature also available in ZTS. But right now I see it is probably way too complicated and could lead to too many issues. It is probably just not worth it. We could simply throw a RuntimeException if someone tries to use this method in ZTS mode, or not even compile it there; not sure which one would be better.

Actually you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c

That is a really nifty workaround :) I will refactor the instance handling to use this trick.

awons avatar Apr 12 '18 18:04 awons

I was just thinking about another thing. You mentioned module globals. Correct me if I am wrong, but if we are not going to implement this feature for ZTS then it makes no sense to use module globals because in NTS mode they always resolve to a simple variable.

awons avatar Apr 12 '18 19:04 awons

We could simply throw a RuntimeException if someone tries to use this method in ZTS mode, or not even compile it there; not sure which one would be better.

Actually there is no problem with using this feature in ZTS mode, as long as we use module globals. Module globals are the way to go when we need variables whose values persist across requests, but should not be shared across threads.

arnaud-lb avatar Apr 13 '18 08:04 arnaud-lb

Actually there is no problem with using this feature in ZTS mode, as long as we use module globals. Module globals are the way to go when we need variables whose values persist across requests, but should not be shared across threads.

Did my homework. I got confused by an explanation of the threaded model I found somewhere on the internet. But you are 100% right. We either have one PHP interpreter per process or one per thread. One interpreter will be responsible for handling multiple requests sequentially (either within one process or one thread), so it makes perfect sense to have this feature working in ZTS mode. Getting back to work then :)

awons avatar Apr 13 '18 18:04 awons

@arnaud-lb I think I am missing something again. You mentioned the add_consuming_toppar() method as an example of how to use raw pointers with hash tables in PHP 5. The problem I see is that methods like zend_hash_str_add_ptr and zend_hash_str_find_ptr are only available in PHP 7. PHP 5 only has zend_hash_add and zend_hash_find.

Now, I see three options here:

  1. I am missing something obvious here and this should work with PHP 5 (highly doubt it);
  2. This is a bug and should be fixed for the extension to work with PHP 5;
  3. This is not a bug and the extension is just not meant to work with PHP 5 (but looking at all the IF/ELSE statements checking for different versions - don't really think so);

Which is it then?

awons avatar Apr 21 '18 11:04 awons

These functions are defined in php_rdkafka_priv.h if the version of PHP doesn't have them already: https://github.com/arnaud-lb/php-rdkafka/blob/6f6f4b046b5bd0bdd8e2ed1aab82b37566ac57d7/php_rdkafka_priv.h#L129-L134
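One way to picture such a shim, assuming a table API that copies a fixed number of bytes per entry (which is my reading of how the PHP 5 style zend_hash_add/zend_hash_find behave; the linked header is the authority). The toy table and every name below are hypothetical. The trick is to store the pointer value itself as the copied payload.

```c
#include <stddef.h>
#include <string.h>

/* Toy table that stores a copy of `size` bytes per key (stand-in, not Zend). */
#define SLOTS 4
#define DATA_MAX 16
typedef struct { const char *key; unsigned char data[DATA_MAX]; } slot_t;
typedef struct { slot_t slots[SLOTS]; size_t used; } table_t;

/* Copy `size` bytes into the table; returns 0 on success, -1 if full or too large. */
int table_add(table_t *t, const char *key, const void *data, size_t size)
{
    if (t->used == SLOTS || size > DATA_MAX) {
        return -1;
    }
    t->slots[t->used].key = key;
    memcpy(t->slots[t->used].data, data, size);
    t->used++;
    return 0;
}

/* Return the address of the stored copy, or NULL if the key is absent. */
void *table_find(table_t *t, const char *key)
{
    for (size_t i = 0; i < t->used; i++) {
        if (strcmp(t->slots[i].key, key) == 0) {
            return t->slots[i].data;
        }
    }
    return NULL;
}

/* Shim: store the pointer value itself as the copied payload. */
int table_add_ptr(table_t *t, const char *key, void *ptr)
{
    return table_add(t, key, &ptr, sizeof ptr);
}

/* Shim: read the pointer value back out of the stored copy. */
void *table_find_ptr(table_t *t, const char *key)
{
    void *stored = table_find(t, key);
    void *ptr = NULL;
    if (stored != NULL) {
        memcpy(&ptr, stored, sizeof ptr);
    }
    return ptr;
}
```

Callers then use `table_add_ptr()` / `table_find_ptr()` everywhere, and only the shim needs to know which underlying API is available.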

arnaud-lb avatar Apr 23 '18 10:04 arnaud-lb

Hey guys

Is there like an update, status, eta? I am very interested in this as well.

nick-zh avatar Nov 14 '18 11:11 nick-zh

+1

Bibob7 avatar Jan 09 '19 09:01 Bibob7

Another country, another job, and too little time for after-hours projects. And I don't use Kafka in my current job anymore. But the problem here is that the kafka object that we have is not the same as the one used later by kafka. Kafka makes a copy of it and I couldn't figure out how to get the correct reference. Additionally, this would cause lots of changes in the extension itself. My C knowledge is unfortunately very limited :(

awons avatar Jan 09 '19 09:01 awons

Thanks for the update @awons :bowing_man:

nick-zh avatar Jan 09 '19 11:01 nick-zh

@awons I have tested it under php-fpm, and it did not work.

Below is the code:

    $conn = $this->config->getConn();
    $this->produce = \RdKafka\Producer::getInstance($conn);
    $pid = getmypid();
    if (!$this->produce) {
        Log::info("<kafka($conn)>php-process($pid):create single instance of producer");
        $this->produce = new \RdKafka\Producer($this->config->getProduceConf());
    } else {
        Log::debug("<kafka($conn)>php-process($pid):no need to create producer for exists instance");
    }

Below is the log:

[2019-09-20 16:55:59.110786] test.INFO: #[RL][b3167be39f61e38a72a0][3][0.001/0.08] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:01.560371] test.INFO: #[RL][cfe51416e3e3e73953e9][3][0.031/0.10] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:02.310157] test.INFO: #[RL][d11170019b80e9a13ee6][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:02.911125] test.INFO: #[RL][098a32f35726af04f142][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:03.602770] test.INFO: #[RL][8521dbb62aed83ea2d46][3][0.001/0.14] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:04.204491] test.INFO: #[RL][eaf5cdbdd848addf1185][3][0.001/0.22] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:04.611187] test.INFO: #[RL][ba0b84f5d65a283a9181][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:04.920222] test.INFO: #[RL][cfc30d377a7d1e404711][3][0.001/0.07] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:05.409298] test.INFO: #[RL][5f58979bf1646cf45273][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:05.917587] test.INFO: #[RL][0d5603654ec3dde23c0d][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:06.511175] test.INFO: #[RL][c8141141725e66436a66][3][0.001/0.19] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:06.920342] test.INFO: #[RL][2a8f4ef4a096490bf70b][3][0.001/0.15] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:07.306934] test.INFO: #[RL][875b552fe5b2ec7e4491][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:07.605764] test.INFO: #[RL][6149c1deb2d07e4dbc2a][3][0.001/0.08] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:10.114968] test.INFO: #[RL][62347f12aa59ee944183][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:10.804955] test.INFO: #[RL][9cda48bb918466d49bf8][3][0.001/0.22] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:11.321044] test.INFO: #[RL][ac6473b208cc20518860][3][0.001/0.11] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:11.811838] test.INFO: #[RL][3b6acd3425719aeddad4][3][0.001/0.10] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:12.305731] test.INFO: #[RL][073a3af1b0596a5158ce][3][0.001/0.09] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:13.587820] test.INFO: #[RL][e1883b9d64def0a0bc95][3][0.001/0.20] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:14.628406] test.INFO: #[RL][a84e795007746f9137d8][3][0.001/0.07] <kafka(device_status)>php-process(382):create single instance of producer

dawei101 avatar Sep 20 '19 09:09 dawei101

@dawei101 as @awons stated before, he is not working on this anymore. There is no current progress on this. I would say this is on hold for now and i am not sure if this will move forward in the next few months unless somebody picks up where @awons left off

nick-zh avatar Sep 20 '19 09:09 nick-zh

Ok, thanks

dawei101 avatar Sep 20 '19 09:09 dawei101

I can have a look at this next, but I will first take care of introducing the Admin API, so my rough estimate is that I may find time in November, just FYI.

nick-zh avatar Sep 20 '19 09:09 nick-zh

Excuse me if I'm wrong, but what if there is a php background script running as a daemon accepting new messages (maybe via socket/ api / or checking a mysql table for queued messages) and forward them to kafka?

This script would then just open the connection with kafka once and produce messages when needed.

It's probably not a good idea to shoot messages to kafka during a page load right?

MrMoronIV avatar Sep 28 '19 07:09 MrMoronIV

@MrMoronIV you're partially right: currently it's not 100% safe to send messages to Kafka from a PHP process that handles web requests, like PHP-FPM, Apache PHP mod or PHP SAPI, but the reason is not that it's slow.

Usually what phprdkafka does when you're producing a message is:

  1. Send a message to a background thread (separate from the one handling the web request, similar in concept to your "listening PHP process"). If necessary, this thread is started.
  2. PHP then returns to normal execution until you start polling (to check the result of producing a message; this basically means the thread reports which messages were delivered successfully, and calls any registered callbacks). Note that during this time the message is actually being sent.
  3. When the PHP process reaches shutdown at the end, the thread will keep retrying to send messages until it reaches (configurable) timeouts. Due to how threads work, this thread will keep the process alive if there are any issues with sending.

Point 3 is what will cause issues if you're not careful: a dead Kafka broker or connectivity issues will prevent the thread from sending a message and will cause PHP processes to live longer than expected. Since your webserver is usually configured to allow only a specific number of PHP processes to handle incoming requests, this eventually causes those threads to starve the webserver of resources.
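To make the trade-off in point 3 concrete, here is a toy simulation in plain C of waiting for deliveries with an upper bound. It does not use librdkafka; `out_queue_t`, `queue_poll()` and `flush_bounded()` are invented for this sketch, and a poll count stands in for a real timeout.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a producer's outgoing queue. */
typedef struct out_queue {
    int pending;          /* messages not yet acknowledged */
    int broker_reachable; /* 0 simulates a dead broker */
} out_queue_t;

/* Simulated poll: acknowledges one message per call if the broker is up. */
void queue_poll(out_queue_t *q)
{
    if (q->broker_reachable && q->pending > 0) {
        q->pending--;
    }
}

/*
 * Poll until everything is delivered or max_polls is reached.
 * Returns the number of messages still pending (0 means fully flushed).
 */
int flush_bounded(out_queue_t *q, int max_polls)
{
    assert(q != NULL);
    for (int i = 0; i < max_polls && q->pending > 0; i++) {
        queue_poll(q);
    }
    return q->pending;
}
```

With a reachable broker the loop drains the queue and returns 0. With an unreachable one it gives up after `max_polls` and reports what is left, so the worker is released instead of being held open, and the caller decides what to do with the undelivered messages.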

Phprdkafka 4.0 will no longer contain the code handling point 3: the thread will terminate as soon as the PHP process reaches shutdown. The user (programmer) will be responsible for handling delivery checks on their own.

In case of my apps I'm doing exactly what you described - I'm keeping a background process that all messages are delegated into.

Steveb-p avatar Sep 28 '19 07:09 Steveb-p

In case of my apps I'm doing exactly what you described - I'm keeping a background process that all messages are delegated into.

Are you willing to share this code? I'm so confused as to why there is no popular standalone daemon that takes messages via a socket and delivers them in the order they were received (and takes care of retries in case of outage)

MrMoronIV avatar Oct 03 '19 14:10 MrMoronIV

Are you willing to share this code? I'm so confused as to why there is no popular standalone daemon that takes messages via a socket and delivers them in the order they were received (and takes care of retries in case of outage)

@MrMoronIV It's nothing too complicated really.

What I meant by delegating is that I save them to a "temporary" storage (in my case Redis, but it can be anything). The background process simply reads that storage and pushes messages into Kafka.

If you think about it this helps alleviate a number of issues. Using a permanently running process that listens on a socket would be fine, but in case of any failure it would mean potentially losing messages (or storing them somewhere, but then... what's the point? :P). Introducing a temporary storage prevents that and does not introduce any complicated concepts.

I'm using php-enqueue with two transports: redis and kafka. However, I'm not willing to recommend it because you can run into issues with it. You can give it a go and see if it works for you. I also have to mention that I'm helping with maintenance of the Kafka package there, so I can be biased in my opinion. I'm only mentioning this because you've asked what I'm using :)

Steveb-p avatar Oct 03 '19 22:10 Steveb-p

you can run into issues with it. You can give it a go and see if it works for you.

Why do you say this? Did you have issues yourself or do you just want to be sure I'm not fixated on the first thing mentioned and should consider all available options first? You kinda make it sound like I should avoid this package altogether!

On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.

MrMoronIV avatar Oct 04 '19 06:10 MrMoronIV

@Steveb-p @MrMoronIV i love the inputs, don't get me wrong, but i think we are getting side tracked here now. If you want to continue discussing this, i would appreciate it, that you move it to another issue, thx guys :+1:

nick-zh avatar Oct 04 '19 06:10 nick-zh

@Steveb-p @MrMoronIV i love the inputs, don't get me wrong, but i think we are getting side tracked here now. If you want to continue discussing this, i would appreciate it, that you move it to another issue, thx guys

Sure, we will move to phprdkafka gitter. Do you mind @MrMoronIV ?

Why do you say this? Did you have issues yourself or do you just want to be sure I'm not fixated on the first thing mentioned and should consider all available options first? You kinda make it sound like I should avoid this package altogether!

I don't want to advertise it, and I know some people run into issues with it, so I don't want you to waste too much time if you run into some. Give it a shot and see if it's alright for you. For all I know Timesplinter (who is a contributor here) does use it, I think?

On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.

You can use the filesystem to store messages. Php-enqueue actually does have a transport which does exactly that. However, it should be rather simple to just file_put_contents($filename, json_encode($message)) and then subsequently use DirectoryIterator to iterate over the directory contents (to prevent memory issues with scandir) and send each message to Kafka.

// Offtopic off. As for permanent Kafka connections: don't the mysqli and Postgres drivers allow something similar with persistent database connections? So it should at least be possible?

Steveb-p avatar Oct 04 '19 07:10 Steveb-p

@Steveb-p it is indeed possible, but from what i got, it is still not that easy. I don't have a lot of time right now and since we use swoole in our project, i don't have an urgent need for this feature anymore, but i will still try to give it a go after i resolved issue 215

nick-zh avatar Oct 04 '19 08:10 nick-zh