金币扣除使用行级锁 和礼物墙处理
This commit is contained in:
@@ -237,7 +237,13 @@ class UserWallet extends Model
|
||||
//减少用户金币类型
|
||||
$out_types = [4,7,10,17,20,24,25,32,42,50,56,59];
|
||||
|
||||
$user_coin = Db::name('user_wallet_coin')->where(['user_id' => $user_id])->value('coin');
|
||||
$user_wallet = Db::name('user_wallet_coin')->where(['user_id' => $user_id])->lock(true)->value('coin');
|
||||
if (!$user_wallet) {
|
||||
return false;
|
||||
}
|
||||
$user_coin = $user_wallet['coin'];
|
||||
$original_coin = $user_coin; // 保存原始值用于日志
|
||||
|
||||
if(in_array($change_type,$in_types)){
|
||||
$update_coin = $user_coin + $money;
|
||||
}elseif(in_array($change_type,$out_types)){
|
||||
@@ -257,7 +263,7 @@ class UserWallet extends Model
|
||||
'user_id' => $user_id,
|
||||
'room_id' => $room_id,
|
||||
'coin' => $money,
|
||||
'before' => $user_coin,
|
||||
'before' => $original_coin,
|
||||
'after' => $update_coin,
|
||||
'change_type' => $change_type,
|
||||
'remarks' => $remarks,
|
||||
|
||||
@@ -22,5 +22,6 @@ return [
|
||||
'process:gift_queue' => 'app\common\command\ProcessGiftQueue',
|
||||
'queue:monitor' => 'app\common\command\QueueMonitor',
|
||||
'migrate:money_log' => 'app\common\command\MigrateMoneyLog',
|
||||
'giftwall:migration' => 'app\common\command\GiftWallMigration',
|
||||
|
||||
];
|
||||
|
||||
287
application/common/command/GiftWallMigration.php
Normal file
287
application/common/command/GiftWallMigration.php
Normal file
@@ -0,0 +1,287 @@
|
||||
<?php
|
||||
|
||||
namespace app\common\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\Db;
|
||||
|
||||
class GiftWallMigration extends Command
|
||||
{
|
||||
// 每批处理的数据量
|
||||
protected $batchSize = 10000;
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('giftwall:migration')
|
||||
->setDescription('迁移送礼数据到礼物墙');
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$output->writeln('开始迁移送礼数据到礼物墙...');
|
||||
|
||||
// 第一步:创建临时表存储汇总数据
|
||||
$this->createTempTable($output);
|
||||
|
||||
// 第二步:分批处理原始数据
|
||||
$this->processInBatches($output);
|
||||
|
||||
// 第三步:合并临时表数据到目标表
|
||||
$this->mergeToTargetTable($output);
|
||||
|
||||
// 第四步:清理临时表
|
||||
$this->cleanupTempTable($output);
|
||||
|
||||
$output->writeln('迁移完成!');
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建临时汇总表
|
||||
*/
|
||||
protected function createTempTable(Output $output)
|
||||
{
|
||||
$output->writeln('创建临时汇总表...');
|
||||
|
||||
$sql = "CREATE TABLE IF NOT EXISTS `temp_gift_wall_summary` (
|
||||
`user_id` int NOT NULL COMMENT '收礼用户',
|
||||
`gift_id` int NOT NULL COMMENT '礼物ID',
|
||||
`total_count` int NOT NULL DEFAULT '0' COMMENT '收到此礼物的总数',
|
||||
`give_user_ids` text COMMENT '送礼用户ID,逗号分隔',
|
||||
PRIMARY KEY (`user_id`, `gift_id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
|
||||
|
||||
Db::execute($sql);
|
||||
|
||||
// 清空临时表(如果存在旧数据)
|
||||
Db::execute('TRUNCATE TABLE temp_gift_wall_summary');
|
||||
|
||||
$output->writeln('临时表创建完成');
|
||||
}
|
||||
|
||||
/**
|
||||
* 分批处理原始数据
|
||||
*/
|
||||
protected function processInBatches(Output $output)
|
||||
{
|
||||
$output->writeln('开始分批处理原始数据...');
|
||||
|
||||
// 获取总记录数
|
||||
$total = Db::table('fa_vs_give_gift')->count();
|
||||
$output->writeln("总记录数: {$total}");
|
||||
|
||||
// 计算总批次数
|
||||
$totalBatches = ceil($total / $this->batchSize);
|
||||
|
||||
for ($batch = 0; $batch < $totalBatches; $batch++) {
|
||||
$output->writeln("正在处理第 " . ($batch + 1) . "/{$totalBatches} 批...");
|
||||
|
||||
$offset = $batch * $this->batchSize;
|
||||
|
||||
// 分批查询原始数据
|
||||
$records = Db::table('fa_vs_give_gift')
|
||||
->field('user_id, gift_id, gift_user, number')
|
||||
->limit($offset, $this->batchSize)
|
||||
->select();
|
||||
|
||||
if (empty($records)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 处理这批数据
|
||||
$this->processBatchData($records);
|
||||
|
||||
// 释放内存
|
||||
unset($records);
|
||||
|
||||
// 记录进度
|
||||
$processed = min(($batch + 1) * $this->batchSize, $total);
|
||||
$output->writeln("已处理: {$processed}/{$total}");
|
||||
|
||||
// 小休息一下,避免服务器压力过大
|
||||
usleep(100000); // 0.1秒
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理一批数据
|
||||
*/
|
||||
protected function processBatchData($records)
|
||||
{
|
||||
// 按收礼用户和礼物ID分组
|
||||
$groupedData = [];
|
||||
|
||||
foreach ($records as $record) {
|
||||
$key = $record['gift_user'] . '_' . $record['gift_id'];
|
||||
|
||||
if (!isset($groupedData[$key])) {
|
||||
$groupedData[$key] = [
|
||||
'user_id' => $record['gift_user'],
|
||||
'gift_id' => $record['gift_id'],
|
||||
'total_count' => 0,
|
||||
'give_users' => []
|
||||
];
|
||||
}
|
||||
|
||||
// 累加礼物数量
|
||||
$groupedData[$key]['total_count'] += $record['number'];
|
||||
|
||||
// 记录送礼用户(去重会在最后处理)
|
||||
$groupedData[$key]['give_users'][] = $record['user_id'];
|
||||
}
|
||||
|
||||
// 批量插入或更新到临时表
|
||||
$this->batchUpdateTempTable($groupedData);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量更新临时表
|
||||
*/
|
||||
protected function batchUpdateTempTable($groupedData)
|
||||
{
|
||||
if (empty($groupedData)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 开启事务
|
||||
Db::startTrans();
|
||||
|
||||
try {
|
||||
foreach ($groupedData as $data) {
|
||||
// 去重送礼用户ID
|
||||
$uniqueGiveUsers = array_unique($data['give_users']);
|
||||
$giveUserIdsStr = implode(',', $uniqueGiveUsers);
|
||||
|
||||
// 检查是否已存在该记录
|
||||
$exists = Db::table('temp_gift_wall_summary')
|
||||
->where('user_id', $data['user_id'])
|
||||
->where('gift_id', $data['gift_id'])
|
||||
->find();
|
||||
|
||||
if ($exists) {
|
||||
// 更新现有记录
|
||||
$newCount = $exists['total_count'] + $data['total_count'];
|
||||
|
||||
// 合并送礼用户ID(去重)
|
||||
$existingUsers = explode(',', $exists['give_user_ids']);
|
||||
$allUsers = array_merge($existingUsers, $uniqueGiveUsers);
|
||||
$allUsers = array_unique($allUsers);
|
||||
$mergedGiveUserIds = implode(',', $allUsers);
|
||||
|
||||
Db::table('temp_gift_wall_summary')
|
||||
->where('user_id', $data['user_id'])
|
||||
->where('gift_id', $data['gift_id'])
|
||||
->update([
|
||||
'total_count' => $newCount,
|
||||
'give_user_ids' => $mergedGiveUserIds
|
||||
]);
|
||||
} else {
|
||||
// 插入新记录
|
||||
Db::table('temp_gift_wall_summary')
|
||||
->insert([
|
||||
'user_id' => $data['user_id'],
|
||||
'gift_id' => $data['gift_id'],
|
||||
'total_count' => $data['total_count'],
|
||||
'give_user_ids' => $giveUserIdsStr
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
Db::commit();
|
||||
} catch (\Exception $e) {
|
||||
Db::rollback();
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 合并临时表数据到目标表
|
||||
*/
|
||||
protected function mergeToTargetTable(Output $output)
|
||||
{
|
||||
$output->writeln('开始合并数据到目标表...');
|
||||
|
||||
// 获取临时表总记录数
|
||||
$total = Db::table('temp_gift_wall_summary')->count();
|
||||
$output->writeln("需要合并的记录数: {$total}");
|
||||
|
||||
// 分批合并
|
||||
$batchSize = 5000;
|
||||
$totalBatches = ceil($total / $batchSize);
|
||||
|
||||
for ($batch = 0; $batch < $totalBatches; $batch++) {
|
||||
$output->writeln("正在合并第 " . ($batch + 1) . "/{$totalBatches} 批...");
|
||||
|
||||
$offset = $batch * $batchSize;
|
||||
|
||||
// 分批获取临时表数据
|
||||
$records = Db::table('temp_gift_wall_summary')
|
||||
->limit($offset, $batchSize)
|
||||
->select();
|
||||
|
||||
if (empty($records)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 批量插入或更新到目标表
|
||||
foreach ($records as $record) {
|
||||
// 检查目标表是否已存在该记录
|
||||
$exists = Db::table('fa_user_gift_wall')
|
||||
->where('user_id', $record['user_id'])
|
||||
->where('gift_id', $record['gift_id'])
|
||||
->find();
|
||||
|
||||
if ($exists) {
|
||||
// 更新现有记录
|
||||
$newCount = $exists['count'] + $record['total_count'];
|
||||
|
||||
// 合并送礼用户ID(去重)
|
||||
$existingUsers = explode(',', $exists['give_user_ids']);
|
||||
$newUsers = explode(',', $record['give_user_ids']);
|
||||
$allUsers = array_merge($existingUsers, $newUsers);
|
||||
$allUsers = array_unique($allUsers);
|
||||
$mergedGiveUserIds = implode(',', $allUsers);
|
||||
|
||||
Db::table('fa_user_gift_wall')
|
||||
->where('id', $exists['id'])
|
||||
->update([
|
||||
'count' => $newCount,
|
||||
'give_user_ids' => $mergedGiveUserIds,
|
||||
'updatetime' => time()
|
||||
]);
|
||||
} else {
|
||||
// 插入新记录
|
||||
Db::table('fa_user_gift_wall')
|
||||
->insert([
|
||||
'user_id' => $record['user_id'],
|
||||
'gift_id' => $record['gift_id'],
|
||||
'count' => $record['total_count'],
|
||||
'give_user_ids' => $record['give_user_ids'],
|
||||
'updatetime' => time()
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
// 释放内存
|
||||
unset($records);
|
||||
|
||||
// 小休息一下
|
||||
usleep(50000); // 0.05秒
|
||||
}
|
||||
|
||||
$output->writeln('数据合并完成');
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理临时表
|
||||
*/
|
||||
protected function cleanupTempTable(Output $output)
|
||||
{
|
||||
$output->writeln('清理临时表...');
|
||||
|
||||
Db::execute('DROP TABLE IF EXISTS temp_gift_wall_summary');
|
||||
|
||||
$output->writeln('临时表已清理');
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user