68 lines
1.6 KiB
PHP
68 lines
1.6 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
/**
|
|
* This file is part of Hyperf.
|
|
*
|
|
* @link https://www.hyperf.io
|
|
* @document https://hyperf.wiki
|
|
* @contact group@hyperf.io
|
|
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
|
*/
|
|
namespace Hyperf\Utils;
|
|
|
|
use Closure;
|
|
use Hyperf\Engine\Channel;
|
|
use Hyperf\Utils\Exception\ExceptionThrower;
|
|
use Hyperf\Utils\Exception\WaitTimeoutException;
|
|
use Throwable;
|
|
|
|
class Waiter
|
|
{
|
|
/**
|
|
* @var float
|
|
*/
|
|
protected $pushTimeout = 10.0;
|
|
|
|
/**
|
|
* @var float
|
|
*/
|
|
protected $popTimeout = 10.0;
|
|
|
|
public function __construct(float $timeout = 10.0)
|
|
{
|
|
$this->popTimeout = $timeout;
|
|
}
|
|
|
|
/**
|
|
* @param null|float $timeout seconds
|
|
*/
|
|
public function wait(Closure $closure, ?float $timeout = null)
|
|
{
|
|
if ($timeout === null) {
|
|
$timeout = $this->popTimeout;
|
|
}
|
|
|
|
$channel = new Channel(1);
|
|
Coroutine::create(function () use ($channel, $closure) {
|
|
try {
|
|
$result = $closure();
|
|
} catch (Throwable $exception) {
|
|
$result = new ExceptionThrower($exception);
|
|
} finally {
|
|
$channel->push($result ?? null, $this->pushTimeout);
|
|
}
|
|
});
|
|
|
|
$result = $channel->pop($timeout);
|
|
if ($result === false && $channel->isTimeout()) {
|
|
throw new WaitTimeoutException(sprintf('Channel wait failed, reason: Timed out for %s s', $timeout));
|
|
}
|
|
if ($result instanceof ExceptionThrower) {
|
|
throw $result->getThrowable();
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
}
|