代码初始化

This commit is contained in:
2025-08-07 20:21:47 +08:00
commit 50f3a2dbb0
2191 changed files with 374790 additions and 0 deletions

View File

@@ -0,0 +1,183 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/11/3
* Time: 14:37
*/
namespace Jenner\SimpleFork;
/**
* processes' pool
*
* @package Jenner\SimpleFork
*/
abstract class AbstractPool
{
/**
* process list
*
* @var Process[]
*/
protected $processes = array();
/**
* get process by pid
*
* @param $pid
* @return null|Process
*/
public function getProcessByPid($pid)
{
foreach ($this->processes as $process) {
if ($process->getPid() == $pid) {
return $process;
}
}
return null;
}
/**
* shutdown sub process and no wait. it is dangerous,
* maybe the sub process is working.
*/
public function shutdownForce()
{
$this->shutdown(SIGKILL);
}
/**
* shutdown all process
*
* @param int $signal
*/
public function shutdown($signal = SIGTERM)
{
foreach ($this->processes as $process) {
if ($process->isRunning()) {
$process->shutdown(true, $signal);
}
}
}
/**
* if all processes are stopped
*
* @return bool
*/
public function isFinished()
{
foreach ($this->processes as $process) {
if (!$process->isStopped()) {
return false;
}
}
return true;
}
/**
* waiting for the sub processes to exit
*
* @param bool|true $block if true the parent process will be blocked until all
* sub processes exit. else it will check if there are processes that had been exited once and return.
* @param int $sleep when $block is true, it will check sub processes every $sleep minute
*/
public function wait($block = true, $sleep = 100)
{
do {
foreach ($this->processes as $process) {
if (!$process->isRunning()) {
continue;
}
}
usleep($sleep);
} while ($block && $this->aliveCount() > 0);
}
/**
* get the count of running processes
*
* @return int
*/
public function aliveCount()
{
$count = 0;
foreach ($this->processes as $process) {
if ($process->isRunning()) {
$count++;
}
}
return $count;
}
/**
* get process by name
*
* @param string $name process name
* @return Process|null
*/
public function getProcessByName($name)
{
foreach ($this->processes as $process) {
if ($process->name() == $name) {
return $process;
}
}
return null;
}
/**
* remove process by name
*
* @param string $name process name
* @throws \RuntimeException
*/
public function removeProcessByName($name)
{
foreach ($this->processes as $key => $process) {
if ($process->name() == $name) {
if ($process->isRunning()) {
throw new \RuntimeException("can not remove a running process");
}
unset($this->processes[$key]);
}
}
}
/**
* remove exited process
*/
public function removeExitedProcess()
{
foreach ($this->processes as $key => $process) {
if ($process->isStopped()) {
unset($this->processes[$key]);
}
}
}
/**
* return process count
*
* @return int
*/
public function count()
{
return count($this->processes);
}
/**
* get all processes
*
* @return Process[]
*/
public function getProcesses()
{
return $this->processes;
}
}

View File

@@ -0,0 +1,53 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 14:59
*/
namespace Jenner\SimpleFork\Cache;
/**
* cache for processes shared variables
*
* @package Jenner\SimpleFork\Cache
*/
interface CacheInterface
{
/**
* get var
*
* @param $key
* @param null $default
* @return bool|mixed
*/
public function get($key, $default = null);
/**
* set var
*
* @param $key
* @param null $value
* @return
*/
public function set($key, $value);
/**
* has var ?
*
* @param $key
* @return bool
*/
public function has($key);
/**
* delete var
*
* @param $key
* @return bool
*/
public function delete($key);
}

View File

