@@ -22,5 +22,5 @@ return [
|
||||
'process:gift_queue' => 'app\common\command\ProcessGiftQueue',
|
||||
'queue:monitor' => 'app\common\command\QueueMonitor',
|
||||
'migrate:money_log' => 'app\common\command\MigrateMoneyLog',
|
||||
'emergency:gift_queue' => 'app\common\command\EmergencyGiftQueue',
|
||||
|
||||
];
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
<?php
|
||||
// application/common/command/EmergencyGiftQueue.php
|
||||
|
||||
namespace app\common\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\console\input\Option;
|
||||
use think\Log;
|
||||
use app\common\library\GiftQueue;
|
||||
|
||||
class EmergencyGiftQueue extends Command
|
||||
{
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('emergency:gift_queue')
|
||||
->setDescription('紧急处理积压送礼队列')
|
||||
->addOption('limit', 'l', Option::VALUE_OPTIONAL, '最大处理数量', 10000)
|
||||
->addOption('batch-size', 'b', Option::VALUE_OPTIONAL, '每批处理数量', 500)
|
||||
->addOption('concurrent', 'c', Option::VALUE_OPTIONAL, '并发进程数', 1);
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$limit = $input->getOption('limit');
|
||||
$batchSize = $input->getOption('batch-size');
|
||||
$concurrent = $input->getOption('concurrent');
|
||||
|
||||
$output->writeln("<info>开始紧急处理积压送礼队列...</info>");
|
||||
$output->writeln("最大处理: {$limit}, 批量大小: {$batchSize}, 并发数: {$concurrent}");
|
||||
|
||||
if ($concurrent > 1) {
|
||||
// 多进程处理
|
||||
$this->multiProcess($output, $limit, $batchSize, $concurrent);
|
||||
} else {
|
||||
// 单进程处理
|
||||
$result = GiftQueue::emergencyProcess($limit, $batchSize);
|
||||
$this->outputResult($output, $result);
|
||||
}
|
||||
}
|
||||
|
||||
protected function multiProcess($output, $limit, $batchSize, $concurrent)
|
||||
{
|
||||
$perProcess = ceil($limit / $concurrent);
|
||||
|
||||
$output->writeln("每个进程处理: {$perProcess}条");
|
||||
|
||||
$pids = [];
|
||||
|
||||
for ($i = 0; $i < $concurrent; $i++) {
|
||||
$pid = pcntl_fork();
|
||||
|
||||
if ($pid == -1) {
|
||||
$output->writeln("<error>创建子进程失败</error>");
|
||||
exit(1);
|
||||
} elseif ($pid == 0) {
|
||||
// 子进程
|
||||
$result = GiftQueue::emergencyProcess($perProcess, $batchSize);
|
||||
$output->writeln("子进程 {$i} 完成: " . json_encode($result));
|
||||
exit(0);
|
||||
} else {
|
||||
// 父进程
|
||||
$pids[] = $pid;
|
||||
$output->writeln("启动子进程 {$i}, PID: {$pid}");
|
||||
}
|
||||
}
|
||||
|
||||
// 等待所有子进程完成
|
||||
foreach ($pids as $pid) {
|
||||
pcntl_waitpid($pid, $status);
|
||||
$output->writeln("子进程 {$pid} 已完成");
|
||||
}
|
||||
|
||||
// 输出最终统计
|
||||
$finalSize = GiftQueue::size();
|
||||
$output->writeln("<info>所有进程处理完成,剩余队列大小: {$finalSize}</info>");
|
||||
}
|
||||
|
||||
protected function outputResult($output, $result)
|
||||
{
|
||||
$output->writeln("处理结果:");
|
||||
$output->writeln("总计处理: {$result['total_processed']}");
|
||||
$output->writeln("成功: {$result['total_success']}");
|
||||
$output->writeln("失败: {$result['total_failed']}");
|
||||
$output->writeln("剩余队列大小: {$result['remaining']}");
|
||||
|
||||
if ($result['remaining'] > 0) {
|
||||
$output->writeln("<comment>队列仍有积压,建议增加并发处理</comment>");
|
||||
} else {
|
||||
$output->writeln("<info>队列积压已清理完成</info>");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,26 +16,6 @@ class GiftQueue
|
||||
const QUEUE_FAILED_KEY = 'gift:queue:failed';
|
||||
const MAX_RETRY = 3;
|
||||
|
||||
|
||||
// 单例Redis连接
|
||||
private static $redis = null;
|
||||
|
||||
/**
|
||||
* 获取Redis实例(单例)
|
||||
* @return \Redis
|
||||
*/
|
||||
protected static function getRedis()
|
||||
{
|
||||
if (self::$redis === null) {
|
||||
$config = config('cache');
|
||||
$redisDriver = new Redis($config);
|
||||
self::$redis = $redisDriver->handler();
|
||||
}
|
||||
return self::$redis;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 添加送礼记录到队列
|
||||
* @param array $giftData 送礼数据
|
||||
@@ -102,11 +82,8 @@ class GiftQueue
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 批量处理队列(优化版)
|
||||
* 处理队列
|
||||
* @param int $batchSize 每次处理数量
|
||||
* @return array 处理结果
|
||||
*/
|
||||
@@ -118,33 +95,22 @@ class GiftQueue
|
||||
|
||||
$model = new GiveGiftBase();
|
||||
$gift_model = new GiveGift();
|
||||
$redis = self::getRedis();
|
||||
|
||||
Log::info("开始批量处理送礼队列,批量大小=: {$batchSize}");
|
||||
Log::info("开始处理送礼队列,批量大小: {$batchSize}");
|
||||
|
||||
// 批量获取数据
|
||||
$items = self::batchPop($redis, $batchSize);
|
||||
for ($i = 0; $i < $batchSize; $i++) {
|
||||
try {
|
||||
// 从队列取数据
|
||||
// $data = Cache::handler()->rpop(self::QUEUE_KEY);
|
||||
$redis = new Redis(config('cache'));
|
||||
$data = $redis->handler()->rpop(self::QUEUE_KEY);
|
||||
if (!$data) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (empty($items)) {
|
||||
return [
|
||||
'processed' => 0,
|
||||
'success' => 0,
|
||||
'failed' => 0
|
||||
];
|
||||
}
|
||||
$processed++;
|
||||
|
||||
$processed = count($items);
|
||||
|
||||
try {
|
||||
// 开启事务批量处理
|
||||
Db::startTrans();
|
||||
|
||||
$successIds = [];
|
||||
$failedItems = [];
|
||||
|
||||
foreach ($items as $index => $data) {
|
||||
$queueData = json_decode($data, true);
|
||||
|
||||
if (!$queueData || !isset($queueData['data'])) {
|
||||
$failed++;
|
||||
Log::error('队列数据格式错误: ' . $data);
|
||||
@@ -152,61 +118,34 @@ class GiftQueue
|
||||
}
|
||||
|
||||
$giftData = $queueData['data'];
|
||||
$uuid = $queueData['uuid'] ?? 'unknown_' . $index;
|
||||
$uuid = $queueData['uuid'] ?? 'unknown';
|
||||
|
||||
try {
|
||||
// 插入数据库
|
||||
$result = $model->addGiftRecord($giftData);
|
||||
Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
|
||||
|
||||
if ($result) {
|
||||
$successIds[$result] = $giftData;
|
||||
$success++;
|
||||
} else {
|
||||
$failed++;
|
||||
$failedItems[] = [
|
||||
'data' => $data,
|
||||
'reason' => $model->getError(),
|
||||
'retry_count' => $queueData['retry_count'] ?? 0
|
||||
];
|
||||
}
|
||||
// 验证数据完整性
|
||||
if (empty($giftData['createtime'])) {
|
||||
$giftData['createtime'] = time();
|
||||
}
|
||||
|
||||
} catch (\Exception $e) {
|
||||
// 插入数据库
|
||||
$result = $model->addGiftRecord($giftData);
|
||||
|
||||
if ($result) {
|
||||
$success++;
|
||||
$gift_model->change_user_give_gift_log_callback($result, $giftData);
|
||||
Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
|
||||
} else {
|
||||
$failed++;
|
||||
$failedItems[] = [
|
||||
'data' => $data,
|
||||
'reason' => $e->getMessage(),
|
||||
'retry_count' => $queueData['retry_count'] ?? 0
|
||||
];
|
||||
Log::error("送礼记录处理异常 {$uuid}: " . $e->getMessage());
|
||||
Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
|
||||
|
||||
// 重试逻辑
|
||||
self::retry($data);
|
||||
}
|
||||
}
|
||||
|
||||
// 提交事务
|
||||
Db::commit();
|
||||
|
||||
// 批量执行回调(在事务外)
|
||||
foreach ($successIds as $id => $giftData) {
|
||||
try {
|
||||
$gift_model->change_user_give_gift_log_callback($id, $giftData);
|
||||
Log::info("送礼回调成功: ID: {$id}");
|
||||
} catch (\Exception $e) {
|
||||
Log::error("送礼回调失败 ID {$id}: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 处理失败的项目
|
||||
foreach ($failedItems as $item) {
|
||||
self::retry($item['data'], $item['retry_count'] + 1, $item['reason']);
|
||||
}
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Db::rollback();
|
||||
$failed = $processed;
|
||||
Log::error('批量处理送礼队列失败:' . $e->getMessage());
|
||||
|
||||
// 失败的数据重新放回队列
|
||||
foreach ($items as $data) {
|
||||
$redis->lpush(self::QUEUE_KEY, $data);
|
||||
} catch (\Exception $e) {
|
||||
$failed++;
|
||||
db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
|
||||
Log::error('处理送礼队列失败:' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,241 +155,42 @@ class GiftQueue
|
||||
'failed' => $failed
|
||||
];
|
||||
|
||||
Log::info("送礼队列批量处理完成: " . json_encode($result));
|
||||
Log::info("送礼队列处理完成: " . json_encode($result));
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量从队列弹出数据
|
||||
* @param \Redis $redis
|
||||
* @param int $batchSize
|
||||
* @return array
|
||||
* 重试机制
|
||||
* @param string $data 队列数据
|
||||
*/
|
||||
protected static function batchPop($redis, $batchSize)
|
||||
{
|
||||
$items = [];
|
||||
|
||||
// 使用pipeline提高效率
|
||||
$pipe = $redis->multi(\Redis::PIPELINE);
|
||||
|
||||
for ($i = 0; $i < $batchSize; $i++) {
|
||||
$pipe->rpop(self::QUEUE_KEY);
|
||||
}
|
||||
|
||||
$results = $pipe->exec();
|
||||
|
||||
foreach ($results as $data) {
|
||||
if ($data !== false && $data !== null) {
|
||||
$items[] = $data;
|
||||
}
|
||||
}
|
||||
|
||||
return $items;
|
||||
}
|
||||
|
||||
/**
|
||||
* 优化重试机制
|
||||
*/
|
||||
protected static function retry($data, $retryCount, $reason = '')
|
||||
protected static function retry($data)
|
||||
{
|
||||
$queueData = json_decode($data, true);
|
||||
if (!$queueData) {
|
||||
return;
|
||||
}
|
||||
|
||||
$queueData['retry_count'] = $retryCount;
|
||||
$queueData['last_error'] = $reason;
|
||||
$queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
|
||||
|
||||
$redis = self::getRedis();
|
||||
|
||||
if ($retryCount <= self::MAX_RETRY) {
|
||||
// 重新加入队列(延迟队列)
|
||||
if ($queueData['retry_count'] <= self::MAX_RETRY) {
|
||||
// 重新加入队列
|
||||
$newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
|
||||
|
||||
// 使用延迟队列:重试次数越多,延迟越长
|
||||
$delay = $retryCount * 5; // 5, 10, 15秒延迟
|
||||
$redis->lpush(self::QUEUE_KEY, $newData);
|
||||
|
||||
Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$retryCount}, 延迟: {$delay}s");
|
||||
// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
|
||||
$redis = new Redis(config('cache'));
|
||||
$redis->handler()->lpush(self::QUEUE_KEY, $newData);
|
||||
Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
|
||||
} else {
|
||||
// 记录到失败队列
|
||||
$queueData['fail_time'] = time();
|
||||
$queueData['fail_reason'] = $reason;
|
||||
$failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
|
||||
|
||||
$redis->lpush(self::QUEUE_FAILED_KEY, $failedData);
|
||||
|
||||
Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 原因: {$reason}");
|
||||
|
||||
// 同时记录到数据库以便人工处理
|
||||
Db::name('gift_queue_failed')->insert([
|
||||
'uuid' => $queueData['uuid'],
|
||||
'data' => json_encode($queueData['data'], JSON_UNESCAPED_UNICODE),
|
||||
'retry_count' => $retryCount,
|
||||
'reason' => $reason,
|
||||
'create_time' => time()
|
||||
]);
|
||||
// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
|
||||
$redis = new Redis(config('cache'));
|
||||
$redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
|
||||
Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 紧急批量处理接口
|
||||
* @param int $limit 最大处理数量
|
||||
* @param int $batchSize 每批大小
|
||||
* @return array
|
||||
*/
|
||||
public static function emergencyProcess($limit = 10000, $batchSize = 500)
|
||||
{
|
||||
$totalProcessed = 0;
|
||||
$totalSuccess = 0;
|
||||
$totalFailed = 0;
|
||||
|
||||
Log::warning("开始紧急处理送礼队列,限制: {$limit}, 批量大小: {$batchSize}");
|
||||
|
||||
$redis = self::getRedis();
|
||||
$queueSize = $redis->llen(self::QUEUE_KEY);
|
||||
|
||||
$batches = min(ceil($limit / $batchSize), ceil($queueSize / $batchSize));
|
||||
|
||||
for ($i = 0; $i < $batches; $i++) {
|
||||
$result = self::process($batchSize);
|
||||
|
||||
$totalProcessed += $result['processed'];
|
||||
$totalSuccess += $result['success'];
|
||||
$totalFailed += $result['failed'];
|
||||
|
||||
Log::info("紧急处理批次 {$i}: " . json_encode($result));
|
||||
|
||||
// 每处理5批休息一下,避免CPU过高
|
||||
if ($i % 5 == 0 && $i > 0) {
|
||||
usleep(100000); // 100ms
|
||||
}
|
||||
|
||||
// 检查是否达到限制
|
||||
if ($totalProcessed >= $limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'total_processed' => $totalProcessed,
|
||||
'total_success' => $totalSuccess,
|
||||
'total_failed' => $totalFailed,
|
||||
'remaining' => $redis->llen(self::QUEUE_KEY)
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 处理队列
|
||||
* @param int $batchSize 每次处理数量
|
||||
* @return array 处理结果
|
||||
*/
|
||||
// public static function process($batchSize = 100)
|
||||
// {
|
||||
// $processed = 0;
|
||||
// $success = 0;
|
||||
// $failed = 0;
|
||||
//
|
||||
// $model = new GiveGiftBase();
|
||||
// $gift_model = new GiveGift();
|
||||
//
|
||||
// Log::info("开始处理送礼队列,批量大小: {$batchSize}");
|
||||
//
|
||||
// for ($i = 0; $i < $batchSize; $i++) {
|
||||
// try {
|
||||
// // 从队列取数据
|
||||
//// $data = Cache::handler()->rpop(self::QUEUE_KEY);
|
||||
// $redis = new Redis(config('cache'));
|
||||
// $data = $redis->handler()->rpop(self::QUEUE_KEY);
|
||||
// if (!$data) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// $processed++;
|
||||
//
|
||||
// $queueData = json_decode($data, true);
|
||||
// if (!$queueData || !isset($queueData['data'])) {
|
||||
// $failed++;
|
||||
// Log::error('队列数据格式错误: ' . $data);
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// $giftData = $queueData['data'];
|
||||
// $uuid = $queueData['uuid'] ?? 'unknown';
|
||||
//
|
||||
// Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
|
||||
//
|
||||
// // 验证数据完整性
|
||||
// if (empty($giftData['createtime'])) {
|
||||
// $giftData['createtime'] = time();
|
||||
// }
|
||||
//
|
||||
// // 插入数据库
|
||||
// $result = $model->addGiftRecord($giftData);
|
||||
//
|
||||
// if ($result) {
|
||||
// $success++;
|
||||
// $gift_model->change_user_give_gift_log_callback($result, $giftData);
|
||||
// Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
|
||||
// } else {
|
||||
// $failed++;
|
||||
// Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
|
||||
//
|
||||
// // 重试逻辑
|
||||
// self::retry($data);
|
||||
// }
|
||||
//
|
||||
// } catch (\Exception $e) {
|
||||
// $failed++;
|
||||
// db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
|
||||
// Log::error('处理送礼队列失败:' . $e->getMessage());
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// $result = [
|
||||
// 'processed' => $processed,
|
||||
// 'success' => $success,
|
||||
// 'failed' => $failed
|
||||
// ];
|
||||
//
|
||||
// Log::info("送礼队列处理完成: " . json_encode($result));
|
||||
//
|
||||
// return $result;
|
||||
// }
|
||||
|
||||
/**
|
||||
* 重试机制
|
||||
* @param string $data 队列数据
|
||||
*/
|
||||
// protected static function retry($data)
|
||||
// {
|
||||
// $queueData = json_decode($data, true);
|
||||
// if (!$queueData) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
|
||||
//
|
||||
// if ($queueData['retry_count'] <= self::MAX_RETRY) {
|
||||
// // 重新加入队列
|
||||
// $newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
|
||||
//// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
|
||||
// $redis = new Redis(config('cache'));
|
||||
// $redis->handler()->lpush(self::QUEUE_KEY, $newData);
|
||||
// Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
|
||||
// } else {
|
||||
// // 记录到失败队列
|
||||
// $queueData['fail_time'] = time();
|
||||
// $failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
|
||||
//// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
|
||||
// $redis = new Redis(config('cache'));
|
||||
// $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
|
||||
// Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
|
||||
// }
|
||||
// }
|
||||
|
||||
/**
|
||||
* 获取队列长度
|
||||
* @return int
|
||||
|
||||
Reference in New Issue
Block a user