$giftData,
                'queue_time' => time(),
                'retry_count' => 0,
                'uuid' => uniqid('gift_', true)
            ];

            // Serialize the payload
            $data = json_encode($queueData, JSON_UNESCAPED_UNICODE);

            // Store it in a Redis list
            // Cache::handler()->lpush(self::QUEUE_KEY, $data);
            $redis = new Redis(config('cache'));
            $redis->handler()->lpush(self::QUEUE_KEY, $data);

            Log::info("送礼记录已加入队列: {$queueData['uuid']}, 用户: {$giftData['user_id']}, 收礼人: {$giftData['gift_user']}");

            return true;
        } catch (\Exception $e) {
            Log::error('送礼队列添加失败:' . $e->getMessage());
            db::name('redis_error')->insert( ['related_id' => $giftData['id'],'content' => json_encode($giftData),'remark' =>$e->getMessage()]);
            return false;
        }
    }

    /**
     * Push a list of gift records onto the queue, one by one.
     *
     * Each entry is handed to push(); a falsy return from push() counts as a
     * failure for that entry and does not stop the remaining entries.
     *
     * @param array $giftList list of gift payloads
     * @return array counters with the keys 'success', 'failed' and 'total'
     */
    public static function pushBatch($giftList)
    {
        $success = 0;
        $failed = 0;

        foreach ($giftList as $giftData) {
            if (self::push($giftData)) {
                $success++;
                continue;
            }
            $failed++;
        }

        return [
            'success' => $success,
            'failed' => $failed,
            'total' => count($giftList)
        ];
    }

    /**
     * Consume up to $batchSize entries from the queue and store them.
     *
     * For every popped entry the payload is decoded, 'createtime' is defaulted
     * to the current time when empty, the record is inserted through
     * GiveGiftBase::addGiftRecord() and, on success, the GiveGift callback is
     * invoked with the insert result and the payload.
     *
     * Failure handling:
     *  - a payload that is not valid JSON or has no 'data' key is logged and dropped;
     *  - a falsy insert result, or an exception raised before the insert
     *    completed, sends the raw entry to retry() so the popped record is not lost;
     *  - an exception raised after the insert succeeded (i.e. in the callback)
     *    is counted as failed but NOT retried, because retrying would insert
     *    the same record a second time;
     *  - an exception while reading the queue discards the connection so the
     *    next iteration opens a fresh one.
     *
     * Each popped entry is counted once, as either success or failed, unless
     * logging or getError() itself throws on the falsy-insert path.
     *
     * @param int $batchSize maximum number of entries to pop in this run
     * @return array counters with the keys 'processed', 'success' and 'failed'
     */
    public static function process($batchSize = 100)
    {
        $processed = 0;
        $success = 0;
        $failed = 0;

        $model = new GiveGiftBase();
        $gift_model = new GiveGift();

        Log::info("开始处理送礼队列,批量大小: {$batchSize}");

        // Opened lazily and reused across iterations; reset to null after a
        // read failure so the following iteration reconnects.
        $redis = null;

        for ($i = 0; $i < $batchSize; $i++) {
            $data = null;      // raw entry popped in this iteration, if any
            $inserted = false; // true once addGiftRecord() has succeeded

            try {
                if ($redis === null) {
                    $redis = new Redis(config('cache'));
                }

                // Take the oldest entry from the queue
                $data = $redis->handler()->rpop(self::QUEUE_KEY);
                if (!$data) {
                    break;
                }

                $processed++;

                $queueData = json_decode($data, true);
                if (!$queueData || !isset($queueData['data'])) {
                    $failed++;
                    Log::error('队列数据格式错误: ' . $data);
                    continue;
                }

                $giftData = $queueData['data'];
                $uuid = $queueData['uuid'] ?? 'unknown';
                // Read defensively: a missing user_id must not abort the record
                // just because of the log line.
                $userId = $giftData['user_id'] ?? '';

                Log::info("处理送礼记录: {$uuid}, 用户: {$userId}");

                // Make sure the record carries a creation time
                if (empty($giftData['createtime'])) {
                    $giftData['createtime'] = time();
                }

                // Insert into the database
                $recordId = $model->addGiftRecord($giftData);

                if (!$recordId) {
                    $failed++;
                    Log::error("送礼记录处理失败: {$uuid}, 错误: " . $model->getError());
                    // retry() handles its own exceptions, so this entry is not
                    // counted a second time by the catch block below.
                    self::retry($data);
                    continue;
                }

                $inserted = true;
                $gift_model->change_user_give_gift_log_callback($recordId, $giftData);
                $success++;
                Log::info("送礼记录处理成功: {$uuid}, ID: {$recordId}");
            } catch (\Exception $e) {
                $failed++;
                Log::error('处理送礼队列失败:' . $e->getMessage());

                if (!$data) {
                    // Nothing was popped: the failure is in the connection or
                    // the read itself, so reconnect on the next iteration.
                    $redis = null;
                    continue;
                }

                if (!$inserted) {
                    // The entry left the queue but never reached the database;
                    // hand it to the retry logic instead of dropping it.
                    self::retry($data);
                }
            }
        }

        $summary = [
            'processed' => $processed,
            'success' => $success,
            'failed' => $failed
        ];

        Log::info("送礼队列处理完成: " . json_encode($summary));

        return $summary;
    }

    /**
     * Re-queue a failed entry, or park it once the retry budget is used up.
     *
     * The entry's 'retry_count' is incremented. While it is at most
     * self::MAX_RETRY the entry is pushed back onto the main queue; otherwise
     * it is stamped with 'fail_time', pushed onto the failed queue and written
     * to the 'redis_error' table.
     *
     * This method does not let \Exception escape: if Redis or the database
     * fails here, the error is logged together with the raw entry and a
     * best-effort row is written to 'redis_error'.
     *
     * @param string $data raw JSON entry as it was popped from the queue
     */
    protected static function retry($data)
    {
        $queueData = json_decode($data, true);
        if (!is_array($queueData) || !$queueData) {
            return;
        }

        $queueData['retry_count'] = ($queueData['retry_count'] ?? 0) + 1;
        $uuid = $queueData['uuid'] ?? 'unknown';
        $payload = $queueData['data'] ?? null;

        try {
            $redis = new Redis(config('cache'));

            if ($queueData['retry_count'] <= self::MAX_RETRY) {
                // Put the entry back on the main queue
                $newData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
                $redis->handler()->lpush(self::QUEUE_KEY, $newData);

                Log::info("重试送礼记录: {$uuid}, 重试次数: {$queueData['retry_count']}");
                return;
            }

            // Retry budget exhausted: move the entry to the failed queue
            $queueData['fail_time'] = time();
            $failedData = json_encode($queueData, JSON_UNESCAPED_UNICODE);
            $redis->handler()->lpush(self::QUEUE_FAILED_KEY, $failedData);

            Log::error("送礼记录重试超过最大次数: {$uuid}, 数据: " . json_encode($payload));
            db::name('redis_error')->insert( ['related_id' => 0,'content' => json_encode($payload),'remark' =>'送礼记录重试超过最大次数']);
        } catch (\Exception $e) {
            // Keep the raw entry in the log so it can be recovered by hand.
            Log::error('送礼记录重试失败: ' . $e->getMessage() . ', 数据: ' . $data);

            try {
                // Best effort: keep a copy of the payload in the error table.
                db::name('redis_error')->insert( ['related_id' => 0,'content' => json_encode($payload),'remark' => $e->getMessage()]);
            } catch (\Exception $dbError) {
                Log::error('送礼记录重试失败记录写入失败: ' . $dbError->getMessage());
            }
        }
    }

    /**
     * Number of entries currently waiting in the main queue.
     *
     * @return int queue length, or 0 when the length cannot be read
     */
    public static function size()
    {
        try {
            $redis = new Redis(config('cache'));
            // Cast so a non-integer reply still honours the int contract.
            return (int) $redis->handler()->lLen(self::QUEUE_KEY);
        } catch (\Exception $e) {
            Log::error('获取队列长度失败: ' . $e->getMessage());
            return 0;
        }
    }

    /**
     * Number of entries currently parked in the failed queue.
     *
     * @return int failed-queue length, or 0 when the length cannot be read
     */
    public static function failedSize()
    {
        try {
            $redis = new Redis(config('cache'));
            // Cast so a non-integer reply still honours the int contract.
            return (int) $redis->handler()->lLen(self::QUEUE_FAILED_KEY);
        } catch (\Exception $e) {
            Log::error('获取失败队列长度失败: ' . $e->getMessage());
            return 0;
        }
    }

    /**
     * Snapshot of both queue lengths plus a coarse load label.
     *
     * The main queue length is read once, so the reported 'queue_size' and the
     * 'status' derived from it always agree: above 1000 is '繁忙', above 100
     * is '正常', anything else is '空闲'.
     *
     * @return array with the keys 'queue_size', 'failed_size' and 'status'
     */
    public static function stats()
    {
        $queueSize = self::size();

        if ($queueSize > 1000) {
            $status = '繁忙';
        } elseif ($queueSize > 100) {
            $status = '正常';
        } else {
            $status = '空闲';
        }

        return [
            'queue_size' => $queueSize,
            'failed_size' => self::failedSize(),
            'status' => $status
        ];
    }

    /**
     * Delete the main queue key, discarding every pending entry.
     *
     * @return bool true on success, false when the delete raised an exception
     */
    public static function clear()
    {
        try {
            $redis = new Redis(config('cache'));
            $redis->handler()->del(self::QUEUE_KEY);
            Log::info('送礼队列已清空');
            return true;
        } catch (\Exception $e) {
            Log::error('清空队列失败: ' . $e->getMessage());
            return false;
        }
    }

    /**
     * Delete the failed queue key, discarding every parked entry.
     *
     * @return bool true on success, false when the delete raised an exception
     */
    public static function clearFailed()
    {
        try {
            $redis = new Redis(config('cache'));
            $redis->handler()->del(self::QUEUE_FAILED_KEY);
            Log::info('送礼失败队列已清空');
            return true;
        } catch (\Exception $e) {
            Log::error('清空失败队列失败: ' . $e->getMessage());
            return false;
        }
    }
}