@@ -0,0 +1,263 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2016/6/22
* Time: 16:18
*/
namespace Jenner\SimpleFork\Cache;
class FileCache implements CacheInterface
{
/**
* 缓存目录
* @var
*/
private $cache_dir;
/**
* @param string $cache_dir
* @throws \Exception
*/
public function __construct($cache_dir)
{
$this->cache_dir = $cache_dir;
if (!is_dir($cache_dir)) {
$make_dir_result = mkdir($cache_dir, 0755, true);
if ($make_dir_result === false) throw new \Exception('Cannot create the cache directory');
}
}
/**
* get value by key, and check if it is expired
* @param string $key
* @param string $default
* @return mixed
*/
public function get($key, $default = null)
{
$cache_data = $this->getItem($key);
if ($cache_data === false || !is_array($cache_data)) return $default;
return $cache_data['data'];
}
/**
* 添加或覆盖一个key
* @param string $key
* @param mixed $value
* @param int $expire expire time in seconds
* @return mixed
*/
public function set($key, $value, $expire = 0)
{
return $this->setItem($key, $value, time(), $expire);
}
/**
* 设置包含元数据的信息
* @param $key
* @param $value
* @param $time
* @param $expire
* @return bool
*/
private function setItem($key, $value, $time, $expire)
{
$cache_file = $this->createCacheFile($key);
if ($cache_file === false) return false;
$cache_data = array('data' => $value, 'time' => $time, 'expire' => $expire);
$cache_data = serialize($cache_data);
$put_result = file_put_contents($cache_file, $cache_data);
if ($put_result === false) return false;
return true;
}
/**
* 创建缓存文件
* @param $key
* @return bool|string
*/
private function createCacheFile($key)
{
$cache_file = $this->path($key);
if (!file_exists($cache_file)) {
$directory = dirname($cache_file);
if (!is_dir($directory)) {
$make_dir_result = mkdir($directory, 0755, true);
if ($make_dir_result === false) return false;
}
$create_result = touch($cache_file);
if ($create_result === false) return false;
}
return $cache_file;
}
/**
* 判断Key是否存在
* @param $key
* @return mixed
*/
public function has($key)
{
$value = $this->get($key);
if ($value === false) return false;
return true;
}
/**
* 加法递增
* @param $key
* @param int $value
* @return mixed
*/
public function increment($key, $value = 1)
{
$item = $this->getItem($key);
if ($item === false) {
$set_result = $this->set($key, $value);
if ($set_result === false) return false;
return $value;
}
$check_expire = $this->checkExpire($item);
if ($check_expire === false) return false;
$item['data'] += $value;
$result = $this->setItem($key, $item['data'], $item['time'], $item['expire']);
if ($result === false) return false;
return $item['data'];
}
/**
* 减法递增
* @param $key
* @param int $value
* @return mixed
*/
public function decrement($key, $value = 1)
{
$item = $this->getItem($key);
if ($item === false) {
$value = 0 - $value;
$set_result = $this->set($key, $value);
if ($set_result === false) return false;
return $value;
}
$check_expire = $this->checkExpire($item);
if ($check_expire === false) return false;
$item['data'] -= $value;
$result = $this->setItem($key, $item['data'], $item['time'], $item['expire']);
if ($result === false) return false;
return $item['data'];
}
/**
* 删除一个key同事会删除缓存文件
* @param $key
* @return boolean
*/
public function delete($key)
{
$cache_file = $this->path($key);
if (file_exists($cache_file)) {
$unlink_result = unlink($cache_file);
if ($unlink_result === false) return false;
}
return true;
}
/**
* 清楚所有缓存
* @return mixed
*/
public function flush()
{
return $this->delTree($this->cache_dir);
}
/**
* 递归删除目录
* @param $dir
* @return bool
*/
function delTree($dir)
{
$files = array_diff(scandir($dir), array('.', '..'));
foreach ($files as $file) {
(is_dir("$dir/$file")) ? $this->delTree("$dir/$file") : unlink("$dir/$file");
}
return rmdir($dir);
}
/**
* 根据key获取缓存文件路径
*
* @param string $key
* @return string
*/
protected function path($key)
{
$parts = array_slice(str_split($hash = md5($key), 2), 0, 2);
return $this->cache_dir . '/' . implode('/', $parts) . '/' . $hash;
}
/**
* 获取含有元数据的信息
* @param $key
* @return bool|mixed|string
*/
protected function getItem($key)
{
$cache_file = $this->path($key);
if (!file_exists($cache_file) || !is_readable($cache_file)) {
return false;
}
$data = file_get_contents($cache_file);
if (empty($data)) return false;
$cache_data = unserialize($data);
if ($cache_data === false) {
return false;
}
$check_expire = $this->checkExpire($cache_data);
if ($check_expire === false) {
$this->delete($key);
return false;
}
return $cache_data;
}
/**
* 检查key是否过期
* @param $cache_data
* @return bool
*/
protected function checkExpire($cache_data)
{
$time = time();
$is_expire = intval($cache_data['expire']) !== 0 && (intval($cache_data['time']) + intval($cache_data['expire']) < $time);
if ($is_expire) return false;
return true;
}
}

View File

@@ -0,0 +1,126 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/20
* Time: 15:14
*/
namespace Jenner\SimpleFork\Cache;
/**
* redis cache
*
* @package Jenner\SimpleFork\Cache
*/
class RedisCache implements CacheInterface
{
/**
* @var \Redis
*/
protected $redis;
protected $prefix;
/**
* @param string $host
* @param int $port
* @param int $database
* @param string $prefix
*/
public function __construct(
$host = '127.0.0.1',
$port = 6379,
$database = 0,
$prefix = 'simple-fork'
)
{
$this->redis = new \Redis();
$connection_result = $this->redis->connect($host, $port);
if (!$connection_result) {
throw new \RuntimeException('can not connect to the redis server');
}
if ($database != 0) {
$select_result = $this->redis->select($database);
if (!$select_result) {
throw new \RuntimeException('can not select the database');
}
}
if (empty($prefix)) {
throw new \InvalidArgumentException('prefix can not be empty');
}
$this->prefix = $prefix;
}
/**
* close redis connection
*/
public function __destruct()
{
$this->close();
}
/**
* close the connection
*/
public function close()
{
$this->redis->close();
}
/**
* get var
*
* @param $key
* @param null $default
* @return bool|string|null
*/
public function get($key, $default = null)
{
$result = $this->redis->hGet($this->prefix, $key);
if ($result !== false) return $result;
return $default;
}
/**
* set var
*
* @param $key
* @param null $value
* @return boolean
*/
public function set($key, $value)
{
return $this->redis->hSet($this->prefix, $key, $value);
}
/**
* has var ?
*
* @param $key
* @return bool
*/
public function has($key)
{
return $this->redis->hExists($this->prefix, $key);
}
/**
* delete var
*
* @param $key
* @return bool
*/
public function delete($key)
{
if ($this->redis->hDel($this->prefix, $key) > 0) {
return true;
}
return false;
}
}

View File

