
Message batching / detecting whether there is a backlog of messages

Open tomekit opened this issue 4 years ago • 1 comments

I am quite new to WebSockets and the concept of the event loop. I have code that buffers all incoming messages and runs some calculations, which take some time to finish. I want to run the calculations as soon as a new message comes in. At the same time, I don't want to "block" the loop that receives messages, as there may be a backlog of messages to process. In that case I would like to skip the calculations and only execute them once there is no backlog.

Is that possible?

Example code:

$messages = [];
$fnCalculate = function($messages) {
// do some calculations for all messages
};

\Ratchet\Client\connect('wss://echo.websocket.org:443')->then(function($conn) use (&$messages, $fnCalculate) {
	$conn->on('message', function($msg) use ($conn, &$messages, $fnCalculate) {
		$messages[] = $msg; // append to the shared buffer (captured by reference)

		$isMessageBufferEmpty = true; // Placeholder: are there no more messages waiting in the buffer to be processed?
		if ($isMessageBufferEmpty) { // No messages to process, free to run the calculations
			$fnCalculate($messages);
		}

	});

	$conn->send('subscribe');
}, function ($e) {
	echo "Could not connect: {$e->getMessage()}\n";
});

I am aware that this might not be directly related to the Pawl library. I would appreciate it if anyone could shed some light on how to solve this issue and whether Pawl can help here.

In other words, I want to process messages and run the calculations as fast as possible; however, if more messages are waiting to be fetched, I want to skip the calculations and run them once the buffer is empty.

tomekit avatar May 08 '20 11:05 tomekit

@tomekit Interesting question!

I'm not sure I follow the exact issue you're trying to solve, but have you seen https://github.com/clue/reactphp-mq? This allows you to set up a Queue object which executes a number of operations and lets you reject further operations when its $limit is reached.

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs to 1 here, reject any further ones
$q = new Clue\React\Mq\Queue(1, 1, function ($url) use ($browser) {
    return $browser->get($url);
});

$q($url); // works
$q($url); // will be rejected while the first is still pending

clue avatar May 08 '20 15:05 clue
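
A possible way to approach the original question is to coalesce the calculation: every message is appended to the buffer right away, but the calculation is only scheduled once and runs on a later tick, after the messages that arrived in the same burst have been buffered. The sketch below is self-contained and uses a plain array as a stand-in for the event loop's tick queue. The CoalescingBuffer class and the $defer callable are hypothetical names made up for illustration, not part of Pawl or ReactPHP. With ReactPHP, something like [$loop, 'futureTick'] could presumably be passed as $defer, but whether a burst really gets batched then depends on how many frames are delivered per read, which is an assumption not verified here.

```php
<?php
// Hypothetical helper (not part of Pawl or ReactPHP): buffers messages and
// schedules at most one deferred calculation per burst.
final class CoalescingBuffer
{
    private array $messages = [];
    private bool $scheduled = false;
    /** @var callable receives the full message buffer */
    private $calculate;
    /** @var callable receives a task to run on a later tick */
    private $defer;

    public function __construct(callable $calculate, callable $defer)
    {
        $this->calculate = $calculate;
        $this->defer = $defer;
    }

    public function push(string $message): void
    {
        $this->messages[] = $message;
        if ($this->scheduled) {
            return; // a run is already pending and will see this message too
        }
        $this->scheduled = true;
        ($this->defer)(function (): void {
            $this->scheduled = false;
            ($this->calculate)($this->messages);
        });
    }
}

// Stand-in for an event loop's "run this on the next tick" queue.
$ticks = [];
$defer = function (callable $task) use (&$ticks): void {
    $ticks[] = $task;
};

// Record how many messages each calculation run saw.
$runs = [];
$buffer = new CoalescingBuffer(function (array $messages) use (&$runs): void {
    $runs[] = count($messages);
}, $defer);

// Three messages arrive in one burst before the loop gets to its next tick.
$buffer->push('a');
$buffer->push('b');
$buffer->push('c');

// The "loop" now runs the deferred tasks: one calculation over all three messages.
while ($task = array_shift($ticks)) {
    $task();
}
// $runs is now [3]
```

In the original example, the 'message' handler would then only call $buffer->push((string) $msg). Note that a long-running calculation still blocks the loop while it executes; this only avoids running it once per message within a burst.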