Files
yusheng-php/application/common/library/GiftQueue.php
2026-01-13 20:27:18 +08:00

534 lines
16 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
// application/common/library/GiftQueue.php
namespace app\common\library;
use think\Cache;
use think\Db;
use think\Log;
use app\common\model\GiveGiftBase;
use think\cache\driver\Redis;
use app\api\model\GiveGift;
class GiftQueue
{
const QUEUE_KEY = 'gift:queue';
const QUEUE_FAILED_KEY = 'gift:queue:failed';
const MAX_RETRY = 3;
// 单例Redis连接
private static $redis = null;
/**
* 获取Redis实例单例
* @return \Redis
*/
protected static function getRedis()
{
if (self::$redis === null) {
$config = config('cache');
$redisDriver = new Redis($config);
self::$redis = $redisDriver->handler();
}
return self::$redis;
}
/**
* 添加送礼记录到队列
* @param array $giftData 送礼数据
* @return bool
*/
public static function push($giftData)
{
try {
// 验证必要字段
if (empty($giftData['user_id']) || empty($giftData['gift_user'])) {
Log::error('送礼队列数据不完整: ' . json_encode($giftData));
return false;
}
// 添加队列元数据
$queueData = [
'data' => $giftData,
'queue_time' => time(),
'retry_count' => 0,
'uuid' => uniqid('gift_', true)
];
// 序列化数据
$data = json_encode($queueData, JSON_UNESCAPED_UNICODE);
// 使用Redis列表存储
// Cache::handler()->lpush(self::QUEUE_KEY, $data);
$redis = new Redis(config('cache'));
$redis->handler()->lpush(self::QUEUE_KEY, $data);
Log::info("送礼记录已加入队列: {$queueData['uuid']}, 用户: {$giftData['user_id']}, 收礼人: {$giftData['gift_user']}");
return true;
} catch (\Exception $e) {
Log::error('送礼队列添加失败:' . $e->getMessage());
db::name('redis_error')->insert( ['related_id' => $giftData['id'],'content' => json_encode($giftData),'remark' =>$e->getMessage()]);
return false;
}
}
/**
* 批量添加送礼记录到队列
* @param array $giftList 送礼数据列表
* @return array 处理结果
*/
public static function pushBatch($giftList)
{
$success = 0;
$failed = 0;
foreach ($giftList as $giftData) {
if (self::push($giftData)) {
$success++;
} else {
$failed++;
}
}
return [
'success' => $success,
'failed' => $failed,
'total' => count($giftList)
];
}
/**
* 批量处理队列(优化版)
* @param int $batchSize 每次处理数量
* @return array 处理结果
*/
public static function process($batchSize = 100)
{
$processed = 0;
$success = 0;
$failed = 0;
$model = new GiveGiftBase();
$gift_model = new GiveGift();
$redis = self::getRedis();
Log::info("开始批量处理送礼队列,批量大小=: {$batchSize}");
// 批量获取数据
$items = self::batchPop($redis, $batchSize);
if (empty($items)) {
return [
'processed' => 0,
'success' => 0,
'failed' => 0
];
}
$processed = count($items);
try {
// 开启事务批量处理
Db::startTrans();
$successIds = [];
$failedItems = [];
foreach ($items as $index => $data) {
$queueData = json_decode($data, true);
if (!$queueData || !isset($queueData['data'])) {
$failed++;
Log::error('队列数据格式错误: ' . $data);
continue;
}
$giftData = $queueData['data'];
$uuid = $queueData['uuid'] ?? 'unknown_' . $index;
try {
// 插入数据库
$result = $model->addGiftRecord($giftData);
if ($result) {
$successIds[$result] = $giftData;
$success++;
} else {
$failed++;
$failedItems[] = [
'data' => $data,
'reason' => $model->getError(),
'retry_count' => $queueData['retry_count'] ?? 0
];
}
} catch (\Exception $e) {
$failed++;
$failedItems[] = [
'data' => $data,
'reason' => $e->getMessage(),
'retry_count' => $queueData['retry_count'] ?? 0
];
Log::error("送礼记录处理异常 {$uuid}: " . $e->getMessage());
}
}
// 提交事务
Db::commit();
// 批量执行回调(在事务外)
foreach ($successIds as $id => $giftData) {
try {
$gift_model->change_user_give_gift_log_callback($id, $giftData);
Log::info("送礼回调成功: ID: {$id}");
} catch (\Exception $e) {
Log::error("送礼回调失败 ID {$id}: " . $e->getMessage());
}
}
// 处理失败的项目
foreach ($failedItems as $item) {
self::retry($item['data'], $item['retry_count'] + 1, $item['reason']);
}
} catch (\Exception $e) {
Db::rollback();
$failed = $processed;
Log::error('批量处理送礼队列失败:' . $e->getMessage());
// 失败的数据重新放回队列
foreach ($items as $data) {
$redis->lpush(self::QUEUE_KEY, $data);
}
}
$result = [
'processed' => $processed,
'success' => $success,
'failed' => $failed
];
Log::info("送礼队列批量处理完成: " . json_encode($result));
return $result;
}
/**
* 批量从队列弹出数据
* @param \Redis $redis
* @param int $batchSize
* @return array
*/
protected static function batchPop($redis, $batchSize)
{
$items = [];
// 使用pipeline提高效率
$pipe = $redis->multi(\Redis::PIPELINE);
for ($i = 0; $i < $batchSize; $i++) {
$pipe->rpop(self::QUEUE_KEY);
}
$results = $pipe->exec();
foreach ($results as $data) {
if ($data !== false && $data !== null) {
$items[] = $data;
}
}
return $items;
}
/**
* 优化重试机制
*/
protected static function retry($data, $retryCount, $reason = '')
{
$queueData = json_decode($data, true);
if (!$queueData) {
return;
}
$queueData['retry_count'] = $retryCount;
$queueData['last_error'] = $reason;
$redis = self::getRedis();
if ($retryCount <= self::MAX_RETRY) {
// 重新加入队列(延迟队列)
$newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
// 使用延迟队列:重试次数越多,延迟越长
$delay = $retryCount * 5; // 5, 10, 15秒延迟
$redis->lpush(self::QUEUE_KEY, $newData);
Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$retryCount}, 延迟: {$delay}s");
} else {
// 记录到失败队列
$queueData['fail_time'] = time();
$queueData['fail_reason'] = $reason;
$failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
$redis->lpush(self::QUEUE_FAILED_KEY, $failedData);
Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 原因: {$reason}");
// 同时记录到数据库以便人工处理
Db::name('gift_queue_failed')->insert([
'uuid' => $queueData['uuid'],
'data' => json_encode($queueData['data'], JSON_UNESCAPED_UNICODE),
'retry_count' => $retryCount,
'reason' => $reason,
'create_time' => time()
]);
}
}
/**
* 紧急批量处理接口
* @param int $limit 最大处理数量
* @param int $batchSize 每批大小
* @return array
*/
public static function emergencyProcess($limit = 10000, $batchSize = 500)
{
$totalProcessed = 0;
$totalSuccess = 0;
$totalFailed = 0;
Log::warning("开始紧急处理送礼队列,限制: {$limit}, 批量大小: {$batchSize}");
$redis = self::getRedis();
$queueSize = $redis->llen(self::QUEUE_KEY);
$batches = min(ceil($limit / $batchSize), ceil($queueSize / $batchSize));
for ($i = 0; $i < $batches; $i++) {
$result = self::process($batchSize);
$totalProcessed += $result['processed'];
$totalSuccess += $result['success'];
$totalFailed += $result['failed'];
Log::info("紧急处理批次 {$i}: " . json_encode($result));
// 每处理5批休息一下避免CPU过高
if ($i % 5 == 0 && $i > 0) {
usleep(100000); // 100ms
}
// 检查是否达到限制
if ($totalProcessed >= $limit) {
break;
}
}
return [
'total_processed' => $totalProcessed,
'total_success' => $totalSuccess,
'total_failed' => $totalFailed,
'remaining' => $redis->llen(self::QUEUE_KEY)
];
}
/**
* 处理队列
* @param int $batchSize 每次处理数量
* @return array 处理结果
*/
// public static function process($batchSize = 100)
// {
// $processed = 0;
// $success = 0;
// $failed = 0;
//
// $model = new GiveGiftBase();
// $gift_model = new GiveGift();
//
// Log::info("开始处理送礼队列,批量大小: {$batchSize}");
//
// for ($i = 0; $i < $batchSize; $i++) {
// try {
// // 从队列取数据
//// $data = Cache::handler()->rpop(self::QUEUE_KEY);
// $redis = new Redis(config('cache'));
// $data = $redis->handler()->rpop(self::QUEUE_KEY);
// if (!$data) {
// break;
// }
//
// $processed++;
//
// $queueData = json_decode($data, true);
// if (!$queueData || !isset($queueData['data'])) {
// $failed++;
// Log::error('队列数据格式错误: ' . $data);
// continue;
// }
//
// $giftData = $queueData['data'];
// $uuid = $queueData['uuid'] ?? 'unknown';
//
// Log::info("处理送礼记录: {$uuid}, 用户: {$giftData['user_id']}");
//
// // 验证数据完整性
// if (empty($giftData['createtime'])) {
// $giftData['createtime'] = time();
// }
//
// // 插入数据库
// $result = $model->addGiftRecord($giftData);
//
// if ($result) {
// $success++;
// $gift_model->change_user_give_gift_log_callback($result, $giftData);
// Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
// } else {
// $failed++;
// Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
//
// // 重试逻辑
// self::retry($data);
// }
//
// } catch (\Exception $e) {
// $failed++;
// db::name('redis_error')->insert( ['related_id' => 0,'content' => $e->getMessage(),'remark' =>$e->getMessage()]);
// Log::error('处理送礼队列失败:' . $e->getMessage());
// }
// }
//
// $result = [
// 'processed' => $processed,
// 'success' => $success,
// 'failed' => $failed
// ];
//
// Log::info("送礼队列处理完成: " . json_encode($result));
//
// return $result;
// }
/**
* 重试机制
* @param string $data 队列数据
*/
// protected static function retry($data)
// {
// $queueData = json_decode($data, true);
// if (!$queueData) {
// return;
// }
//
// $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
//
// if ($queueData['retry_count'] <= self::MAX_RETRY) {
// // 重新加入队列
// $newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
//// Cache::handler()->lpush(self::QUEUE_KEY, $newData);
// $redis = new Redis(config('cache'));
// $redis->handler()->lpush(self::QUEUE_KEY, $newData);
// Log::info("重试送礼记录: {$queueData['uuid']}, 重试次数: {$queueData['retry_count']}");
// } else {
// // 记录到失败队列
// $queueData['fail_time'] = time();
// $failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
//// Cache::handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
// $redis = new Redis(config('cache'));
// $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);
// Log::error("送礼记录重试超过最大次数: {$queueData['uuid']}, 数据: " . json_encode($queueData['data']));
// }
// }
/**
* 获取队列长度
* @return int
*/
public static function size()
{
try {
// return Cache::handler()->llen(self::QUEUE_KEY);
$redis = new Redis(config('cache'));
return $redis->handler()->lLen(self::QUEUE_KEY);
} catch (\Exception $e) {
Log::error('获取队列长度失败: ' . $e->getMessage());
return 0;
}
}
/**
* 获取失败队列长度
* @return int
*/
public static function failedSize()
{
try {
// return Cache::handler()->llen(self::QUEUE_FAILED_KEY);
$redis = new Redis(config('cache'));
return $redis->handler()->lLen(self::QUEUE_FAILED_KEY);
} catch (\Exception $e) {
Log::error('获取失败队列长度失败: ' . $e->getMessage());
return 0;
}
}
/**
* 获取队列统计信息
* @return array
*/
public static function stats()
{
return [
'queue_size' => self::size(),
'failed_size' => self::failedSize(),
'status' => self::size() > 1000 ? '繁忙' : (self::size() > 100 ? '正常' : '空闲')
];
}
/**
* 清空队列
* @return bool
*/
public static function clear()
{
try {
// Cache::handler()->del(self::QUEUE_KEY);
$redis = new Redis(config('cache'));
$redis->handler()->del(self::QUEUE_KEY);
Log::info('送礼队列已清空');
return true;
} catch (\Exception $e) {
Log::error('清空队列失败: ' . $e->getMessage());
return false;
}
}
/**
* 清理失败队列
* @return bool
*/
public static function clearFailed()
{
try {
// Cache::handler()->del(self::QUEUE_FAILED_KEY);
$redis = new Redis(config('cache'));
$redis->handler()->del(self::QUEUE_FAILED_KEY);
Log::info('送礼失败队列已清空');
return true;
} catch (\Exception $e) {
Log::error('清空失败队列失败: ' . $e->getMessage());
return false;
}
}
}