@@ -0,0 +1,176 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 15:00
*/
namespace Jenner\SimpleFork\Cache;
/**
* shared memory cache
*
* @package Jenner\SimpleFork\Cache
*/
class SharedMemory implements CacheInterface
{
/**
* holds shared memory resource
* @var resource
*/
protected $shm;
/**
* shared memory ipc key
* @var string
*/
protected $client_count_key = 'system_client_count';
/**
* memory size
* @var int
*/
protected $size;
/**
* @param int $size memory size
* @param string $file
*/
public function __construct($size = 33554432, $file = __FILE__)
{
$this->size = $size;
if (function_exists("shm_attach") === false) {
$message = "\nYour PHP configuration needs adjustment. " .
"See: http://us2.php.net/manual/en/shmop.setup.php. " .
"To enable the System V shared memory support compile " .
" PHP with the option --enable-sysvshm.";
throw new \RuntimeException($message);
}
$this->attach($file); //create resources (shared memory)
}
/**
* connect shared memory
*
* @param string $file
*/
public function attach($file = __FILE__)
{
if (!file_exists($file)) {
$touch = touch($file);
if (!$touch) {
throw new \RuntimeException("file is not exists and it can not be created. file: {$file}");
}
}
$key = ftok($file, 'a');
$this->shm = shm_attach($key, $this->size); //allocate shared memory
}
/**
* remove shared memory.
* you should know that it maybe does not work.
*
* @return bool
*/
public function remove()
{
//dallocate shared memory
if (!shm_remove($this->shm)) {
return false;
}
$this->dettach();
// shm_remove maybe not working. it likes a php bug.
unset($this->shm);
return true;
}
/**
* @return bool
*/
public function dettach()
{
return shm_detach($this->shm); //allocate shared memory
}
/**
* set var
*
* @param $key
* @param $value
* @return bool
*/
public function set($key, $value)
{
return shm_put_var($this->shm, $this->shm_key($key), $value); //store var
}
/**
* generate shm key
*
* @param $val
* @return mixed
*/
public function shm_key($val)
{ // enable all world langs and chars !
// text to number system.
return preg_replace("/[^0-9]/", "", (preg_replace("/[^0-9]/", "", md5($val)) / 35676248) / 619876);
}
/**
* get var
*
* @param $key
* @param null $default
* @return bool|mixed
*/
public function get($key, $default = null)
{
if ($this->has($key)) {
return shm_get_var($this->shm, $this->shm_key($key));
} else {
return $default;
}
}
/**
* has var ?
*
* @param $key
* @return bool
*/
public function has($key)
{
if (shm_has_var($this->shm, $this->shm_key($key))) { // check is isset
return true;
} else {
return false;
}
}
/**
* delete var
*
* @param $key
* @return bool
*/
public function delete($key)
{
if ($this->has($key)) {
return shm_remove_var($this->shm, $this->shm_key($key));
} else {
return false;
}
}
/**
* init when wakeup
*/
public function __wakeup()
{
$this->attach();
}
}

View File

@@ -0,0 +1,67 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/11/2
* Time: 17:45
*/
namespace Jenner\SimpleFork;
/**
* fixed pool
*
* @package Jenner\SimpleFork
*/
class FixedPool extends AbstractPool
{
/**
* @var int max process count
*/
protected $max;
/**
* @param int $max
*/
public function __construct($max = 4)
{
$this->max = $max;
}
public function execute(Process $process)
{
Utils::checkOverwriteRunMethod(get_class($process));
if ($this->aliveCount() < $this->max && !$process->isStarted()) {
$process->start();
}
array_push($this->processes, $process);
}
/**
* wait for all process done
*
* @param bool $block block the master process
* to keep the sub process count all the time
* @param int $interval check time interval
*/
public function wait($block = false, $interval = 100)
{
do {
if ($this->isFinished()) {
return;
}
parent::wait(false);
if ($this->aliveCount() < $this->max) {
foreach ($this->processes as $process) {
if ($process->isStarted()) continue;
$process->start();
if ($this->aliveCount() >= $this->max) break;
}
}
$block ? usleep($interval) : null;
} while ($block);
}
}

View File

@@ -0,0 +1,126 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/21
* Time: 14:30
*/
namespace Jenner\SimpleFork\Lock;
/**
* file lock
*
* @package Jenner\SimpleFork\Lock
*/
class FileLock implements LockInterface
{
/**
* @var string lock file
*/
protected $file;
/**
* @var resource
*/
protected $fp;
/**
* @var bool
*/
protected $locked = false;
/**
* @param $file
*/
private function __construct($file)
{
if (!file_exists($file) || !is_readable($file)) {
throw new \RuntimeException("{$file} is not exists or not readable");
}
$this->fp = fopen($file, "r+");
if (!is_resource($this->fp)) {
throw new \RuntimeException("open {$file} failed");
}
}
/**
* create a file lock instance
* if the file is not exists, it will be created
*
* @param string $file lock file
* @return FileLock
*/
public static function create($file)
{
return new FileLock($file);
}
/**
* get a lock
*
* @param bool $blocking
* @return mixed
*/
public function acquire($blocking = true)
{
if ($this->locked) {
throw new \RuntimeException('already lock by yourself');
}
if ($blocking) {
$locked = flock($this->fp, LOCK_EX);
} else {
$locked = flock($this->fp, LOCK_EX | LOCK_NB);
}
if ($locked !== true) {
return false;
}
$this->locked = true;
return true;
}
/**
* is locked
*
* @return mixed
*/
public function isLocked()
{
return $this->locked === true ? true : false;
}
/**
*
*/
public function __destory()
{
if ($this->locked) {
$this->release();
}
}
/**
* release lock
*
* @return mixed
*/
public function release()
{
if (!$this->locked) {
throw new \RuntimeException('release a non lock');
}
$unlock = flock($this->fp, LOCK_UN);
fclose($this->fp);
if ($unlock !== true) {
return false;
}
$this->locked = false;
return true;
}
}

View File

