仓库初始化
This commit is contained in:
143
addons/crontab/library/SimpleFork/Queue/Pipe.php
Normal file
143
addons/crontab/library/SimpleFork/Queue/Pipe.php
Normal file
@@ -0,0 +1,143 @@
|
||||
<?php
|
||||
/**
|
||||
* @author Jenner <hypxm@qq.com>
|
||||
* @blog http://www.huyanping.cn
|
||||
* @license https://opensource.org/licenses/MIT MIT
|
||||
* @datetime: 2015/11/24 16:29
|
||||
*/
|
||||
|
||||
namespace Jenner\SimpleFork\Queue;
|
||||
|
||||
|
||||
class Pipe
|
||||
{
|
||||
/**
|
||||
* @var resource
|
||||
*/
|
||||
protected $read;
|
||||
|
||||
/**
|
||||
* @var resource
|
||||
*/
|
||||
protected $write;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $filename;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected $block;
|
||||
|
||||
/**
|
||||
* @param string $filename fifo filename
|
||||
* @param int $mode
|
||||
* @param bool $block if blocking
|
||||
*/
|
||||
public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666, $block = false)
|
||||
{
|
||||
if (!file_exists($filename) && !posix_mkfifo($filename, $mode)) {
|
||||
throw new \RuntimeException('create pipe failed');
|
||||
}
|
||||
if (filetype($filename) != 'fifo') {
|
||||
throw new \RuntimeException('file exists and it is not a fifo file');
|
||||
}
|
||||
|
||||
$this->filename = $filename;
|
||||
$this->block = $block;
|
||||
}
|
||||
|
||||
public function setBlock($block = true)
|
||||
{
|
||||
if (is_resource($this->read)) {
|
||||
$set = stream_set_blocking($this->read, $block);
|
||||
if (!$set) {
|
||||
throw new \RuntimeException('stream_set_blocking failed');
|
||||
}
|
||||
}
|
||||
|
||||
if (is_resource($this->write)) {
|
||||
$set = stream_set_blocking($this->write, $block);
|
||||
if (!$set) {
|
||||
throw new \RuntimeException('stream_set_blocking failed');
|
||||
}
|
||||
}
|
||||
|
||||
$this->block = $block;
|
||||
}
|
||||
|
||||
/**
|
||||
* if the stream is blocking, you would better set the value of size,
|
||||
* it will not return until the data size is equal to the value of param size
|
||||
*
|
||||
* @param int $size
|
||||
* @return string
|
||||
*/
|
||||
public function read($size = 1024)
|
||||
{
|
||||
if (!is_resource($this->read)) {
|
||||
$this->read = fopen($this->filename, 'r+');
|
||||
if (!is_resource($this->read)) {
|
||||
throw new \RuntimeException('open file failed');
|
||||
}
|
||||
if (!$this->block) {
|
||||
$set = stream_set_blocking($this->read, false);
|
||||
if (!$set) {
|
||||
throw new \RuntimeException('stream_set_blocking failed');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fread($this->read, $size);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $message
|
||||
* @return int
|
||||
*/
|
||||
public function write($message)
|
||||
{
|
||||
if (!is_resource($this->write)) {
|
||||
$this->write = fopen($this->filename, 'w+');
|
||||
if (!is_resource($this->write)) {
|
||||
throw new \RuntimeException('open file failed');
|
||||
}
|
||||
if (!$this->block) {
|
||||
$set = stream_set_blocking($this->write, false);
|
||||
if (!$set) {
|
||||
throw new \RuntimeException('stream_set_blocking failed');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fwrite($this->write, $message);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->close();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function close()
|
||||
{
|
||||
if (is_resource($this->read)) {
|
||||
fclose($this->read);
|
||||
}
|
||||
if (is_resource($this->write)) {
|
||||
fclose($this->write);
|
||||
}
|
||||
}
|
||||
|
||||
public function remove()
|
||||
{
|
||||
return unlink($this->filename);
|
||||
}
|
||||
}
|
||||
104
addons/crontab/library/SimpleFork/Queue/PipeQueue.php
Normal file
104
addons/crontab/library/SimpleFork/Queue/PipeQueue.php
Normal file
@@ -0,0 +1,104 @@
|
||||
<?php
|
||||
/**
|
||||
* @author Jenner <hypxm@qq.com>
|
||||
* @blog http://www.huyanping.cn
|
||||
* @license https://opensource.org/licenses/MIT MIT
|
||||
* @datetime: 2015/11/24 18:38
|
||||
*/
|
||||
|
||||
namespace Jenner\SimpleFork\Queue;
|
||||
|
||||
|
||||
class PipeQueue implements QueueInterface
|
||||
{
|
||||
/**
|
||||
* @var Pipe
|
||||
*/
|
||||
protected $pipe;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected $block;
|
||||
|
||||
/**
|
||||
* @param string $filename fifo filename
|
||||
* @param int $mode
|
||||
* @param bool $block if blocking
|
||||
*/
|
||||
public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666)
|
||||
{
|
||||
$this->pipe = new Pipe($filename, $mode);
|
||||
$this->block = false;
|
||||
$this->pipe->setBlock($this->block);
|
||||
}
|
||||
|
||||
/**
|
||||
* put value into the queue of channel
|
||||
*
|
||||
* @param $value
|
||||
* @return bool
|
||||
*/
|
||||
public function put($value)
|
||||
{
|
||||
$len = strlen($value);
|
||||
if ($len > 2147483647) {
|
||||
throw new \RuntimeException('value is too long');
|
||||
}
|
||||
$raw = pack('N', $len) . $value;
|
||||
$write_len = $this->pipe->write($raw);
|
||||
|
||||
return $write_len == strlen($raw);
|
||||
}
|
||||
|
||||
/**
|
||||
* get value from the queue of channel
|
||||
*
|
||||
* @param bool $block if block when the queue is empty
|
||||
* @return bool|string
|
||||
*/
|
||||
public function get($block = false)
|
||||
{
|
||||
if ($this->block != $block) {
|
||||
$this->pipe->setBlock($block);
|
||||
$this->block = $block;
|
||||
}
|
||||
$len = $this->pipe->read(4);
|
||||
if ($len === false) {
|
||||
throw new \RuntimeException('read pipe failed');
|
||||
}
|
||||
|
||||
if (strlen($len) === 0) {
|
||||
return null;
|
||||
}
|
||||
$len = unpack('N', $len);
|
||||
if (empty($len) || !array_key_exists(1, $len) || empty($len[1])) {
|
||||
throw new \RuntimeException('data protocol error');
|
||||
}
|
||||
$len = intval($len[1]);
|
||||
|
||||
$value = '';
|
||||
while (true) {
|
||||
$temp = $this->pipe->read($len);
|
||||
if (strlen($temp) == $len) {
|
||||
return $temp;
|
||||
}
|
||||
$value .= $temp;
|
||||
$len -= strlen($temp);
|
||||
if ($len == 0) {
|
||||
return $value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* remove the queue resource
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function remove()
|
||||
{
|
||||
$this->pipe->close();
|
||||
$this->pipe->remove();
|
||||
}
|
||||
}
|
||||
34
addons/crontab/library/SimpleFork/Queue/QueueInterface.php
Normal file
34
addons/crontab/library/SimpleFork/Queue/QueueInterface.php
Normal file
@@ -0,0 +1,34 @@
|
||||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: Jenner
|
||||
* Date: 2015/8/12
|
||||
* Time: 15:11
|
||||
*/
|
||||
|
||||
namespace Jenner\SimpleFork\Queue;
|
||||
|
||||
|
||||
/**
|
||||
* queue for processes to transfer data
|
||||
*
|
||||
* @package Jenner\SimpleFork\Queue
|
||||
*/
|
||||
interface QueueInterface
|
||||
{
|
||||
/**
|
||||
* put value into the queue of channel
|
||||
*
|
||||
* @param $value
|
||||
* @return bool
|
||||
*/
|
||||
public function put($value);
|
||||
|
||||
/**
|
||||
* get value from the queue of channel
|
||||
*
|
||||
* @param bool $block if block when the queue is empty
|
||||
* @return bool|string
|
||||
*/
|
||||
public function get($block = false);
|
||||
}
|
||||
144
addons/crontab/library/SimpleFork/Queue/RedisQueue.php
Normal file
144
addons/crontab/library/SimpleFork/Queue/RedisQueue.php
Normal file
@@ -0,0 +1,144 @@
|
||||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: Jenner
|
||||
* Date: 2015/8/20
|
||||
* Time: 15:03
|
||||
*/
|
||||
|
||||
namespace Jenner\SimpleFork\Queue;
|
||||
|
||||
/**
|
||||
* redis queue
|
||||
*
|
||||
* @package Jenner\SimpleFork\Queue
|
||||
*/
|
||||
class RedisQueue implements QueueInterface
|
||||
{
|
||||
/**
|
||||
* @var \Redis
|
||||
*/
|
||||
protected $redis;
|
||||
|
||||
/**
|
||||
* @var string redis key of queue
|
||||
*/
|
||||
protected $channel;
|
||||
|
||||
/**
|
||||
* @param string $host redis server host
|
||||
* @param int $port redis server port
|
||||
* @param int $database redis server database num
|
||||
* @param string $channel redis queue key
|
||||
* @param string $prefix prefix of redis queue key
|
||||
*/
|
||||
public function __construct(
|
||||
$host = '127.0.0.1',
|
||||
$port = 6379,
|
||||
$database = 0,
|
||||
$channel = 'cache',
|
||||
$prefix = 'simple-fork-'
|
||||
)
|
||||
{
|
||||
$this->redis = new \Redis();
|
||||
$connection_result = $this->redis->connect($host, $port);
|
||||
if (!$connection_result) {
|
||||
throw new \RuntimeException('can not connect to the redis server');
|
||||
}
|
||||
|
||||
if ($database != 0) {
|
||||
$select_result = $this->redis->select($database);
|
||||
if (!$select_result) {
|
||||
throw new \RuntimeException('can not select the database');
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($channel)) {
|
||||
throw new \InvalidArgumentException('channel can not be empty');
|
||||
}
|
||||
|
||||
$this->channel = $channel;
|
||||
|
||||
if (empty($prefix)) return;
|
||||
|
||||
$set_option_result = $this->redis->setOption(\Redis::OPT_PREFIX, $prefix);
|
||||
if (!$set_option_result) {
|
||||
throw new \RuntimeException('can not set the \Redis::OPT_PREFIX Option');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* put value into the queue
|
||||
*
|
||||
* @param $value
|
||||
* @return bool
|
||||
*/
|
||||
public function put($value)
|
||||
{
|
||||
|
||||
if ($this->redis->lPush($this->channel, $value) !== false) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* get value from the queue
|
||||
*
|
||||
* @param bool $block if block when the queue is empty
|
||||
* @return bool|string
|
||||
*/
|
||||
public function get($block = false)
|
||||
{
|
||||
if (!$block) {
|
||||
return $this->redis->rPop($this->channel);
|
||||
} else {
|
||||
while (true) {
|
||||
$record = $this->redis->rPop($this->channel);
|
||||
if ($record === false) {
|
||||
usleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
return $record;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get the size of the queue
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
public function size()
|
||||
{
|
||||
return $this->redis->lSize($this->channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* remove the queue resource
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function remove()
|
||||
{
|
||||
return $this->redis->delete($this->channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* close the connection
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* close the connection
|
||||
*/
|
||||
public function close()
|
||||
{
|
||||
$this->redis->close();
|
||||
}
|
||||
}
|
||||
293
addons/crontab/library/SimpleFork/Queue/SystemVMessageQueue.php
Normal file
293
addons/crontab/library/SimpleFork/Queue/SystemVMessageQueue.php
Normal file
@@ -0,0 +1,293 @@
|
||||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: Jenner
|
||||
* Date: 2015/8/12
|
||||
* Time: 15:15
|
||||
*/
|
||||
|
||||
namespace Jenner\SimpleFork\Queue;
|
||||
|
||||
|
||||
/**
|
||||
* system v message queue
|
||||
*
|
||||
* @package Jenner\SimpleFork\Queue
|
||||
*/
|
||||
class SystemVMessageQueue implements QueueInterface
|
||||
{
|
||||
/**
|
||||
* @var int channel
|
||||
*/
|
||||
protected $msg_type;
|
||||
|
||||
/**
|
||||
* @var
|
||||
*/
|
||||
protected $queue;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected $serialize_needed;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected $block_send;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $option_receive;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $maxsize;
|
||||
|
||||
/**
|
||||
* @var
|
||||
*/
|
||||
protected $key_t;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $ipc_filename;
|
||||
|
||||
/**
|
||||
* @param string $ipc_filename ipc file to make ipc key.
|
||||
* if it does not exists, it will try to create the file.
|
||||
* @param int $channel message type
|
||||
* @param bool $serialize_needed serialize or not
|
||||
* @param bool $block_send if block when the queue is full
|
||||
* @param int $option_receive if the value is MSG_IPC_NOWAIT it will not
|
||||
* going to wait a message coming. if the value is null,
|
||||
* it will block and wait a message
|
||||
* @param int $maxsize the max size of queue
|
||||
*/
|
||||
public function __construct(
|
||||
$ipc_filename = __FILE__,
|
||||
$channel = 1,
|
||||
$serialize_needed = true,
|
||||
$block_send = true,
|
||||
$option_receive = MSG_IPC_NOWAIT,
|
||||
$maxsize = 100000
|
||||
)
|
||||
{
|
||||
$this->ipc_filename = $ipc_filename;
|
||||
$this->msg_type = $channel;
|
||||
$this->serialize_needed = $serialize_needed;
|
||||
$this->block_send = $block_send;
|
||||
$this->option_receive = $option_receive;
|
||||
$this->maxsize = $maxsize;
|
||||
$this->initQueue($ipc_filename, $channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* init queue
|
||||
*
|
||||
* @param $ipc_filename
|
||||
* @param $msg_type
|
||||
* @throws \Exception
|
||||
*/
|
||||
protected function initQueue($ipc_filename, $msg_type)
|
||||
{
|
||||
$this->key_t = $this->getIpcKey($ipc_filename, $msg_type);
|
||||
$this->queue = \msg_get_queue($this->key_t);
|
||||
if (!$this->queue) throw new \RuntimeException('msg_get_queue failed');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $ipc_filename
|
||||
* @param $msg_type
|
||||
* @throws \Exception
|
||||
* @return int
|
||||
*/
|
||||
public function getIpcKey($ipc_filename, $msg_type)
|
||||
{
|
||||
if (!file_exists($ipc_filename)) {
|
||||
$create_file = touch($ipc_filename);
|
||||
if ($create_file === false) {
|
||||
throw new \RuntimeException('ipc_file is not exists and create failed');
|
||||
}
|
||||
}
|
||||
|
||||
$key_t = \ftok($ipc_filename, $msg_type);
|
||||
if ($key_t == 0) throw new \RuntimeException('ftok error');
|
||||
|
||||
return $key_t;
|
||||
}
|
||||
|
||||
/**
|
||||
* get message
|
||||
*
|
||||
* @param bool $block if block when the queue is empty
|
||||
* @return bool|string
|
||||
*/
|
||||
public function get($block = false)
|
||||
{
|
||||
$queue_status = $this->status();
|
||||
if ($queue_status['msg_qnum'] > 0) {
|
||||
$option_receive = $block ? 0 : $this->option_receive;
|
||||
if (\msg_receive(
|
||||
$this->queue,
|
||||
$this->msg_type,
|
||||
$msgtype_erhalten,
|
||||
$this->maxsize,
|
||||
$data,
|
||||
$this->serialize_needed,
|
||||
$option_receive,
|
||||
$err
|
||||
) === true
|
||||
) {
|
||||
return $data;
|
||||
} else {
|
||||
throw new \RuntimeException($err);
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public function status()
|
||||
{
|
||||
$queue_status = \msg_stat_queue($this->queue);
|
||||
return $queue_status;
|
||||
}
|
||||
|
||||
/*
|
||||
* return array's keys
|
||||
* msg_perm.uid The uid of the owner of the queue.
|
||||
* msg_perm.gid The gid of the owner of the queue.
|
||||
* msg_perm.mode The file access mode of the queue.
|
||||
* msg_stime The time that the last message was sent to the queue.
|
||||
* msg_rtime The time that the last message was received from the queue.
|
||||
* msg_ctime The time that the queue was last changed.
|
||||
* msg_qnum The number of messages waiting to be read from the queue.
|
||||
* msg_qbytes The maximum number of bytes allowed in one message queue.
|
||||
* On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.
|
||||
* msg_lspid The pid of the process that sent the last message to the queue.
|
||||
* msg_lrpid The pid of the process that received the last message from the queue.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
|
||||
/**
|
||||
* put message
|
||||
*
|
||||
* @param $message
|
||||
* @return bool
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function put($message)
|
||||
{
|
||||
if (!\msg_send($this->queue, $this->msg_type, $message, $this->serialize_needed, $this->block_send, $err) === true) {
|
||||
throw new \RuntimeException($err);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the size of queue
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function size()
|
||||
{
|
||||
$status = $this->status();
|
||||
|
||||
return $status['msg_qnum'];
|
||||
}
|
||||
|
||||
/**
|
||||
* allows you to change the values of the msg_perm.uid,
|
||||
* msg_perm.gid, msg_perm.mode and msg_qbytes fields of the underlying message queue data structure
|
||||
*
|
||||
* @param string $key status key
|
||||
* @param int $value status value
|
||||
* @return bool
|
||||
*/
|
||||
public function setStatus($key, $value)
|
||||
{
|
||||
$this->checkSetPrivilege($key);
|
||||
if ($key == 'msg_qbytes')
|
||||
return $this->setMaxQueueSize($value);
|
||||
$queue_status[$key] = $value;
|
||||
|
||||
return \msg_set_queue($this->queue, $queue_status);
|
||||
}
|
||||
|
||||
/**
|
||||
* check the privilege of update the queue's status
|
||||
*
|
||||
* @param $key
|
||||
* @throws \Exception
|
||||
*/
|
||||
private function checkSetPrivilege($key)
|
||||
{
|
||||
$privilege_field = array('msg_perm.uid', 'msg_perm.gid', 'msg_perm.mode');
|
||||
if (!\in_array($key, $privilege_field)) {
|
||||
$message = 'you can only change msg_perm.uid, msg_perm.gid, ' .
|
||||
' msg_perm.mode and msg_qbytes. And msg_qbytes needs root privileges';
|
||||
|
||||
throw new \RuntimeException($message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* update the max size of queue
|
||||
* need root
|
||||
*
|
||||
* @param $size
|
||||
* @throws \Exception
|
||||
* @return bool
|
||||
*/
|
||||
public function setMaxQueueSize($size)
|
||||
{
|
||||
$user = \get_current_user();
|
||||
if ($user !== 'root')
|
||||
throw new \Exception('changing msg_qbytes needs root privileges');
|
||||
|
||||
return $this->setStatus('msg_qbytes', $size);
|
||||
}
|
||||
|
||||
/**
|
||||
* remove queue
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function remove()
|
||||
{
|
||||
return \msg_remove_queue($this->queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* check if the queue is exists or not
|
||||
*
|
||||
* @param $key
|
||||
* @return bool
|
||||
*/
|
||||
public function queueExists($key)
|
||||
{
|
||||
return \msg_queue_exists($key);
|
||||
}
|
||||
|
||||
/**
|
||||
* init when wakeup
|
||||
*/
|
||||
public function __wakeup()
|
||||
{
|
||||
$this->initQueue($this->ipc_filename, $this->msg_type);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
unset($this);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user