287 lines
9.1 KiB
PHP
287 lines
9.1 KiB
PHP
<?php
|
||
|
||
namespace app\common\command;
|
||
|
||
use think\console\Command;
|
||
use think\console\Input;
|
||
use think\console\Output;
|
||
use think\Db;
|
||
|
||
class GiftWallMigration extends Command
|
||
{
|
||
// 每批处理的数据量
|
||
protected $batchSize = 10000;
|
||
|
||
protected function configure()
|
||
{
|
||
$this->setName('giftwall:migration')
|
||
->setDescription('迁移送礼数据到礼物墙');
|
||
}
|
||
|
||
protected function execute(Input $input, Output $output)
|
||
{
|
||
$output->writeln('开始迁移送礼数据到礼物墙...');
|
||
|
||
// 第一步:创建临时表存储汇总数据
|
||
$this->createTempTable($output);
|
||
|
||
// 第二步:分批处理原始数据
|
||
$this->processInBatches($output);
|
||
|
||
// 第三步:合并临时表数据到目标表
|
||
$this->mergeToTargetTable($output);
|
||
|
||
// 第四步:清理临时表
|
||
$this->cleanupTempTable($output);
|
||
|
||
$output->writeln('迁移完成!');
|
||
}
|
||
|
||
/**
|
||
* 创建临时汇总表
|
||
*/
|
||
protected function createTempTable(Output $output)
|
||
{
|
||
$output->writeln('创建临时汇总表...');
|
||
|
||
$sql = "CREATE TABLE IF NOT EXISTS `temp_gift_wall_summary` (
|
||
`user_id` int NOT NULL COMMENT '收礼用户',
|
||
`gift_id` int NOT NULL COMMENT '礼物ID',
|
||
`total_count` int NOT NULL DEFAULT '0' COMMENT '收到此礼物的总数',
|
||
`give_user_ids` text COMMENT '送礼用户ID,逗号分隔',
|
||
PRIMARY KEY (`user_id`, `gift_id`)
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
|
||
|
||
Db::execute($sql);
|
||
|
||
// 清空临时表(如果存在旧数据)
|
||
Db::execute('TRUNCATE TABLE temp_gift_wall_summary');
|
||
|
||
$output->writeln('临时表创建完成');
|
||
}
|
||
|
||
/**
|
||
* 分批处理原始数据
|
||
*/
|
||
protected function processInBatches(Output $output)
|
||
{
|
||
$output->writeln('开始分批处理原始数据...');
|
||
|
||
// 获取总记录数
|
||
$total = Db::table('fa_vs_give_gift')->count();
|
||
$output->writeln("总记录数: {$total}");
|
||
|
||
// 计算总批次数
|
||
$totalBatches = ceil($total / $this->batchSize);
|
||
|
||
for ($batch = 0; $batch < $totalBatches; $batch++) {
|
||
$output->writeln("正在处理第 " . ($batch + 1) . "/{$totalBatches} 批...");
|
||
|
||
$offset = $batch * $this->batchSize;
|
||
|
||
// 分批查询原始数据
|
||
$records = Db::table('fa_vs_give_gift')
|
||
->field('user_id, gift_id, gift_user, number')
|
||
->limit($offset, $this->batchSize)
|
||
->select();
|
||
|
||
if (empty($records)) {
|
||
continue;
|
||
}
|
||
|
||
// 处理这批数据
|
||
$this->processBatchData($records);
|
||
|
||
// 释放内存
|
||
unset($records);
|
||
|
||
// 记录进度
|
||
$processed = min(($batch + 1) * $this->batchSize, $total);
|
||
$output->writeln("已处理: {$processed}/{$total}");
|
||
|
||
// 小休息一下,避免服务器压力过大
|
||
usleep(100000); // 0.1秒
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 处理一批数据
|
||
*/
|
||
protected function processBatchData($records)
|
||
{
|
||
// 按收礼用户和礼物ID分组
|
||
$groupedData = [];
|
||
|
||
foreach ($records as $record) {
|
||
$key = $record['gift_user'] . '_' . $record['gift_id'];
|
||
|
||
if (!isset($groupedData[$key])) {
|
||
$groupedData[$key] = [
|
||
'user_id' => $record['gift_user'],
|
||
'gift_id' => $record['gift_id'],
|
||
'total_count' => 0,
|
||
'give_users' => []
|
||
];
|
||
}
|
||
|
||
// 累加礼物数量
|
||
$groupedData[$key]['total_count'] += $record['number'];
|
||
|
||
// 记录送礼用户(去重会在最后处理)
|
||
$groupedData[$key]['give_users'][] = $record['user_id'];
|
||
}
|
||
|
||
// 批量插入或更新到临时表
|
||
$this->batchUpdateTempTable($groupedData);
|
||
}
|
||
|
||
/**
|
||
* 批量更新临时表
|
||
*/
|
||
protected function batchUpdateTempTable($groupedData)
|
||
{
|
||
if (empty($groupedData)) {
|
||
return;
|
||
}
|
||
|
||
// 开启事务
|
||
Db::startTrans();
|
||
|
||
try {
|
||
foreach ($groupedData as $data) {
|
||
// 去重送礼用户ID
|
||
$uniqueGiveUsers = array_unique($data['give_users']);
|
||
$giveUserIdsStr = implode(',', $uniqueGiveUsers);
|
||
|
||
// 检查是否已存在该记录
|
||
$exists = Db::table('temp_gift_wall_summary')
|
||
->where('user_id', $data['user_id'])
|
||
->where('gift_id', $data['gift_id'])
|
||
->find();
|
||
|
||
if ($exists) {
|
||
// 更新现有记录
|
||
$newCount = $exists['total_count'] + $data['total_count'];
|
||
|
||
// 合并送礼用户ID(去重)
|
||
$existingUsers = explode(',', $exists['give_user_ids']);
|
||
$allUsers = array_merge($existingUsers, $uniqueGiveUsers);
|
||
$allUsers = array_unique($allUsers);
|
||
$mergedGiveUserIds = implode(',', $allUsers);
|
||
|
||
Db::table('temp_gift_wall_summary')
|
||
->where('user_id', $data['user_id'])
|
||
->where('gift_id', $data['gift_id'])
|
||
->update([
|
||
'total_count' => $newCount,
|
||
'give_user_ids' => $mergedGiveUserIds
|
||
]);
|
||
} else {
|
||
// 插入新记录
|
||
Db::table('temp_gift_wall_summary')
|
||
->insert([
|
||
'user_id' => $data['user_id'],
|
||
'gift_id' => $data['gift_id'],
|
||
'total_count' => $data['total_count'],
|
||
'give_user_ids' => $giveUserIdsStr
|
||
]);
|
||
}
|
||
}
|
||
|
||
Db::commit();
|
||
} catch (\Exception $e) {
|
||
Db::rollback();
|
||
throw $e;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 合并临时表数据到目标表
|
||
*/
|
||
protected function mergeToTargetTable(Output $output)
|
||
{
|
||
$output->writeln('开始合并数据到目标表...');
|
||
|
||
// 获取临时表总记录数
|
||
$total = Db::table('temp_gift_wall_summary')->count();
|
||
$output->writeln("需要合并的记录数: {$total}");
|
||
|
||
// 分批合并
|
||
$batchSize = 5000;
|
||
$totalBatches = ceil($total / $batchSize);
|
||
|
||
for ($batch = 0; $batch < $totalBatches; $batch++) {
|
||
$output->writeln("正在合并第 " . ($batch + 1) . "/{$totalBatches} 批...");
|
||
|
||
$offset = $batch * $batchSize;
|
||
|
||
// 分批获取临时表数据
|
||
$records = Db::table('temp_gift_wall_summary')
|
||
->limit($offset, $batchSize)
|
||
->select();
|
||
|
||
if (empty($records)) {
|
||
continue;
|
||
}
|
||
|
||
// 批量插入或更新到目标表
|
||
foreach ($records as $record) {
|
||
// 检查目标表是否已存在该记录
|
||
$exists = Db::table('fa_user_gift_wall')
|
||
->where('user_id', $record['user_id'])
|
||
->where('gift_id', $record['gift_id'])
|
||
->find();
|
||
|
||
if ($exists) {
|
||
// 更新现有记录
|
||
$newCount = $exists['count'] + $record['total_count'];
|
||
|
||
// 合并送礼用户ID(去重)
|
||
$existingUsers = explode(',', $exists['give_user_ids']);
|
||
$newUsers = explode(',', $record['give_user_ids']);
|
||
$allUsers = array_merge($existingUsers, $newUsers);
|
||
$allUsers = array_unique($allUsers);
|
||
$mergedGiveUserIds = implode(',', $allUsers);
|
||
|
||
Db::table('fa_user_gift_wall')
|
||
->where('id', $exists['id'])
|
||
->update([
|
||
'count' => $newCount,
|
||
'give_user_ids' => $mergedGiveUserIds,
|
||
'updatetime' => time()
|
||
]);
|
||
} else {
|
||
// 插入新记录
|
||
Db::table('fa_user_gift_wall')
|
||
->insert([
|
||
'user_id' => $record['user_id'],
|
||
'gift_id' => $record['gift_id'],
|
||
'count' => $record['total_count'],
|
||
'give_user_ids' => $record['give_user_ids'],
|
||
'updatetime' => time()
|
||
]);
|
||
}
|
||
}
|
||
|
||
// 释放内存
|
||
unset($records);
|
||
|
||
// 小休息一下
|
||
usleep(50000); // 0.05秒
|
||
}
|
||
|
||
$output->writeln('数据合并完成');
|
||
}
|
||
|
||
/**
|
||
* 清理临时表
|
||
*/
|
||
protected function cleanupTempTable(Output $output)
|
||
{
|
||
$output->writeln('清理临时表...');
|
||
|
||
Db::execute('DROP TABLE IF EXISTS temp_gift_wall_summary');
|
||
|
||
$output->writeln('临时表已清理');
|
||
}
|
||
} |