更新
This commit is contained in:
119
vendor/topthink/think-queue/src/queue/Worker.php
vendored
Normal file
119
vendor/topthink/think-queue/src/queue/Worker.php
vendored
Normal file
@@ -0,0 +1,119 @@
|
||||
<?php
|
||||
// +----------------------------------------------------------------------
|
||||
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
|
||||
// +----------------------------------------------------------------------
|
||||
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
|
||||
// +----------------------------------------------------------------------
|
||||
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
|
||||
// +----------------------------------------------------------------------
|
||||
// | Author: yunwuxin <448901948@qq.com>
|
||||
// +----------------------------------------------------------------------
|
||||
|
||||
namespace think\queue;
|
||||
|
||||
use Exception;
|
||||
use think\Hook;
|
||||
use think\Queue;
|
||||
|
||||
class Worker
|
||||
{
|
||||
|
||||
/**
|
||||
* 执行下个任务
|
||||
* @param string $queue
|
||||
* @param int $delay
|
||||
* @param int $sleep
|
||||
* @param int $maxTries
|
||||
* @return array
|
||||
*/
|
||||
public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
|
||||
{
|
||||
|
||||
$job = $this->getNextJob($queue);
|
||||
|
||||
if (!is_null($job)) {
|
||||
Hook::listen('worker_before_process', $queue);
|
||||
return $this->process($job, $maxTries, $delay);
|
||||
}
|
||||
|
||||
Hook::listen('worker_before_sleep', $queue);
|
||||
$this->sleep($sleep);
|
||||
|
||||
return ['job' => null, 'failed' => false];
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取下个任务
|
||||
* @param string $queue
|
||||
* @return Job
|
||||
*/
|
||||
protected function getNextJob($queue)
|
||||
{
|
||||
if (is_null($queue)) {
|
||||
return Queue::pop();
|
||||
}
|
||||
|
||||
foreach (explode(',', $queue) as $queue) {
|
||||
if (!is_null($job = Queue::pop($queue))) {
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a given job from the queue.
|
||||
* @param \think\queue\Job $job
|
||||
* @param int $maxTries
|
||||
* @param int $delay
|
||||
* @return array
|
||||
* @throws Exception
|
||||
*/
|
||||
public function process(Job $job, $maxTries = 0, $delay = 0)
|
||||
{
|
||||
if ($maxTries > 0 && $job->attempts() > $maxTries) {
|
||||
return $this->logFailedJob($job);
|
||||
}
|
||||
|
||||
try {
|
||||
$job->fire();
|
||||
|
||||
return ['job' => $job, 'failed' => false];
|
||||
} catch (Exception $e) {
|
||||
if (!$job->isDeleted()) {
|
||||
$job->release($delay);
|
||||
}
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a failed job into storage.
|
||||
* @param \Think\Queue\Job $job
|
||||
* @return array
|
||||
*/
|
||||
protected function logFailedJob(Job $job)
|
||||
{
|
||||
if (!$job->isDeleted()) {
|
||||
try {
|
||||
$job->delete();
|
||||
$job->failed();
|
||||
} finally {
|
||||
Hook::listen('queue_failed', $job);
|
||||
}
|
||||
}
|
||||
|
||||
return ['job' => $job, 'failed' => true];
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep the script for a given number of seconds.
|
||||
* @param int $seconds
|
||||
* @return void
|
||||
*/
|
||||
public function sleep($seconds)
|
||||
{
|
||||
sleep($seconds);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user