274 lines
8.3 KiB
PHP
274 lines
8.3 KiB
PHP
<?php
|
|
// application/common/library/GiftQueue.php
|
|
|
|
namespace app\common\library;
|
|
|
|
use think\Cache;
|
|
use think\Db;
|
|
use think\Log;
|
|
use app\common\model\GiveGiftBase;
|
|
use think\cache\driver\Redis;
|
|
use app\api\model\GiveGift;
|
|
|
|
/**
 * Redis-backed queue for gift records.
 *
 * Producers call push()/pushBatch() to append records to a Redis list;
 * a worker calls process() to pop records and persist them through
 * GiveGiftBase::addGiftRecord(). Records that cannot be persisted are put back
 * on the queue up to MAX_RETRY times and then moved to a separate failed list
 * and to the `redis_error` table.
 *
 * Items are added with LPUSH and consumed with RPOP, so the main list is
 * processed in first-in-first-out order.
 */
class GiftQueue
{
    /** Redis list holding records that are waiting to be persisted. */
    const QUEUE_KEY = 'gift:queue';

    /** Redis list holding records that exhausted all retries. */
    const QUEUE_FAILED_KEY = 'gift:queue:failed';

    /** Maximum number of times a record is put back on the queue after a failure. */
    const MAX_RETRY = 3;

    /**
     * Add one gift record to the queue.
     *
     * The record is wrapped in an envelope carrying the enqueue time, a retry
     * counter and a generated id used for log correlation.
     *
     * @param array $giftData Gift data; `user_id` and `gift_user` must be non-empty.
     * @return bool True when the record was queued, false when validation or queueing failed.
     */
    public static function push($giftData)
    {
        try {
            // Reject records missing the fields every consumer relies on.
            if (empty($giftData['user_id']) || empty($giftData['gift_user'])) {
                Log::error('送礼队列数据不完整: ' . json_encode($giftData));
                return false;
            }

            $queueData = [
                'data' => $giftData,
                'queue_time' => time(),
                'retry_count' => 0,
                'uuid' => uniqid('gift_', true),
            ];

            $data = json_encode($queueData, JSON_UNESCAPED_UNICODE);
            if ($data === false) {
                // Pushing `false` would enqueue an empty string that process() cannot decode.
                throw new \RuntimeException('json_encode failed: ' . json_last_error_msg());
            }

            $redis = self::connect();
            $redis->handler()->lpush(self::QUEUE_KEY, $data);

            Log::info("送礼记录已加入队列: {$queueData['uuid']}, 用户: {$giftData['user_id']}, 收礼人: {$giftData['gift_user']}");

            return true;
        } catch (\Exception $e) {
            Log::error('送礼队列添加失败：' . $e->getMessage());
            // `id` is optional in the payload; fall back to 0 instead of reading an undefined index.
            self::recordError($giftData['id'] ?? 0, $giftData, $e->getMessage());
            return false;
        }
    }

    /**
     * Add several gift records to the queue, one push() call per record.
     *
     * @param array $giftList List of gift data arrays.
     * @return array Counters with keys `success`, `failed` and `total`.
     */
    public static function pushBatch($giftList)
    {
        $success = 0;
        $failed = 0;

        foreach ($giftList as $giftData) {
            if (self::push($giftData)) {
                $success++;
            } else {
                $failed++;
            }
        }

        return [
            'success' => $success,
            'failed' => $failed,
            'total' => count($giftList),
        ];
    }

