swoole-src
swoole-src copied to clipboard
Ability to wait one of multiple communication operations
I suggest to implement GoLang select
operator: https://tour.golang.org/concurrency/5
This can be a great way to implement cancellable operations and gives more control on operations flow.
Select
The select
statement lets a goroutine wait on multiple communication operations.
A select
blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.
GoLang example
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
PHP example (my vision of new API)
<?php
declare(strict_types=1);
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Timer;
use function Co\run;
/**
* Awaitable interface that represents any communication operation (socket read/write, channel, timer and etc)
*/
interface Awaitable
{
/**
* @return bool
*/
public function hasResult(): bool;
public function cancel(): void;
}
/** It is just a new Channel representation */
class AwaitableChannel extends Channel implements Awaitable
{
public function hasResult(): bool
{
return $this->stats()['queue_num'] > 0;
}
public function cancel(): void
{
}
}
/** It is just a new Timer representation */
class AwaitableTimer implements Awaitable
{
private int $tid = -1;
private bool $hasResult = false;
private function __construct()
{
}
public function __destruct()
{
$this->cancel();
}
public function hasResult(): bool
{
return $this->hasResult;
}
public function cancel(): void
{
if ($this->tid !== -1) {
Timer::clear($this->tid);
$this->tid = -1;
}
}
public static function after(int $ms): self
{
$timer = new AwaitableTimer();
$timer->tid = Timer::after(
$ms,
function () use ($timer) {
$timer->hasResult = true;
}
);
return $timer;
}
}
/**
* GoLang select operator implementation
*
* @param SplObjectStorage<Awaitable, Closure> $map
*/
function select(SplObjectStorage $map): void
{
// simulating block until any event arrives
while (true) {
/**
* @var Awaitable $awaitable
* @var Closure $handler
*/
foreach ($map as $awaitable) {
if ($awaitable->hasResult()) {
$handler = $map->getInfo();
$handler();
break 2;
}
}
// Coroutine should yield until any event arrives, but currently it is not possible
Coroutine::sleep(0.05);
}
foreach ($map as $awaitable) {
$awaitable->cancel();
}
}
class Producer
{
protected AwaitableChannel $msgCh;
protected AwaitableChannel $stopCh;
protected bool $stopped = false;
public function __construct()
{
$this->msgCh = new AwaitableChannel(1);
$this->stopCh = new AwaitableChannel(1);
}
public function getMessageChannel(): AwaitableChannel
{
return $this->msgCh;
}
public function stop(): AwaitableChannel
{
$this->stopped = true;
return $this->stopCh;
}
public function produce(): void
{
$counter = 0;
while (!$this->stopped) {
// simulating workload
Coroutine::sleep(0.5);
$this->msgCh->push(++$counter);
}
echo "[producer] Stopped" . PHP_EOL;
$this->stopCh->push(1);
}
}
/** This producer won't stop */
class BadProducer extends Producer {
public function produce(): void
{
$counter = 0;
while (true) {
// simulating workload
Coroutine::sleep(0.5);
$this->msgCh->push(++$counter);
}
}
}
class Consumer
{
private AwaitableChannel $msgCh;
private AwaitableChannel $quitCh;
private AwaitableChannel $stopCh;
private bool $stopped = false;
public function __construct(AwaitableChannel $msgCh)
{
$this->msgCh = $msgCh;
$this->quitCh = new AwaitableChannel(1);
$this->stopCh = new AwaitableChannel(1);
}
public function consume(): void
{
$selectMap = new SplObjectStorage();
// the select operator cases (<-msgCh)
$selectMap[$this->msgCh] = function () {
$msg = $this->msgCh->pop();
echo "[consumer] received: {$msg}" . PHP_EOL;
};
// <-quitCh
$selectMap[$this->quitCh] = function () {
$this->stopped = true;
// free resources
};
while (!$this->stopped) {
$timer = AwaitableTimer::after(500);
$selectMap[$timer] = function () {
echo "[consumer] No events in 500ms, aborting" . PHP_EOL;
$this->stop();
};
select($selectMap);
unset($selectMap[$timer]);
}
echo "[consumer] Stopped" . PHP_EOL;
$this->stopCh->push(1);
}
public function stop(): AwaitableChannel
{
$this->quitCh->push(1);
$this->stopped = true;
return $this->stopCh;
}
}
run(
static function () {
$producer = new Producer();
$badProducer = new BadProducer();
$consumer = new Consumer($producer->getMessageChannel());
Coroutine::create(Closure::fromCallable([$consumer, 'consume']));
Coroutine::create(Closure::fromCallable([$producer, 'produce']));
Coroutine::create(Closure::fromCallable([$badProducer, 'produce']));
// automatically stop app
Timer::after(
5000,
static function () {
\Swoole\Process::kill(
getmypid(),
SIGTERM
);
}
);
Coroutine::waitSignal(SIGTERM);
echo "[main] SIGTERM signal received" . PHP_EOL;
/**
* Stopping producer
*/
$stopCh = $producer->stop();
$map = new SplObjectStorage();
$map[$stopCh] = function () {
echo "[main] Producer is stopped" . PHP_EOL;
};
// give to Producer 100 ms for graceful shutdown
$map[AwaitableTimer::after(100)] = function () {
echo "[main] Producer is not stopped in 100ms" . PHP_EOL;
};
// wait for producer graceful shutdown
select($map);
/**
* Stopping consumer
*/
$stopCh = $consumer->stop();
$map = new SplObjectStorage();
$map[$stopCh] = function () {
echo "[main] Consumer is stopped" . PHP_EOL;
};
// give to Consumer 100 ms for graceful shutdown
$map[AwaitableTimer::after(100)] = function () {
echo "[main] Consumer is not stopped in 100ms" . PHP_EOL;
};
// wait for consumer graceful shutdown
select($map);
/**
* Stopping bad producer
*/
$stopCh = $badProducer->stop();
$map = new SplObjectStorage();
$map[$stopCh] = function () {
echo "[main] BadProducer is stopped" . PHP_EOL;
};
// give to BadProducer 100 ms for graceful shutdown
$map[AwaitableTimer::after(100)] = function () {
echo "[main] BadProducer is not stopped in 100ms" . PHP_EOL;
};
// wait for bad producer graceful shutdown
select($map);
}
);
exit(0);
This can only stop the coroutine if it's in the middle of some IO tho. (not your fibonacci function)
I achieved this in a simpler/not so generic way:
- WaitGroups for timeouts (original coroutine inside another coroutine)
- If I have a very long running piece of code, I check some "global" variable and end the execution tree there.
I would also like to see this implemented. One of my main use cases for Select is to do proactive cancellation.