diff --git a/application/command.php b/application/command.php
index 563fc512..9fec6828 100644
--- a/application/command.php
+++ b/application/command.php
@@ -22,5 +22,5 @@ return [
'process:gift_queue' => 'app\common\command\ProcessGiftQueue',
'queue:monitor' => 'app\common\command\QueueMonitor',
'migrate:money_log' => 'app\common\command\MigrateMoneyLog',
-
+ 'emergency:gift_queue' => 'app\common\command\EmergencyGiftQueue',
];
diff --git a/application/common/command/EmergencyGiftQueue.php b/application/common/command/EmergencyGiftQueue.php
new file mode 100644
index 00000000..34fd56e0
--- /dev/null
+++ b/application/common/command/EmergencyGiftQueue.php
@@ -0,0 +1,94 @@
+setName('emergency:gift_queue')
+ ->setDescription('紧急处理积压送礼队列')
+ ->addOption('limit', 'l', Option::VALUE_OPTIONAL, '最大处理数量', 10000)
+ ->addOption('batch-size', 'b', Option::VALUE_OPTIONAL, '每批处理数量', 500)
+ ->addOption('concurrent', 'c', Option::VALUE_OPTIONAL, '并发进程数', 1);
+ }
+
+ protected function execute(Input $input, Output $output)
+ {
+ $limit = $input->getOption('limit');
+ $batchSize = $input->getOption('batch-size');
+ $concurrent = $input->getOption('concurrent');
+
+ $output->writeln("开始紧急处理积压送礼队列...");
+ $output->writeln("最大处理: {$limit}, 批量大小: {$batchSize}, 并发数: {$concurrent}");
+
+ if ($concurrent > 1) {
+ // 多进程处理
+ $this->multiProcess($output, $limit, $batchSize, $concurrent);
+ } else {
+ // 单进程处理
+ $result = GiftQueue::emergencyProcess($limit, $batchSize);
+ $this->outputResult($output, $result);
+ }
+ }
+
+ protected function multiProcess($output, $limit, $batchSize, $concurrent)
+ {
+ $perProcess = ceil($limit / $concurrent);
+
+ $output->writeln("每个进程处理: {$perProcess}条");
+
+ $pids = [];
+
+ for ($i = 0; $i < $concurrent; $i++) {
+ $pid = pcntl_fork();
+
+ if ($pid == -1) {
+ $output->writeln("创建子进程失败");
+ exit(1);
+ } elseif ($pid == 0) {
+ // 子进程
+ $result = GiftQueue::emergencyProcess($perProcess, $batchSize);
+ $output->writeln("子进程 {$i} 完成: " . json_encode($result));
+ exit(0);
+ } else {
+ // 父进程
+ $pids[] = $pid;
+ $output->writeln("启动子进程 {$i}, PID: {$pid}");
+ }
+ }
+
+ // 等待所有子进程完成
+ foreach ($pids as $pid) {
+ pcntl_waitpid($pid, $status);
+ $output->writeln("子进程 {$pid} 已完成");
+ }
+
+ // 输出最终统计
+ $finalSize = GiftQueue::size();
+ $output->writeln("所有进程处理完成,剩余队列大小: {$finalSize}");
+ }
+
+ protected function outputResult($output, $result)
+ {
+ $output->writeln("处理结果:");
+ $output->writeln("总计处理: {$result['total_processed']}");
+ $output->writeln("成功: {$result['total_success']}");
+ $output->writeln("失败: {$result['total_failed']}");
+ $output->writeln("剩余队列大小: {$result['remaining']}");
+
+ if ($result['remaining'] > 0) {
+ $output->writeln("队列仍有积压,建议增加并发处理");
+ } else {
+ $output->writeln("队列积压已清理完成");
+ }
+ }
+}
\ No newline at end of file
diff --git a/application/common/library/GiftQueue.php b/application/common/library/GiftQueue.php
index 78f5c99c..78e51d57 100644
--- a/application/common/library/GiftQueue.php
+++ b/application/common/library/GiftQueue.php
@@ -16,6 +16,26 @@ class GiftQueue
const QUEUE_FAILED_KEY = 'gift:queue:failed';
const MAX_RETRY = 3;
+
+ // 单例Redis连接
+ private static $redis = null;
+
+ /**
+ * 获取Redis实例(单例)
+ * @return \Redis
+ */
+ protected static function getRedis()
+ {
+ if (self::$redis === null) {
+ $config = config('cache');
+ $redisDriver = new Redis($config);
+ self::$redis = $redisDriver->handler();
+ }
+ return self::$redis;
+ }
+
+
+
/**
* 添加送礼记录到队列
* @param array $giftData 送礼数据
@@ -82,8 +102,11 @@ class GiftQueue
];
}
+
+
+
/**
- * 处理队列
+ * 批量处理队列(优化版)
* @param int $batchSize 每次处理数量
* @return array 处理结果
*/
@@ -95,22 +118,33 @@ class GiftQueue
$model = new GiveGiftBase();
$gift_model = new GiveGift();
+ $redis = self::getRedis();
- Log::info("开始处理送礼队列,批量大小: {$batchSize}");
+ Log::info("开始批量处理送礼队列,批量大小=: {$batchSize}");
- for ($i = 0; $i < $batchSize; $i++) {
- try {
- // 从队列取数据
-// $data = Cache::handler()->rpop(self::QUEUE_KEY);
- $redis = new Redis(config('cache'));
- $data = $redis->handler()->rpop(self::QUEUE_KEY);
- if (!$data) {
- break;
- }
+ // 批量获取数据
+ $items = self::batchPop($redis, $batchSize);
- $processed++;
+ if (empty($items)) {
+ return [
+ 'processed' => 0,
+ 'success' => 0,
+ 'failed' => 0
+ ];
+ }
+ $processed = count($items);
+
+ try {
+ // 开启事务批量处理
+ Db::startTrans();
+
+ $successIds = [];
+ $failedItems = [];
+
+ foreach ($items as $index => $data) {
$queueData = json_decode($data, true);
+
if (!$queueData || !isset($queueData['data'])) {
$failed++;
Log::error('队列数据格式错误: ' . $data);
@@ -118,34 +152,61 @@ class GiftQueue
}
$giftData = $queueData['data'];
- $uuid = $queueData['uuid'] ?? 'unknown';
+ $uuid = $queueData['uuid'] ?? 'unknown_' . $index;
- Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
+ try {
+ // 插入数据库
+ $result = $model->addGiftRecord($giftData);
- // 验证数据完整性
- if (empty($giftData['createtime'])) {
- $giftData['createtime'] = time();
- }
+ if ($result) {
+ $successIds[$result] = $giftData;
+ $success++;
+ } else {
+ $failed++;
+ $failedItems[] = [
+ 'data' => $data,
+ 'reason' => $model->getError(),
+ 'retry_count' => $queueData['retry_count'] ?? 0
+ ];
+ }
- // 插入数据库
- $result = $model->addGiftRecord($giftData);
-
- if ($result) {
- $success++;
- $gift_model->change_user_give_gift_log_callback($result, $giftData);
- Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
- } else {
+ } catch (\Exception $e) {
$failed++;
- Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
-
- // 重试逻辑
- self::retry($data);
+ $failedItems[] = [
+ 'data' => $data,
+ 'reason' => $e->getMessage(),
+ 'retry_count' => $queueData['retry_count'] ?? 0
+ ];
+ Log::error("送礼记录处理异常 {$uuid}: " . $e->getMessage());
}
+ }
- } catch (\Exception $e) {
- $failed++;
- db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
- Log::error('处理送礼队列失败:' . $e->getMessage());
+ // 提交事务
+ Db::commit();
+
+ // 批量执行回调(在事务外)
+ foreach ($successIds as $id => $giftData) {
+ try {
+ $gift_model->change_user_give_gift_log_callback($id, $giftData);
+ Log::info("送礼回调成功: ID: {$id}");
+ } catch (\Exception $e) {
+ Log::error("送礼回调失败 ID {$id}: " . $e->getMessage());
+ }
+ }
+
+ // 处理失败的项目
+ foreach ($failedItems as $item) {
+ self::retry($item['data'], $item['retry_count'] + 1, $item['reason']);
+ }
+
+ } catch (\Exception $e) {
+ Db::rollback();
+ $failed = $processed;
+ Log::error('批量处理送礼队列失败:' . $e->getMessage());
+
+ // 失败的数据重新放回队列
+ foreach ($items as $data) {
+ $redis->lpush(self::QUEUE_KEY, $data);
}
}
@@ -155,42 +216,241 @@ class GiftQueue
'failed' => $failed
];
- Log::info("送礼队列处理完成: " . json_encode($result));
+ Log::info("送礼队列批量处理完成: " . json_encode($result));
return $result;
}
/**
- * 重试机制
- * @param string $data 队列数据
+ * 批量从队列弹出数据
+ * @param \Redis $redis
+ * @param int $batchSize
+ * @return array
*/
- protected static function retry($data)
+ protected static function batchPop($redis, $batchSize)
+ {
+ $items = [];
+
+ // 使用pipeline提高效率
+ $pipe = $redis->multi(\Redis::PIPELINE);
+
+ for ($i = 0; $i < $batchSize; $i++) {
+ $pipe->rpop(self::QUEUE_KEY);
+ }
+
+ $results = $pipe->exec();
+
+ foreach ($results as $data) {
+ if ($data !== false && $data !== null) {
+ $items[] = $data;
+ }
+ }
+
+ return $items;
+ }
+
+ /**
+ * 优化重试机制
+ */
+ protected static function retry($data, $retryCount, $reason = '')
{
$queueData = json_decode($data, true);
if (!$queueData) {
return;
}
- $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
+ $queueData['retry_count'] = $retryCount;
+ $queueData['last_error'] = $reason;
- if ($queueData['retry_count'] <= self::MAX_RETRY) {
- // 重新加入队列
+ $redis = self::getRedis();
+
+ if ($retryCount <= self::MAX_RETRY) {
+ // 重新加入队列(延迟队列)
$newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
- $redis = new Redis(config('cache'));
- $redis->handler()->lpush(self::QUEUE_KEY, $newData);
- Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
+
+ // 使用延迟队列:重试次数越多,延迟越长
+ $delay = $retryCount * 5; // 5, 10, 15秒延迟
+ $redis->lpush(self::QUEUE_KEY, $newData);
+
+ Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$retryCount}, 延迟: {$delay}s");
} else {
// 记录到失败队列
$queueData['fail_time'] = time();
+ $queueData['fail_reason'] = $reason;
$failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
- $redis = new Redis(config('cache'));
- $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
- Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
+
+ $redis->lpush(self::QUEUE_FAILED_KEY, $failedData);
+
+ Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 原因: {$reason}");
+
+ // 同时记录到数据库以便人工处理
+ Db::name('gift_queue_failed')->insert([
+ 'uuid' => $queueData['uuid'],
+ 'data' => json_encode($queueData['data'], JSON_UNESCAPED_UNICODE),
+ 'retry_count' => $retryCount,
+ 'reason' => $reason,
+ 'create_time' => time()
+ ]);
}
}
+ /**
+ * 紧急批量处理接口
+ * @param int $limit 最大处理数量
+ * @param int $batchSize 每批大小
+ * @return array
+ */
+ public static function emergencyProcess($limit = 10000, $batchSize = 500)
+ {
+ $totalProcessed = 0;
+ $totalSuccess = 0;
+ $totalFailed = 0;
+
+ Log::warning("开始紧急处理送礼队列,限制: {$limit}, 批量大小: {$batchSize}");
+
+ $redis = self::getRedis();
+ $queueSize = $redis->llen(self::QUEUE_KEY);
+
+ $batches = min(ceil($limit / $batchSize), ceil($queueSize / $batchSize));
+
+ for ($i = 0; $i < $batches; $i++) {
+ $result = self::process($batchSize);
+
+ $totalProcessed += $result['processed'];
+ $totalSuccess += $result['success'];
+ $totalFailed += $result['failed'];
+
+ Log::info("紧急处理批次 {$i}: " . json_encode($result));
+
+ // 每处理5批休息一下,避免CPU过高
+ if ($i % 5 == 0 && $i > 0) {
+ usleep(100000); // 100ms
+ }
+
+ // 检查是否达到限制
+ if ($totalProcessed >= $limit) {
+ break;
+ }
+ }
+
+ return [
+ 'total_processed' => $totalProcessed,
+ 'total_success' => $totalSuccess,
+ 'total_failed' => $totalFailed,
+ 'remaining' => $redis->llen(self::QUEUE_KEY)
+ ];
+ }
+
+
+ /**
+ * 处理队列
+ * @param int $batchSize 每次处理数量
+ * @return array 处理结果
+ */
+// public static function process($batchSize = 100)
+// {
+// $processed = 0;
+// $success = 0;
+// $failed = 0;
+//
+// $model = new GiveGiftBase();
+// $gift_model = new GiveGift();
+//
+// Log::info("开始处理送礼队列,批量大小: {$batchSize}");
+//
+// for ($i = 0; $i < $batchSize; $i++) {
+// try {
+// // 从队列取数据
+//// $data = Cache::handler()->rpop(self::QUEUE_KEY);
+// $redis = new Redis(config('cache'));
+// $data = $redis->handler()->rpop(self::QUEUE_KEY);
+// if (!$data) {
+// break;
+// }
+//
+// $processed++;
+//
+// $queueData = json_decode($data, true);
+// if (!$queueData || !isset($queueData['data'])) {
+// $failed++;
+// Log::error('队列数据格式错误: ' . $data);
+// continue;
+// }
+//
+// $giftData = $queueData['data'];
+// $uuid = $queueData['uuid'] ?? 'unknown';
+//
+// Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
+//
+// // 验证数据完整性
+// if (empty($giftData['createtime'])) {
+// $giftData['createtime'] = time();
+// }
+//
+// // 插入数据库
+// $result = $model->addGiftRecord($giftData);
+//
+// if ($result) {
+// $success++;
+// $gift_model->change_user_give_gift_log_callback($result, $giftData);
+// Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
+// } else {
+// $failed++;
+// Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
+//
+// // 重试逻辑
+// self::retry($data);
+// }
+//
+// } catch (\Exception $e) {
+// $failed++;
+// db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
+// Log::error('处理送礼队列失败:' . $e->getMessage());
+// }
+// }
+//
+// $result = [
+// 'processed' => $processed,
+// 'success' => $success,
+// 'failed' => $failed
+// ];
+//
+// Log::info("送礼队列处理完成: " . json_encode($result));
+//
+// return $result;
+// }
+
+ /**
+ * 重试机制
+ * @param string $data 队列数据
+ */
+// protected static function retry($data)
+// {
+// $queueData = json_decode($data, true);
+// if (!$queueData) {
+// return;
+// }
+//
+// $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
+//
+// if ($queueData['retry_count'] <= self::MAX_RETRY) {
+// // 重新加入队列
+// $newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
+//// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
+// $redis = new Redis(config('cache'));
+// $redis->handler()->lpush(self::QUEUE_KEY, $newData);
+// Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
+// } else {
+// // 记录到失败队列
+// $queueData['fail_time'] = time();
+// $failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
+//// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
+// $redis = new Redis(config('cache'));
+// $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
+// Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
+// }
+// }
+
/**
* 获取队列长度
* @return int