diff --git a/application/command.php b/application/command.php
index 9fec6828..563fc512 100644
--- a/application/command.php
+++ b/application/command.php
@@ -22,5 +22,5 @@ return [
'process:gift_queue' => 'app\common\command\ProcessGiftQueue',
'queue:monitor' => 'app\common\command\QueueMonitor',
'migrate:money_log' => 'app\common\command\MigrateMoneyLog',
- 'emergency:gift_queue' => 'app\common\command\EmergencyGiftQueue',
+
];
diff --git a/application/common/command/EmergencyGiftQueue.php b/application/common/command/EmergencyGiftQueue.php
deleted file mode 100644
index 34fd56e0..00000000
--- a/application/common/command/EmergencyGiftQueue.php
+++ /dev/null
@@ -1,94 +0,0 @@
-setName('emergency:gift_queue')
- ->setDescription('紧急处理积压送礼队列')
- ->addOption('limit', 'l', Option::VALUE_OPTIONAL, '最大处理数量', 10000)
- ->addOption('batch-size', 'b', Option::VALUE_OPTIONAL, '每批处理数量', 500)
- ->addOption('concurrent', 'c', Option::VALUE_OPTIONAL, '并发进程数', 1);
- }
-
- protected function execute(Input $input, Output $output)
- {
- $limit = $input->getOption('limit');
- $batchSize = $input->getOption('batch-size');
- $concurrent = $input->getOption('concurrent');
-
- $output->writeln("开始紧急处理积压送礼队列...");
- $output->writeln("最大处理: {$limit}, 批量大小: {$batchSize}, 并发数: {$concurrent}");
-
- if ($concurrent > 1) {
- // 多进程处理
- $this->multiProcess($output, $limit, $batchSize, $concurrent);
- } else {
- // 单进程处理
- $result = GiftQueue::emergencyProcess($limit, $batchSize);
- $this->outputResult($output, $result);
- }
- }
-
- protected function multiProcess($output, $limit, $batchSize, $concurrent)
- {
- $perProcess = ceil($limit / $concurrent);
-
- $output->writeln("每个进程处理: {$perProcess}条");
-
- $pids = [];
-
- for ($i = 0; $i < $concurrent; $i++) {
- $pid = pcntl_fork();
-
- if ($pid == -1) {
- $output->writeln("创建子进程失败");
- exit(1);
- } elseif ($pid == 0) {
- // 子进程
- $result = GiftQueue::emergencyProcess($perProcess, $batchSize);
- $output->writeln("子进程 {$i} 完成: " . json_encode($result));
- exit(0);
- } else {
- // 父进程
- $pids[] = $pid;
- $output->writeln("启动子进程 {$i}, PID: {$pid}");
- }
- }
-
- // 等待所有子进程完成
- foreach ($pids as $pid) {
- pcntl_waitpid($pid, $status);
- $output->writeln("子进程 {$pid} 已完成");
- }
-
- // 输出最终统计
- $finalSize = GiftQueue::size();
- $output->writeln("所有进程处理完成,剩余队列大小: {$finalSize}");
- }
-
- protected function outputResult($output, $result)
- {
- $output->writeln("处理结果:");
- $output->writeln("总计处理: {$result['total_processed']}");
- $output->writeln("成功: {$result['total_success']}");
- $output->writeln("失败: {$result['total_failed']}");
- $output->writeln("剩余队列大小: {$result['remaining']}");
-
- if ($result['remaining'] > 0) {
- $output->writeln("队列仍有积压,建议增加并发处理");
- } else {
- $output->writeln("队列积压已清理完成");
- }
- }
-}
\ No newline at end of file
diff --git a/application/common/library/GiftQueue.php b/application/common/library/GiftQueue.php
index 78e51d57..78f5c99c 100644
--- a/application/common/library/GiftQueue.php
+++ b/application/common/library/GiftQueue.php
@@ -16,26 +16,6 @@ class GiftQueue
const QUEUE_FAILED_KEY = 'gift:queue:failed';
const MAX_RETRY = 3;
-
- // 单例Redis连接
- private static $redis = null;
-
- /**
- * 获取Redis实例(单例)
- * @return \Redis
- */
- protected static function getRedis()
- {
- if (self::$redis === null) {
- $config = config('cache');
- $redisDriver = new Redis($config);
- self::$redis = $redisDriver->handler();
- }
- return self::$redis;
- }
-
-
-
/**
* 添加送礼记录到队列
* @param array $giftData 送礼数据
@@ -102,11 +82,8 @@ class GiftQueue
];
}
-
-
-
/**
- * 批量处理队列(优化版)
+ * 处理队列
* @param int $batchSize 每次处理数量
* @return array 处理结果
*/
@@ -118,33 +95,22 @@ class GiftQueue
$model = new GiveGiftBase();
$gift_model = new GiveGift();
- $redis = self::getRedis();
- Log::info("开始批量处理送礼队列,批量大小=: {$batchSize}");
+ Log::info("开始处理送礼队列,批量大小: {$batchSize}");
- // 批量获取数据
- $items = self::batchPop($redis, $batchSize);
+ for ($i = 0; $i < $batchSize; $i++) {
+ try {
+ // 从队列取数据
+// $data = Cache::handler()->rpop(self::QUEUE_KEY);
+ $redis = new Redis(config('cache'));
+ $data = $redis->handler()->rpop(self::QUEUE_KEY);
+ if (!$data) {
+ break;
+ }
- if (empty($items)) {
- return [
- 'processed' => 0,
- 'success' => 0,
- 'failed' => 0
- ];
- }
+ $processed++;
- $processed = count($items);
-
- try {
- // 开启事务批量处理
- Db::startTrans();
-
- $successIds = [];
- $failedItems = [];
-
- foreach ($items as $index => $data) {
$queueData = json_decode($data, true);
-
if (!$queueData || !isset($queueData['data'])) {
$failed++;
Log::error('队列数据格式错误: ' . $data);
@@ -152,61 +118,34 @@ class GiftQueue
}
$giftData = $queueData['data'];
- $uuid = $queueData['uuid'] ?? 'unknown_' . $index;
+ $uuid = $queueData['uuid'] ?? 'unknown';
- try {
- // 插入数据库
- $result = $model->addGiftRecord($giftData);
+ Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
- if ($result) {
- $successIds[$result] = $giftData;
- $success++;
- } else {
- $failed++;
- $failedItems[] = [
- 'data' => $data,
- 'reason' => $model->getError(),
- 'retry_count' => $queueData['retry_count'] ?? 0
- ];
- }
+ // 验证数据完整性
+ if (empty($giftData['createtime'])) {
+ $giftData['createtime'] = time();
+ }
- } catch (\Exception $e) {
+ // 插入数据库
+ $result = $model->addGiftRecord($giftData);
+
+ if ($result) {
+ $success++;
+ $gift_model->change_user_give_gift_log_callback($result, $giftData);
+ Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
+ } else {
$failed++;
- $failedItems[] = [
- 'data' => $data,
- 'reason' => $e->getMessage(),
- 'retry_count' => $queueData['retry_count'] ?? 0
- ];
- Log::error("送礼记录处理异常 {$uuid}: " . $e->getMessage());
+ Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
+
+ // 重试逻辑
+ self::retry($data);
}
- }
- // 提交事务
- Db::commit();
-
- // 批量执行回调(在事务外)
- foreach ($successIds as $id => $giftData) {
- try {
- $gift_model->change_user_give_gift_log_callback($id, $giftData);
- Log::info("送礼回调成功: ID: {$id}");
- } catch (\Exception $e) {
- Log::error("送礼回调失败 ID {$id}: " . $e->getMessage());
- }
- }
-
- // 处理失败的项目
- foreach ($failedItems as $item) {
- self::retry($item['data'], $item['retry_count'] + 1, $item['reason']);
- }
-
- } catch (\Exception $e) {
- Db::rollback();
- $failed = $processed;
- Log::error('批量处理送礼队列失败:' . $e->getMessage());
-
- // 失败的数据重新放回队列
- foreach ($items as $data) {
- $redis->lpush(self::QUEUE_KEY, $data);
+ } catch (\Exception $e) {
+ $failed++;
+ db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
+ Log::error('处理送礼队列失败:' . $e->getMessage());
}
}
@@ -216,241 +155,42 @@ class GiftQueue
'failed' => $failed
];
- Log::info("送礼队列批量处理完成: " . json_encode($result));
+ Log::info("送礼队列处理完成: " . json_encode($result));
return $result;
}
/**
- * 批量从队列弹出数据
- * @param \Redis $redis
- * @param int $batchSize
- * @return array
+ * 重试机制
+ * @param string $data 队列数据
*/
- protected static function batchPop($redis, $batchSize)
- {
- $items = [];
-
- // 使用pipeline提高效率
- $pipe = $redis->multi(\Redis::PIPELINE);
-
- for ($i = 0; $i < $batchSize; $i++) {
- $pipe->rpop(self::QUEUE_KEY);
- }
-
- $results = $pipe->exec();
-
- foreach ($results as $data) {
- if ($data !== false && $data !== null) {
- $items[] = $data;
- }
- }
-
- return $items;
- }
-
- /**
- * 优化重试机制
- */
- protected static function retry($data, $retryCount, $reason = '')
+ protected static function retry($data)
{
$queueData = json_decode($data, true);
if (!$queueData) {
return;
}
- $queueData['retry_count'] = $retryCount;
- $queueData['last_error'] = $reason;
+ $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
- $redis = self::getRedis();
-
- if ($retryCount <= self::MAX_RETRY) {
- // 重新加入队列(延迟队列)
+ if ($queueData['retry_count'] <= self::MAX_RETRY) {
+ // 重新加入队列
$newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-
- // 使用延迟队列:重试次数越多,延迟越长
- $delay = $retryCount * 5; // 5, 10, 15秒延迟
- $redis->lpush(self::QUEUE_KEY, $newData);
-
- Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$retryCount}, 延迟: {$delay}s");
+// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
+ $redis = new Redis(config('cache'));
+ $redis->handler()->lpush(self::QUEUE_KEY, $newData);
+ Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
} else {
// 记录到失败队列
$queueData['fail_time'] = time();
- $queueData['fail_reason'] = $reason;
$failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-
- $redis->lpush(self::QUEUE_FAILED_KEY, $failedData);
-
- Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 原因: {$reason}");
-
- // 同时记录到数据库以便人工处理
- Db::name('gift_queue_failed')->insert([
- 'uuid' => $queueData['uuid'],
- 'data' => json_encode($queueData['data'], JSON_UNESCAPED_UNICODE),
- 'retry_count' => $retryCount,
- 'reason' => $reason,
- 'create_time' => time()
- ]);
+// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
+ $redis = new Redis(config('cache'));
+ $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
+ Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
}
}
- /**
- * 紧急批量处理接口
- * @param int $limit 最大处理数量
- * @param int $batchSize 每批大小
- * @return array
- */
- public static function emergencyProcess($limit = 10000, $batchSize = 500)
- {
- $totalProcessed = 0;
- $totalSuccess = 0;
- $totalFailed = 0;
-
- Log::warning("开始紧急处理送礼队列,限制: {$limit}, 批量大小: {$batchSize}");
-
- $redis = self::getRedis();
- $queueSize = $redis->llen(self::QUEUE_KEY);
-
- $batches = min(ceil($limit / $batchSize), ceil($queueSize / $batchSize));
-
- for ($i = 0; $i < $batches; $i++) {
- $result = self::process($batchSize);
-
- $totalProcessed += $result['processed'];
- $totalSuccess += $result['success'];
- $totalFailed += $result['failed'];
-
- Log::info("紧急处理批次 {$i}: " . json_encode($result));
-
- // 每处理5批休息一下,避免CPU过高
- if ($i % 5 == 0 && $i > 0) {
- usleep(100000); // 100ms
- }
-
- // 检查是否达到限制
- if ($totalProcessed >= $limit) {
- break;
- }
- }
-
- return [
- 'total_processed' => $totalProcessed,
- 'total_success' => $totalSuccess,
- 'total_failed' => $totalFailed,
- 'remaining' => $redis->llen(self::QUEUE_KEY)
- ];
- }
-
-
- /**
- * 处理队列
- * @param int $batchSize 每次处理数量
- * @return array 处理结果
- */
-// public static function process($batchSize = 100)
-// {
-// $processed = 0;
-// $success = 0;
-// $failed = 0;
-//
-// $model = new GiveGiftBase();
-// $gift_model = new GiveGift();
-//
-// Log::info("开始处理送礼队列,批量大小: {$batchSize}");
-//
-// for ($i = 0; $i < $batchSize; $i++) {
-// try {
-// // 从队列取数据
-//// $data = Cache::handler()->rpop(self::QUEUE_KEY);
-// $redis = new Redis(config('cache'));
-// $data = $redis->handler()->rpop(self::QUEUE_KEY);
-// if (!$data) {
-// break;
-// }
-//
-// $processed++;
-//
-// $queueData = json_decode($data, true);
-// if (!$queueData || !isset($queueData['data'])) {
-// $failed++;
-// Log::error('队列数据格式错误: ' . $data);
-// continue;
-// }
-//
-// $giftData = $queueData['data'];
-// $uuid = $queueData['uuid'] ?? 'unknown';
-//
-// Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
-//
-// // 验证数据完整性
-// if (empty($giftData['createtime'])) {
-// $giftData['createtime'] = time();
-// }
-//
-// // 插入数据库
-// $result = $model->addGiftRecord($giftData);
-//
-// if ($result) {
-// $success++;
-// $gift_model->change_user_give_gift_log_callback($result, $giftData);
-// Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
-// } else {
-// $failed++;
-// Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
-//
-// // 重试逻辑
-// self::retry($data);
-// }
-//
-// } catch (\Exception $e) {
-// $failed++;
-// db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
-// Log::error('处理送礼队列失败:' . $e->getMessage());
-// }
-// }
-//
-// $result = [
-// 'processed' => $processed,
-// 'success' => $success,
-// 'failed' => $failed
-// ];
-//
-// Log::info("送礼队列处理完成: " . json_encode($result));
-//
-// return $result;
-// }
-
- /**
- * 重试机制
- * @param string $data 队列数据
- */
-// protected static function retry($data)
-// {
-// $queueData = json_decode($data, true);
-// if (!$queueData) {
-// return;
-// }
-//
-// $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
-//
-// if ($queueData['retry_count'] <= self::MAX_RETRY) {
-// // 重新加入队列
-// $newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-//// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
-// $redis = new Redis(config('cache'));
-// $redis->handler()->lpush(self::QUEUE_KEY, $newData);
-// Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
-// } else {
-// // 记录到失败队列
-// $queueData['fail_time'] = time();
-// $failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
-//// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
-// $redis = new Redis(config('cache'));
-// $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
-// Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
-// }
-// }
-
/**
* 获取队列长度
* @return int