@@ -0,0 +1,40 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/21
* Time: 14:24
*/
namespace Jenner\SimpleFork\Lock;
/**
* lock for processes to mutual exclusion
*
* @package Jenner\SimpleFork\Lock
*/
interface LockInterface
{
/**
* get a lock
*
* @param bool $blocking
* @return bool
*/
public function acquire($blocking = true);
/**
* release lock
*
* @return bool
*/
public function release();
/**
* is locked
*
* @return bool
*/
public function isLocked();
}

View File

@@ -0,0 +1,163 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 20:52
*/
namespace Jenner\SimpleFork\Lock;
/**
* sem lock
*
* @package Jenner\SimpleFork\Lock
*/
class Semaphore implements LockInterface
{
/**
* @var
*/
private $lock_id;
/**
* @var bool
*/
private $locked = false;
/**
* init a lock
*
* @param $key
* @param $count
* @throws \RuntimeException
*/
private function __construct($key, $count = 1)
{
if (($this->lock_id = sem_get($this->_stringToSemKey($key), $count)) === false) {
throw new \RuntimeException("Cannot create semaphore for key: {$key}");
}
}
/**
* Semaphore requires a numeric value as the key
*
* @param $identifier
* @return int
*/
protected function _stringToSemKey($identifier)
{
$md5 = md5($identifier);
$key = 0;
for ($i = 0; $i < 32; $i++) {
$key += ord($md5{$i}) * $i;
}
return $key;
}
/**
* create a lock instance
*
* @param $key
* @return Semaphore
*/
public static function create($key)
{
return new Semaphore($key);
}
/**
* release lock
*
* @throws \RuntimeException
*/
public function __destruct()
{
if ($this->isLocked()) {
$this->release();
}
}
/**
* is locked
*
* @return bool
*/
public function isLocked()
{
return $this->locked === true ? true : false;
}
/**
* release lock
*
* @return bool
* @throws \RuntimeException
*/
public function release()
{
if (!$this->locked) {
throw new \RuntimeException("release a non lock");
}
if (!sem_release($this->lock_id)) {
return false;
}
$this->locked = false;
return true;
}
/**
* get a lock
*
* @param bool $blocking
* @return bool
*/
public function acquire($blocking = true)
{
if ($this->locked) {
throw new \RuntimeException('already lock by yourself');
}
if ($blocking === false) {
if (version_compare(PHP_VERSION, '5.6.0') < 0) {
throw new \RuntimeException('php version is at least 5.6.0 for param blocking');
}
if (!sem_acquire($this->lock_id, true)) {
return false;
}
$this->locked = true;
return true;
}
if (!sem_acquire($this->lock_id)) {
return false;
}
$this->locked = true;
return true;
}
/**
* remove the semaphore resource
*
* @return bool
*/
public function remove()
{
if ($this->locked) {
throw new \RuntimeException('can not remove a locked semaphore resource');
}
if (!is_resource($this->lock_id)) {
throw new \RuntimeException('can not remove a empty semaphore resource');
}
if (!sem_release($this->lock_id)) {
return false;
}
return true;
}
}

View File

@@ -0,0 +1,106 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @blog http://www.huyanping.cn
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/19 20:49
*/
namespace Jenner\SimpleFork;
/**
* parallel pool
*
* @package Jenner\SimpleFork
*/
class ParallelPool extends AbstractPool
{
/**
* @var callable|Runnable sub process callback
*/
protected $runnable;
/**
* @var int max process count
*/
protected $max;
/**
* @param callable|Runnable $callback
* @param int $max
*/
public function __construct($callback, $max = 4)
{
if (!is_callable($callback) && !($callback instanceof Runnable)) {
throw new \InvalidArgumentException('callback must be a callback function or a object of Runnalbe');
}
$this->runnable = $callback;
$this->max = $max;
}
/**
* start the same number processes and kill the old sub process
* just like nginx -s reload
* this method will block until all the old process exit;
*
* @param bool $block
*/
public function reload($block = true)
{
$old_processes = $this->processes;
for ($i = 0; $i < $this->max; $i++) {
$process = new Process($this->runnable);
$process->start();
$this->processes[$process->getPid()] = $process;
}
foreach ($old_processes as $process) {
$process->shutdown();
$process->wait($block);
unset($this->processes[$process->getPid()]);
}
}
/**
* keep sub process count
*
* @param bool $block block the master process
* to keep the sub process count all the time
* @param int $interval check time interval
*/
public function keep($block = false, $interval = 100)
{
do {
$this->start();
// recycle sub process and delete the processes
// which are not running from process list
foreach ($this->processes as $process) {
if (!$process->isRunning()) {
unset($this->processes[$process->getPid()]);
}
}
$block ? usleep($interval) : null;
} while ($block);
}
/**
* start the pool
*/
public function start()
{
$alive_count = $this->aliveCount();
// create sub process and run
if ($alive_count < $this->max) {
$need = $this->max - $alive_count;
for ($i = 0; $i < $need; $i++) {
$process = new Process($this->runnable);
$process->start();
$this->processes[$process->getPid()] = $process;
}
}
}
}

View File

@@ -0,0 +1,38 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 17:54
*/
namespace Jenner\SimpleFork;
/**
* pool
*
* @package Jenner\SimpleFork
*/
class Pool extends AbstractPool
{
/**
* add a process
*
* @param Process $process
* @param null|string $name process name
* @return int
*/
public function execute(Process $process, $name = null)
{
if (!is_null($name)) {
$process->name($name);
}
if (!$process->isStarted()) {
$process->start();
}
return array_push($this->processes, $process);
}
}