    /**
     * Pop up to $batchSize records from the queue and persist them.
     *
     * A record that was popped but not persisted (insert returned a falsy
     * value, or an exception was thrown before the insert succeeded) is handed
     * to retry() so it is not lost. A record whose insert succeeded but whose
     * follow-up callback threw is counted as failed and is NOT re-queued, to
     * avoid inserting it twice. A Redis failure stops the batch early.
     *
     * @param int $batchSize Maximum number of records to handle in this call.
     * @return array Counters with keys `processed`, `success` and `failed`.
     */
    public static function process($batchSize = 100)
    {
        $processed = 0;
        $success = 0;
        $failed = 0;

        $model = new GiveGiftBase();
        $gift_model = new GiveGift();

        Log::info("开始处理送礼队列，批量大小: {$batchSize}");

        // Opened lazily inside the try below and then reused for the whole batch.
        $redis = null;

        for ($i = 0; $i < $batchSize; $i++) {
            $data = null;      // raw payload of the current record, null until popped
            $settled = false;  // true once the record was inserted or handed to retry()

            try {
                if ($redis === null) {
                    $redis = self::connect();
                }

                $data = $redis->handler()->rpop(self::QUEUE_KEY);
                // Only a missing value means "queue empty"; an empty string is a malformed record.
                if ($data === false || $data === null) {
                    break;
                }

                $processed++;

                $queueData = json_decode($data, true);
                if (!is_array($queueData) || !isset($queueData['data']) || !is_array($queueData['data'])) {
                    $failed++;
                    Log::error('队列数据格式错误: ' . $data);
                    continue;
                }

                $giftData = $queueData['data'];
                $uuid = $queueData['uuid'] ?? 'unknown';
                $userId = $giftData['user_id'] ?? 'unknown';

                Log::info("处理送礼记录: {$uuid}, 用户: {$userId}");

                // Default the creation time when the producer did not supply one.
                if (empty($giftData['createtime'])) {
                    $giftData['createtime'] = time();
                }

                $result = $model->addGiftRecord($giftData);

                if ($result) {
                    // From here on the record is stored; it must never be re-queued.
                    $settled = true;
                    $gift_model->change_user_give_gift_log_callback($result, $giftData);
                    $success++;
                    Log::info("送礼记录处理成功: {$uuid}, ID: {$result}");
                } else {
                    $failed++;
                    Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());

                    $settled = true;
                    self::retry($data);
                }
            } catch (\Exception $e) {
                $failed++;
                Log::error('处理送礼队列失败：' . $e->getMessage());

                if ($data === null) {
                    // Connecting or popping failed: Redis is unusable, stop instead of spinning.
                    break;
                }

                if (!$settled) {
                    // The record was popped but never stored; put it back so it is not lost.
                    self::retry($data);
                }
            }
        }

        $result = [
            'processed' => $processed,
            'success' => $success,
            'failed' => $failed,
        ];

        Log::info("送礼队列处理完成: " . json_encode($result));

