setName('giftwall:migration') ->setDescription('迁移送礼数据到礼物墙'); } protected function execute(Input $input, Output $output) { $output->writeln('开始迁移送礼数据到礼物墙...'); // 第一步:创建临时表存储汇总数据 $this->createTempTable($output); // 第二步:分批处理原始数据 $this->processInBatches($output); // 第三步:合并临时表数据到目标表 $this->mergeToTargetTable($output); // 第四步:清理临时表 $this->cleanupTempTable($output); $output->writeln('迁移完成!'); } /** * 创建临时汇总表 */ protected function createTempTable(Output $output) { $output->writeln('创建临时汇总表...'); $sql = "CREATE TABLE IF NOT EXISTS `temp_gift_wall_summary` ( `user_id` int NOT NULL COMMENT '收礼用户', `gift_id` int NOT NULL COMMENT '礼物ID', `total_count` int NOT NULL DEFAULT '0' COMMENT '收到此礼物的总数', `give_user_ids` text COMMENT '送礼用户ID,逗号分隔', PRIMARY KEY (`user_id`, `gift_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"; Db::execute($sql); // 清空临时表(如果存在旧数据) Db::execute('TRUNCATE TABLE temp_gift_wall_summary'); $output->writeln('临时表创建完成'); } /** * 分批处理原始数据 */ protected function processInBatches(Output $output) { $output->writeln('开始分批处理原始数据...'); // 获取总记录数 $total = Db::table('fa_vs_give_gift')->count(); $output->writeln("总记录数: {$total}"); // 计算总批次数 $totalBatches = ceil($total / $this->batchSize); for ($batch = 0; $batch < $totalBatches; $batch++) { $output->writeln("正在处理第 " . ($batch + 1) . "/{$totalBatches} 批..."); $offset = $batch * $this->batchSize; // 分批查询原始数据 $records = Db::table('fa_vs_give_gift') ->field('user_id, gift_id, gift_user, number') ->limit($offset, $this->batchSize) ->select(); if (empty($records)) { continue; } // 处理这批数据 $this->processBatchData($records); // 释放内存 unset($records); // 记录进度 $processed = min(($batch + 1) * $this->batchSize, $total); $output->writeln("已处理: {$processed}/{$total}"); // 小休息一下,避免服务器压力过大 usleep(100000); // 0.1秒 } } /** * 处理一批数据 */ protected function processBatchData($records) { // 按收礼用户和礼物ID分组 $groupedData = []; foreach ($records as $record) { $key = $record['gift_user'] . '_' . $record['gift_id']; if (!isset($groupedData[$key])) { $groupedData[$key] = [ 'user_id' => $record['gift_user'], 'gift_id' => $record['gift_id'], 'total_count' => 0, 'give_users' => [] ]; } // 累加礼物数量 $groupedData[$key]['total_count'] += $record['number']; // 记录送礼用户(去重会在最后处理) $groupedData[$key]['give_users'][] = $record['user_id']; } // 批量插入或更新到临时表 $this->batchUpdateTempTable($groupedData); } /** * 批量更新临时表 */ protected function batchUpdateTempTable($groupedData) { if (empty($groupedData)) { return; } // 开启事务 Db::startTrans(); try { foreach ($groupedData as $data) { // 去重送礼用户ID $uniqueGiveUsers = array_unique($data['give_users']); $giveUserIdsStr = implode(',', $uniqueGiveUsers); // 检查是否已存在该记录 $exists = Db::table('temp_gift_wall_summary') ->where('user_id', $data['user_id']) ->where('gift_id', $data['gift_id']) ->find(); if ($exists) { // 更新现有记录 $newCount = $exists['total_count'] + $data['total_count']; // 合并送礼用户ID(去重) $existingUsers = explode(',', $exists['give_user_ids']); $allUsers = array_merge($existingUsers, $uniqueGiveUsers); $allUsers = array_unique($allUsers); $mergedGiveUserIds = implode(',', $allUsers); Db::table('temp_gift_wall_summary') ->where('user_id', $data['user_id']) ->where('gift_id', $data['gift_id']) ->update([ 'total_count' => $newCount, 'give_user_ids' => $mergedGiveUserIds ]); } else { // 插入新记录 Db::table('temp_gift_wall_summary') ->insert([ 'user_id' => $data['user_id'], 'gift_id' => $data['gift_id'], 'total_count' => $data['total_count'], 'give_user_ids' => $giveUserIdsStr ]); } } Db::commit(); } catch (\Exception $e) { Db::rollback(); throw $e; } } /** * 合并临时表数据到目标表 */ protected function mergeToTargetTable(Output $output) { $output->writeln('开始合并数据到目标表...'); // 获取临时表总记录数 $total = Db::table('temp_gift_wall_summary')->count(); $output->writeln("需要合并的记录数: {$total}"); // 分批合并 $batchSize = 5000; $totalBatches = ceil($total / $batchSize); for ($batch = 0; $batch < $totalBatches; $batch++) { $output->writeln("正在合并第 " . ($batch + 1) . "/{$totalBatches} 批..."); $offset = $batch * $batchSize; // 分批获取临时表数据 $records = Db::table('temp_gift_wall_summary') ->limit($offset, $batchSize) ->select(); if (empty($records)) { continue; } // 批量插入或更新到目标表 foreach ($records as $record) { // 检查目标表是否已存在该记录 $exists = Db::table('fa_user_gift_wall') ->where('user_id', $record['user_id']) ->where('gift_id', $record['gift_id']) ->find(); if ($exists) { // 更新现有记录 $newCount = $exists['count'] + $record['total_count']; // 合并送礼用户ID(去重) $existingUsers = explode(',', $exists['give_user_ids']); $newUsers = explode(',', $record['give_user_ids']); $allUsers = array_merge($existingUsers, $newUsers); $allUsers = array_unique($allUsers); $mergedGiveUserIds = implode(',', $allUsers); Db::table('fa_user_gift_wall') ->where('id', $exists['id']) ->update([ 'count' => $newCount, 'give_user_ids' => $mergedGiveUserIds, 'updatetime' => time() ]); } else { // 插入新记录 Db::table('fa_user_gift_wall') ->insert([ 'user_id' => $record['user_id'], 'gift_id' => $record['gift_id'], 'count' => $record['total_count'], 'give_user_ids' => $record['give_user_ids'], 'updatetime' => time() ]); } } // 释放内存 unset($records); // 小休息一下 usleep(50000); // 0.05秒 } $output->writeln('数据合并完成'); } /** * 清理临时表 */ protected function cleanupTempTable(Output $output) { $output->writeln('清理临时表...'); Db::execute('DROP TABLE IF EXISTS temp_gift_wall_summary'); $output->writeln('临时表已清理'); } }