View File

@@ -0,0 +1,56 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @blog http://www.huyanping.cn
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/19 21:14
*/
namespace Jenner\SimpleFork;
class PoolFactory
{
/**
* create a pool instance
*
* @return Pool
*/
public static function newPool()
{
return new Pool();
}
/**
* create a fixed pool instance
*
* @param int $max
* @return FixedPool
*/
public static function newFixedPool($max = 4)
{
return new FixedPool($max);
}
/**
* create a parallel pool instance
*
* @param $callback
* @param int $max
* @return ParallelPool
*/
public static function newParallelPool($callback, $max = 4)
{
return new ParallelPool($callback, $max);
}
/**
* create a single pool
*
* @return SinglePool
*/
public static function newSinglePool()
{
return new SinglePool();
}
}

View File

@@ -0,0 +1,373 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 15:25
*/
namespace Jenner\SimpleFork;
class Process
{
/**
* @var Runnable|callable
*/
protected $runnable;
/**
* @var int
*/
protected $pid = 0;
/**
* @var string custom process name
*/
protected $name = null;
/**
* @var bool if the process is started
*/
protected $started = false;
/**
* @var bool
*/
protected $running = false;
/**
* @var int the signal which made the process terminate
*/
protected $term_signal = null;
/**
* @var int the signal which made the process stop
*/
protected $stop_signal = null;
/**
* @var int error code
*/
protected $errno = null;
/**
* @var string error message
*/
protected $errmsg = null;
/**
* @var bool
*/
protected $if_signal = false;
/**
* @var array
*/
protected $callbacks = array();
/**
* @var array signal handlers
*/
protected $signal_handlers = array();
/**
* @param string $execution it can be a Runnable object, callback function or null
* @param null $name process name,you can manager the process by it's name.
*/
public function __construct($execution = null, $name = null)
{
if (!is_null($execution) && $execution instanceof Runnable) {
$this->runnable = $execution;
} elseif (!is_null($execution) && is_callable($execution)) {
$this->runnable = $execution;
} elseif (!is_null($execution)) {
throw new \InvalidArgumentException('param execution is not a object of Runnable or callable');
} else {
Utils::checkOverwriteRunMethod(get_class($this));
}
if (!is_null($name)) {
$this->name = $name;
}
$this->initStatus();
}
/**
* init process status
*/
protected function initStatus()
{
$this->pid = null;
$this->running = null;
$this->term_signal = null;
$this->stop_signal = null;
$this->errno = null;
$this->errmsg = null;
}
/**
* get pid
*
* @return int
*/
public function getPid()
{
return $this->pid;
}
/**
* get or set name
*
* @param string|null $name
* @return mixed
*/
public function name($name = null)
{
if (!is_null($name)) {
$this->name = $name;
} else {
return $this->name;
}
}
/**
* if the process is stopped
*
* @return bool
*/
public function isStopped()
{
if (is_null($this->errno)) {
return false;
}
return true;
}
/**
* if the process is started
*
* @return bool
*/
public function isStarted()
{
return $this->started;
}
/**
* get pcntl errno
*
* @return int
*/
public function errno()
{
return $this->errno;
}
/**
* get pcntl errmsg
*
* @return string
*/
public function errmsg()
{
return $this->errmsg;
}
public function ifSignal()
{
return $this->if_signal;
}
/**
* start the sub process
* and run the callback
*
* @return string pid
*/
public function start()
{
if (!empty($this->pid) && $this->isRunning()) {
throw new \LogicException("the process is already running");
}
$callback = $this->getCallable();
$pid = pcntl_fork();
if ($pid < 0) {
throw new \RuntimeException("fork error");
} elseif ($pid > 0) {
$this->pid = $pid;
$this->running = true;
$this->started = true;
} else {
$this->pid = getmypid();
$this->signal();
foreach ($this->signal_handlers as $signal => $handler) {
pcntl_signal($signal, $handler);
}
call_user_func($callback);
exit(0);
}
}
/**
* if the process is running
*
* @return bool
*/
public function isRunning()
{
$this->updateStatus();
return $this->running;
}
/**
* update the process status
*
* @param bool $block
*/
protected function updateStatus($block = false)
{
if ($this->running !== true) {
return;
}
if ($block) {
$res = pcntl_waitpid($this->pid, $status);
} else {
$res = pcntl_waitpid($this->pid, $status, WNOHANG | WUNTRACED);
}
if ($res === -1) {
throw new \RuntimeException('pcntl_waitpid failed. the process maybe available');
} elseif ($res === 0) {
$this->running = true;
} else {
if (pcntl_wifsignaled($status)) {
$this->term_signal = pcntl_wtermsig($status);
}
if (pcntl_wifstopped($status)) {
$this->stop_signal = pcntl_wstopsig($status);
}
if (pcntl_wifexited($status)) {
$this->errno = pcntl_wexitstatus($status);
$this->errmsg = pcntl_strerror($this->errno);
} else {
$this->errno = pcntl_get_last_error();
$this->errmsg = pcntl_strerror($this->errno);
}
if (pcntl_wifsignaled($status)) {
$this->if_signal = true;
} else {
$this->if_signal = false;
}
$this->running = false;
}
}
/**
* get sub process callback
*
* @return array|callable|null
*/
protected function getCallable()
{
$callback = null;
if (is_object($this->runnable) && $this->runnable instanceof Runnable) {
$callback = array($this->runnable, 'run');
} elseif (is_callable($this->runnable)) {
$callback = $this->runnable;
} else {
$callback = array($this, 'run');
}
return $callback;
}
/**
* register signal SIGTERM handler,
* when the parent process call shutdown and use the default signal,
* this handler will be triggered
*/
protected function signal()
{
pcntl_signal(SIGTERM, function () {
exit(0);
});
}
/**
* kill self
*
* @param bool|true $block
* @param int $signal
*/
public function shutdown($block = true, $signal = SIGTERM)
{
if (empty($this->pid)) {
throw new \LogicException('the process pid is null, so maybe the process is not started');
}
if (!$this->isRunning()) {
throw new \LogicException("the process is not running");
}
if (!posix_kill($this->pid, $signal)) {
throw new \RuntimeException("kill son process failed");
}
$this->updateStatus($block);
}
/**
* waiting for the sub process exit
*
* @param bool|true $block if block the process
* @param int $sleep default 0.1s check sub process status
* every $sleep milliseconds.
*/
public function wait($block = true, $sleep = 100000)
{
while (true) {
if ($this->isRunning() === false) {
return;
}
if (!$block) {
break;
}
usleep($sleep);
}
}
/**
* register sub process signal handler,
* when the sub process start, the handlers will be registered
*
* @param $signal
* @param callable $handler
*/
public function registerSignalHandler($signal, callable $handler)
{
$this->signal_handlers[$signal] = $handler;
}
/**
* after php-5.3.0, we can call pcntl_singal_dispatch to call signal handlers for pending signals
* which can save cpu resources than using declare(tick=n)
*
* @return bool
*/
public function dispatchSignal()
{
return pcntl_signal_dispatch();
}
/**
* you should overwrite this function
* if you do not use the Runnable or callback.
*/
public function run()
{
}
}

