Pawl
Message batching / detecting whether there is a backlog of messages
I am quite new to WebSockets and the concept of the event loop. I've got code that buffers all messages and runs some calculations, which take some time to finish. I want to run the calculations as soon as a new message comes in. At the same time, I don't want to "block" the loop that receives messages, as there may be a backlog of messages to process. In that case I would like to skip the calculations and only execute them once there is no backlog of messages.
Is that possible?
Example code:
$messages = [];
$fnCalculate = function (array $messages) {
    // do some calculations for all messages
};
// $messages is captured by reference so that appended messages persist between callbacks
\Ratchet\Client\connect('wss://echo.websocket.org:443')->then(function ($conn) use (&$messages, $fnCalculate) {
    $conn->on('message', function ($msg) use ($conn, &$messages, $fnCalculate) {
        $messages[] = $msg; // append the message to the buffer
        $isMessageBufferEmpty = true; // Placeholder: are there no more messages waiting to be processed?
        if ($isMessageBufferEmpty) { // No backlog, free to run the calculations
            $fnCalculate($messages);
        }
    });
    $conn->send('subscribe');
}, function ($e) {
    echo "Could not connect: {$e->getMessage()}\n";
});
I am aware that this might not be directly related to the Pawl library.
I would appreciate it if anyone could shed some light on how to solve this issue and whether Pawl can help here.
In other words, I want to process messages and run the calculations as fast as possible. However, if more messages are waiting to be fetched, I want to skip the calculations and run them once the buffer is empty.
@tomekit Interesting question!
I'm not sure I follow the exact issue you're trying to solve, but have you seen https://github.com/clue/reactphp-mq? It lets you set up a Queue object which executes a number of operations and rejects further operations once its $limit is reached.
// each job should use the browser to GET a certain URL
// limit number of concurrent jobs to 1 here, reject any further ones
$q = new Clue\React\Mq\Queue(1, 1, function ($url) use ($browser) {
return $browser->get($url);
});
$q($url); // works
$q($url); // will be rejected while the first is still pending
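A different angle on the original question, as a rough sketch rather than anything Pawl provides: instead of asking whether a backlog exists, each message can be appended to the buffer and the calculation deferred to a later loop tick, with at most one deferred run pending at a time. Messages that are delivered back to back then share a single calculation. The class `CoalescingBuffer`, the `$defer` callable and the tiny tick queue below are all hypothetical names made up for illustration, and the tick queue only stands in for a real event loop so the example runs without any dependencies.

```php
<?php
// Hypothetical helper, not part of Pawl or ReactPHP: buffers messages and
// runs the calculation at most once per batch of back-to-back messages.
final class CoalescingBuffer
{
    private array $messages = [];
    private bool $scheduled = false;
    /** @var callable schedules a callback to run later */
    private $defer;
    /** @var callable receives all buffered messages */
    private $calculate;

    public function __construct(callable $defer, callable $calculate)
    {
        $this->defer = $defer;
        $this->calculate = $calculate;
    }

    public function push($msg): void
    {
        $this->messages[] = $msg;
        if ($this->scheduled) {
            return; // a run is already pending and will see this message too
        }
        $this->scheduled = true;
        ($this->defer)(function (): void {
            $this->scheduled = false;
            ($this->calculate)($this->messages);
        });
    }
}

// Stand-in for the event loop: deferred callbacks are stored and run on demand.
$ticks = [];
$defer = function (callable $fn) use (&$ticks): void {
    $ticks[] = $fn;
};
$drain = function () use (&$ticks): void {
    while ($ticks) {
        $fn = array_shift($ticks);
        $fn();
    }
};

$runs = []; // number of messages seen by each calculation run
$buffer = new CoalescingBuffer($defer, function (array $messages) use (&$runs): void {
    $runs[] = count($messages);
});

// Three messages arrive back to back, then the "loop" gets a chance to run.
$buffer->push('a');
$buffer->push('b');
$buffer->push('c');
$drain();

// A single message arrives later.
$buffer->push('d');
$drain();

print_r($runs); // two runs: the first sees 3 messages, the second sees all 4
```

In a ReactPHP program the `$defer` callable could be something like `fn (callable $fn) => React\EventLoop\Loop::futureTick($fn)`, with `push()` called from the `message` handler. This assumes that messages read from the socket in one go are emitted within the same tick, which I have not verified for Pawl. Note also that a long calculation still blocks the loop while it runs, so this only avoids redundant runs; it does not make the calculation itself non-blocking.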