        return $result;
    }

    /**
     * Put a failed record back on the queue, or give up after MAX_RETRY attempts.
     *
     * When the retry budget is exhausted the record is pushed to the failed
     * list and written to the `redis_error` table. If Redis itself is
     * unavailable while re-queueing, the payload is written to `redis_error`
     * so it can still be recovered.
     *
     * @param string $data Raw JSON envelope as popped from the queue.
     * @return void
     */
    protected static function retry($data)
    {
        $queueData = json_decode($data, true);
        if (!is_array($queueData) || !$queueData) {
            return;
        }

        $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
        $uuid = $queueData['uuid'] ?? 'unknown';
        $payload = $queueData['data'] ?? null;

        if ($queueData['retry_count'] <= self::MAX_RETRY) {
            try {
                $redis = self::connect();
                $redis->handler()->lpush(self::QUEUE_KEY, json_encode($queueData, JSON_UNESCAPED_UNICODE));
                Log::info("重试送礼记录: {$uuid}, 重试次数: {$queueData['retry_count']}");
            } catch (\Exception $e) {
                Log::error("送礼记录重新入队失败: {$uuid}, 错误: " . $e->getMessage());
                self::recordError(0, $payload, $e->getMessage());
            }
            return;
        }

        // Retry budget exhausted: park the record on the failed list.
        $queueData['fail_time'] = time();
        try {
            $redis = self::connect();
            $redis->handler()->lpush(self::QUEUE_FAILED_KEY, json_encode($queueData, JSON_UNESCAPED_UNICODE));
        } catch (\Exception $e) {
            Log::error("送礼记录写入失败队列失败: {$uuid}, 错误: " . $e->getMessage());
        }

        Log::error("送礼记录重试超过最大次数: {$uuid}, 数据: " . json_encode($payload));
        self::recordError(0, $payload, '送礼记录重试超过最大次数');
    }

    /**
     * Number of records waiting in the main queue.
     *
     * @return int Queue length, or 0 when it cannot be read.
     */
    public static function size()
    {
        return self::listLength(self::QUEUE_KEY, '获取队列长度失败: ');
    }

    /**
     * Number of records in the failed queue.
     *
     * @return int Failed-queue length, or 0 when it cannot be read.
     */
    public static function failedSize()
    {
        return self::listLength(self::QUEUE_FAILED_KEY, '获取失败队列长度失败: ');
    }

    /**
     * Snapshot of queue lengths plus a coarse load label.
     *
     * The main queue length is read once so that `queue_size` and `status`
     * always describe the same measurement.
     *
     * @return array Keys `queue_size`, `failed_size` and `status`.
     */
    public static function stats()
    {
        $queueSize = self::size();

        if ($queueSize > 1000) {
            $status = '繁忙';
        } elseif ($queueSize > 100) {
            $status = '正常';
        } else {
            $status = '空闲';
        }

        return [
            'queue_size' => $queueSize,
            'failed_size' => self::failedSize(),
            'status' => $status,
        ];
    }

    /**
     * Delete the main queue and everything in it.
     *
     * @return bool True on success, false when Redis reported an error.
     */
    public static function clear()
    {
        return self::deleteList(self::QUEUE_KEY, '送礼队列已清空', '清空队列失败: ');
    }

    /**
     * Delete the failed queue and everything in it.
     *
     * @return bool True on success, false when Redis reported an error.
     */
    public static function clearFailed()
    {
        return self::deleteList(self::QUEUE_FAILED_KEY, '送礼失败队列已清空', '清空失败队列失败: ');
    }

    /**
     * Create a cache-driver instance from the `cache` configuration.
     *
     * The driver object (not just its handler) is returned so callers keep it
     * alive for as long as they use the underlying connection.
     *
     * @return Redis
     */
    private static function connect()
    {
        return new Redis(config('cache'));
    }

    /**
     * Best-effort write of a failed payload to the `redis_error` table.
     *
     * Never throws: this runs on error paths, where a second exception would
     * hide the original problem.
     *
     * @param mixed  $relatedId Value stored in `related_id`.
     * @param mixed  $payload   Data stored JSON-encoded in `content`.
     * @param string $remark    Human-readable reason stored in `remark`.
     * @return void
     */
    private static function recordError($relatedId, $payload, $remark)
    {
        try {
            Db::name('redis_error')->insert([
                'related_id' => $relatedId,
                'content' => json_encode($payload),
                'remark' => $remark,
            ]);
        } catch (\Exception $e) {
            Log::error('写入redis_error失败: ' . $e->getMessage());
        }
    }

    /**
     * Length of a Redis list, or 0 on any error.
     *
     * @param string $key         Redis key of the list.
     * @param string $errorPrefix Prefix of the log message written on failure.
     * @return int
     */
    private static function listLength($key, $errorPrefix)
    {
        try {
            $redis = self::connect();
            // lLen can return false (e.g. key is not a list); normalise to int.
            return (int) $redis->handler()->lLen($key);
        } catch (\Exception $e) {
            Log::error($errorPrefix . $e->getMessage());
            return 0;
        }
    }

    /**
     * Delete a Redis key and log the outcome.
     *
     * @param string $key            Redis key to delete.
     * @param string $successMessage Log message written on success.
     * @param string $errorPrefix    Prefix of the log message written on failure.
     * @return bool
     */
    private static function deleteList($key, $successMessage, $errorPrefix)
    {
        try {
            $redis = self::connect();
            $redis->handler()->del($key);
            Log::info($successMessage);
            return true;
        } catch (\Exception $e) {
            Log::error($errorPrefix . $e->getMessage());
            return false;
        }
    }
}