View File

@@ -0,0 +1,143 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @blog http://www.huyanping.cn
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/24 16:29
*/
namespace Jenner\SimpleFork\Queue;
class Pipe
{
/**
* @var resource
*/
protected $read;
/**
* @var resource
*/
protected $write;
/**
* @var string
*/
protected $filename;
/**
* @var bool
*/
protected $block;
/**
* @param string $filename fifo filename
* @param int $mode
* @param bool $block if blocking
*/
public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666, $block = false)
{
if (!file_exists($filename) && !posix_mkfifo($filename, $mode)) {
throw new \RuntimeException('create pipe failed');
}
if (filetype($filename) != 'fifo') {
throw new \RuntimeException('file exists and it is not a fifo file');
}
$this->filename = $filename;
$this->block = $block;
}
public function setBlock($block = true)
{
if (is_resource($this->read)) {
$set = stream_set_blocking($this->read, $block);
if (!$set) {
throw new \RuntimeException('stream_set_blocking failed');
}
}
if (is_resource($this->write)) {
$set = stream_set_blocking($this->write, $block);
if (!$set) {
throw new \RuntimeException('stream_set_blocking failed');
}
}
$this->block = $block;
}
/**
* if the stream is blocking, you would better set the value of size,
* it will not return until the data size is equal to the value of param size
*
* @param int $size
* @return string
*/
public function read($size = 1024)
{
if (!is_resource($this->read)) {
$this->read = fopen($this->filename, 'r+');
if (!is_resource($this->read)) {
throw new \RuntimeException('open file failed');
}
if (!$this->block) {
$set = stream_set_blocking($this->read, false);
if (!$set) {
throw new \RuntimeException('stream_set_blocking failed');
}
}
}
return fread($this->read, $size);
}
/**
* @param $message
* @return int
*/
public function write($message)
{
if (!is_resource($this->write)) {
$this->write = fopen($this->filename, 'w+');
if (!is_resource($this->write)) {
throw new \RuntimeException('open file failed');
}
if (!$this->block) {
$set = stream_set_blocking($this->write, false);
if (!$set) {
throw new \RuntimeException('stream_set_blocking failed');
}
}
}
return fwrite($this->write, $message);
}
/**
*
*/
public function __destruct()
{
$this->close();
}
/**
*
*/
public function close()
{
if (is_resource($this->read)) {
fclose($this->read);
}
if (is_resource($this->write)) {
fclose($this->write);
}
}
public function remove()
{
return unlink($this->filename);
}
}

View File

@@ -0,0 +1,104 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @blog http://www.huyanping.cn
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/24 18:38
*/
namespace Jenner\SimpleFork\Queue;
class PipeQueue implements QueueInterface
{
/**
* @var Pipe
*/
protected $pipe;
/**
* @var bool
*/
protected $block;
/**
* @param string $filename fifo filename
* @param int $mode
* @param bool $block if blocking
*/
public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666)
{
$this->pipe = new Pipe($filename, $mode);
$this->block = false;
$this->pipe->setBlock($this->block);
}
/**
* put value into the queue of channel
*
* @param $value
* @return bool
*/
public function put($value)
{
$len = strlen($value);
if ($len > 2147483647) {
throw new \RuntimeException('value is too long');
}
$raw = pack('N', $len) . $value;
$write_len = $this->pipe->write($raw);
return $write_len == strlen($raw);
}
/**
* get value from the queue of channel
*
* @param bool $block if block when the queue is empty
* @return bool|string
*/
public function get($block = false)
{
if ($this->block != $block) {
$this->pipe->setBlock($block);
$this->block = $block;
}
$len = $this->pipe->read(4);
if ($len === false) {
throw new \RuntimeException('read pipe failed');
}
if (strlen($len) === 0) {
return null;
}
$len = unpack('N', $len);
if (empty($len) || !array_key_exists(1, $len) || empty($len[1])) {
throw new \RuntimeException('data protocol error');
}
$len = intval($len[1]);
$value = '';
while (true) {
$temp = $this->pipe->read($len);
if (strlen($temp) == $len) {
return $temp;
}
$value .= $temp;
$len -= strlen($temp);
if ($len == 0) {
return $value;
}
}
}
/**
* remove the queue resource
*
* @return bool
*/
public function remove()
{
$this->pipe->close();
$this->pipe->remove();
}
}

View File

@@ -0,0 +1,34 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 15:11
*/
namespace Jenner\SimpleFork\Queue;
/**
* queue for processes to transfer data
*
* @package Jenner\SimpleFork\Queue
*/
interface QueueInterface
{
/**
* put value into the queue of channel
*
* @param $value
* @return bool
*/
public function put($value);
/**
* get value from the queue of channel
*
* @param bool $block if block when the queue is empty
* @return bool|string
*/
public function get($block = false);
}

View File

@@ -0,0 +1,144 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/20
* Time: 15:03
*/
namespace Jenner\SimpleFork\Queue;
/**
* redis queue
*
* @package Jenner\SimpleFork\Queue
*/
class RedisQueue implements QueueInterface
{
/**
* @var \Redis
*/
protected $redis;
/**
* @var string redis key of queue
*/
protected $channel;
/**
* @param string $host redis server host
* @param int $port redis server port
* @param int $database redis server database num
* @param string $channel redis queue key
* @param string $prefix prefix of redis queue key
*/
public function __construct(
$host = '127.0.0.1',
$port = 6379,
$database = 0,
$channel = 'cache',
$prefix = 'simple-fork-'
)
{
$this->redis = new \Redis();
$connection_result = $this->redis->connect($host, $port);
if (!$connection_result) {
throw new \RuntimeException('can not connect to the redis server');
}
if ($database != 0) {
$select_result = $this->redis->select($database);
if (!$select_result) {
throw new \RuntimeException('can not select the database');
}
}
if (empty($channel)) {
throw new \InvalidArgumentException('channel can not be empty');
}
$this->channel = $channel;
if (empty($prefix)) return;
$set_option_result = $this->redis->setOption(\Redis::OPT_PREFIX, $prefix);
if (!$set_option_result) {
throw new \RuntimeException('can not set the \Redis::OPT_PREFIX Option');
}
}
/**
* put value into the queue
*
* @param $value
* @return bool
*/
public function put($value)
{
if ($this->redis->lPush($this->channel, $value) !== false) {
return true;
}
return false;
}
/**
* get value from the queue
*
* @param bool $block if block when the queue is empty
* @return bool|string
*/
public function get($block = false)
{
if (!$block) {
return $this->redis->rPop($this->channel);
} else {
while (true) {
$record = $this->redis->rPop($this->channel);
if ($record === false) {
usleep(1000);
continue;
}
return $record;
}
}
}
/**
* get the size of the queue
*
* @return int
*/
public function size()
{
return $this->redis->lSize($this->channel);
}
/**
* remove the queue resource
*
* @return mixed
*/
public function remove()
{
return $this->redis->delete($this->channel);
}
/**
* close the connection
*/
public function __destruct()
{
$this->close();
}
/**
* close the connection
*/
public function close()
{
$this->redis->close();
}
}

View File

@@ -0,0 +1,293 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 15:15
*/
namespace Jenner\SimpleFork\Queue;
/**
* system v message queue
*
* @package Jenner\SimpleFork\Queue
*/
class SystemVMessageQueue implements QueueInterface
{
/**
* @var int channel
*/
protected $msg_type;
/**
* @var
*/
protected $queue;
/**
* @var bool
*/
protected $serialize_needed;
/**
* @var bool
*/
protected $block_send;
/**
* @var int
*/
protected $option_receive;
/**
* @var int
*/
protected $maxsize;
/**
* @var
*/
protected $key_t;
/**
* @var string
*/
protected $ipc_filename;
/**
* @param string $ipc_filename ipc file to make ipc key.
* if it does not exists, it will try to create the file.
* @param int $channel message type
* @param bool $serialize_needed serialize or not
* @param bool $block_send if block when the queue is full
* @param int $option_receive if the value is MSG_IPC_NOWAIT it will not
* going to wait a message coming. if the value is null,
* it will block and wait a message
* @param int $maxsize the max size of queue
*/
public function __construct(
$ipc_filename = __FILE__,
$channel = 1,
$serialize_needed = true,
$block_send = true,
$option_receive = MSG_IPC_NOWAIT,
$maxsize = 100000
)
{
$this->ipc_filename = $ipc_filename;
$this->msg_type = $channel;
$this->serialize_needed = $serialize_needed;
$this->block_send = $block_send;
$this->option_receive = $option_receive;
$this->maxsize = $maxsize;
$this->initQueue($ipc_filename, $channel);
}
/**
* init queue
*
* @param $ipc_filename
* @param $msg_type
* @throws \Exception
*/
protected function initQueue($ipc_filename, $msg_type)
{
$this->key_t = $this->getIpcKey($ipc_filename, $msg_type);
$this->queue = \msg_get_queue($this->key_t);
if (!$this->queue) throw new \RuntimeException('msg_get_queue failed');
}
/**
* @param $ipc_filename
* @param $msg_type
* @throws \Exception
* @return int
*/
public function getIpcKey($ipc_filename, $msg_type)
{
if (!file_exists($ipc_filename)) {
$create_file = touch($ipc_filename);
if ($create_file === false) {
throw new \RuntimeException('ipc_file is not exists and create failed');
}
}
$key_t = \ftok($ipc_filename, $msg_type);
if ($key_t == 0) throw new \RuntimeException('ftok error');
return $key_t;
}
/**
* get message
*
* @param bool $block if block when the queue is empty
* @return bool|string
*/
public function get($block = false)
{
$queue_status = $this->status();
if ($queue_status['msg_qnum'] > 0) {
$option_receive = $block ? 0 : $this->option_receive;
if (\msg_receive(
$this->queue,
$this->msg_type,
$msgtype_erhalten,
$this->maxsize,
$data,
$this->serialize_needed,
$option_receive,
$err
) === true
) {
return $data;
} else {
throw new \RuntimeException($err);
}
} else {
return false;
}
}
public function status()
{
$queue_status = \msg_stat_queue($this->queue);
return $queue_status;
}
/*
* return array's keys
* msg_perm.uid The uid of the owner of the queue.
* msg_perm.gid The gid of the owner of the queue.
* msg_perm.mode The file access mode of the queue.
* msg_stime The time that the last message was sent to the queue.
* msg_rtime The time that the last message was received from the queue.
* msg_ctime The time that the queue was last changed.
* msg_qnum The number of messages waiting to be read from the queue.
* msg_qbytes The maximum number of bytes allowed in one message queue.
* On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.
* msg_lspid The pid of the process that sent the last message to the queue.
* msg_lrpid The pid of the process that received the last message from the queue.
*
* @return array
*/
/**
* put message
*
* @param $message
* @return bool
* @throws \Exception
*/
public function put($message)
{
if (!\msg_send($this->queue, $this->msg_type, $message, $this->serialize_needed, $this->block_send, $err) === true) {
throw new \RuntimeException($err);
}
return true;
}
/**
* get the size of queue
*
* @return mixed
*/
public function size()
{
$status = $this->status();
return $status['msg_qnum'];
}
/**
* allows you to change the values of the msg_perm.uid,
* msg_perm.gid, msg_perm.mode and msg_qbytes fields of the underlying message queue data structure
*
* @param string $key status key
* @param int $value status value
* @return bool
*/
public function setStatus($key, $value)
{
$this->checkSetPrivilege($key);
if ($key == 'msg_qbytes')
return $this->setMaxQueueSize($value);
$queue_status[$key] = $value;
return \msg_set_queue($this->queue, $queue_status);
}
/**
* check the privilege of update the queue's status
*
* @param $key
* @throws \Exception
*/
private function checkSetPrivilege($key)
{
$privilege_field = array('msg_perm.uid', 'msg_perm.gid', 'msg_perm.mode');
if (!\in_array($key, $privilege_field)) {
$message = 'you can only change msg_perm.uid, msg_perm.gid, ' .
' msg_perm.mode and msg_qbytes. And msg_qbytes needs root privileges';
throw new \RuntimeException($message);
}
}
/**
* update the max size of queue
* need root
*
* @param $size
* @throws \Exception
* @return bool
*/
public function setMaxQueueSize($size)
{
$user = \get_current_user();
if ($user !== 'root')
throw new \Exception('changing msg_qbytes needs root privileges');
return $this->setStatus('msg_qbytes', $size);
}
/**
* remove queue
*
* @return bool
*/
public function remove()
{
return \msg_remove_queue($this->queue);
}
/**
* check if the queue is exists or not
*
* @param $key
* @return bool
*/
public function queueExists($key)
{
return \msg_queue_exists($key);
}
/**
* init when wakeup
*/
public function __wakeup()
{
$this->initQueue($this->ipc_filename, $this->msg_type);
}
/**
*
*/
public function __destruct()
{
unset($this);
}
}

View File

@@ -0,0 +1,20 @@
<?php
/**
* Created by PhpStorm.
* User: Jenner
* Date: 2015/8/12
* Time: 15:28
*/
namespace Jenner\SimpleFork;
interface Runnable
{
/**
* process entry
*
* @return mixed
*/
public function run();
}

View File

@@ -0,0 +1,26 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @blog http://www.huyanping.cn
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/19 21:13
*/
namespace Jenner\SimpleFork;
/**
* Only one process will be started at the same time
*
* @package Jenner\SimpleFork
*/
class SinglePool extends FixedPool
{
/**
* SinglePool constructor.
*/
public function __construct()
{
parent::__construct(1);
}
}

View File

@@ -0,0 +1,48 @@
<?php
/**
* @author Jenner <hypxm@qq.com>
* @license https://opensource.org/licenses/MIT MIT
* @datetime: 2015/11/11 17:50
*/
namespace Jenner\SimpleFork;
class Utils
{
/**
* check if the sub class of Process has overwrite the run method
*
* @param $child_class
*/
public static function checkOverwriteRunMethod($child_class)
{
$parent_class = '\\Jenner\\SimpleFork\\Process';
if ($child_class == $parent_class) {
$message = "you should extend the `{$parent_class}`" .
' and overwrite the run method';
throw new \RuntimeException($message);
}
$child = new \ReflectionClass($child_class);
if ($child->getParentClass() === false) {
$message = "you should extend the `{$parent_class}`" .
' and overwrite the run method';
throw new \RuntimeException($message);
}
$parent_methods = $child->getParentClass()->getMethods(\ReflectionMethod::IS_PUBLIC);
foreach ($parent_methods as $parent_method) {
if ($parent_method->getName() !== 'run') continue;
$declaring_class = $child->getMethod($parent_method->getName())
->getDeclaringClass()
->getName();
if ($declaring_class === $parent_class) {
throw new \RuntimeException('you must overwrite the run method');
}
}
}
}