分表数据
This commit is contained in:
225
application/common/command/GiftMigration.php
Normal file
225
application/common/command/GiftMigration.php
Normal file
@@ -0,0 +1,225 @@
|
||||
<?php
|
||||
// application/common/command/GiftMigration.php
|
||||
|
||||
namespace app\common\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\console\input\Argument;
|
||||
use think\console\input\Option;
|
||||
use think\Log;
|
||||
use think\Db;
|
||||
use app\common\library\GiftTableManager;
|
||||
use app\common\library\GiftDataMigrator;
|
||||
|
||||
class GiftMigration extends Command
|
||||
{
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('gift:migration')
|
||||
->setDescription('送礼数据分表迁移工具')
|
||||
->addArgument('action', Argument::REQUIRED, '操作类型: init-tables, migrate, progress, verify, clean')
|
||||
->addOption('month', 'm', Option::VALUE_OPTIONAL, '指定年月,格式: 202401')
|
||||
->addOption('batch-size', 'b', Option::VALUE_OPTIONAL, '每批迁移数量', 1000)
|
||||
->addOption('start-id', 's', Option::VALUE_OPTIONAL, '起始ID', 0)
|
||||
->addOption('months', null, Option::VALUE_OPTIONAL, '创建几个月分表', 12);
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$action = $input->getArgument('action');
|
||||
$month = $input->getOption('month');
|
||||
$batchSize = $input->getOption('batch-size');
|
||||
$startId = $input->getOption('start-id');
|
||||
$months = $input->getOption('months');
|
||||
|
||||
Log::info("执行送礼数据迁移命令,动作: {$action}, 月份: {$month}");
|
||||
|
||||
switch ($action) {
|
||||
case 'init-tables':
|
||||
$this->initTables($output, $months);
|
||||
break;
|
||||
|
||||
case 'migrate':
|
||||
$this->migrateData($output, $month, $batchSize, $startId);
|
||||
break;
|
||||
|
||||
case 'progress':
|
||||
$this->showProgress($output);
|
||||
break;
|
||||
|
||||
case 'verify':
|
||||
$this->verifyData($output, $month);
|
||||
break;
|
||||
|
||||
case 'clean':
|
||||
$this->cleanOldData($output);
|
||||
break;
|
||||
|
||||
default:
|
||||
$output->writeln('<error>未知操作类型</error>');
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化分表
|
||||
*/
|
||||
protected function initTables(Output $output, $months)
|
||||
{
|
||||
$output->writeln('开始初始化分表...');
|
||||
|
||||
$results = GiftTableManager::initTables(null, $months);
|
||||
|
||||
$successCount = 0;
|
||||
$failCount = 0;
|
||||
|
||||
foreach ($results as $tableMonth => $success) {
|
||||
if ($success) {
|
||||
$output->writeln("<info>创建表 fa_vs_give_gift_{$tableMonth} 成功</info>");
|
||||
$successCount++;
|
||||
} else {
|
||||
$output->writeln("<error>创建表 fa_vs_give_gift_{$tableMonth} 失败</error>");
|
||||
$failCount++;
|
||||
}
|
||||
}
|
||||
|
||||
$output->writeln("分表初始化完成,成功: {$successCount}, 失败: {$failCount}");
|
||||
}
|
||||
|
||||
/**
|
||||
* 迁移数据
|
||||
*/
|
||||
protected function migrateData(Output $output, $month, $batchSize, $startId)
|
||||
{
|
||||
if ($month) {
|
||||
// 迁移指定月份
|
||||
$output->writeln("开始迁移 {$month} 数据...");
|
||||
$result = GiftDataMigrator::migrateMonthData($month, $batchSize, $startId);
|
||||
|
||||
if ($result['success']) {
|
||||
$output->writeln("<info>{$month} 数据迁移成功</info>");
|
||||
$output->writeln("迁移总数: " . $result['total']);
|
||||
$output->writeln("最后ID: " . $result['last_id']);
|
||||
} else {
|
||||
$output->writeln("<error>{$month} 数据迁移失败: " . $result['message'] . "</error>");
|
||||
}
|
||||
} else {
|
||||
// 迁移所有数据
|
||||
$output->writeln('开始迁移所有数据...');
|
||||
$result = GiftDataMigrator::migrateAllData($batchSize);
|
||||
|
||||
if ($result['success']) {
|
||||
$output->writeln("<info>所有数据迁移成功</info>");
|
||||
|
||||
// 显示每个月份的结果
|
||||
foreach ($result['results'] as $month => $monthResult) {
|
||||
$status = $monthResult['success'] ? '成功' : '失败';
|
||||
$output->writeln("{$month}: {$status}, 数量: " . ($monthResult['total'] ?? 0));
|
||||
}
|
||||
} else {
|
||||
$output->writeln("<error>数据迁移失败: " . $result['message'] . "</error>");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 显示迁移进度
|
||||
*/
|
||||
protected function showProgress(Output $output)
|
||||
{
|
||||
$progress = GiftDataMigrator::getMigrationProgress();
|
||||
|
||||
$output->writeln("数据迁移进度:");
|
||||
$output->writeln("原始表数据量: " . $progress['total']);
|
||||
$output->writeln("已迁移数据量: " . $progress['migrated']);
|
||||
$output->writeln("迁移进度: " . $progress['progress'] . "%");
|
||||
$output->writeln("剩余数据: " . $progress['remaining']);
|
||||
|
||||
if ($progress['total'] > 0 && $progress['migrated'] > 0) {
|
||||
$output->writeln("<info>迁移进度正常</info>");
|
||||
} else {
|
||||
$output->writeln("<comment>暂无迁移进度数据</comment>");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证数据
|
||||
*/
|
||||
protected function verifyData(Output $output, $month)
|
||||
{
|
||||
if (!$month) {
|
||||
// 验证所有月份
|
||||
$tables = GiftTableManager::getAllTables();
|
||||
foreach ($tables as $table) {
|
||||
$month = substr($table['table_name'], -6);
|
||||
$this->verifySingleMonth($output, $month);
|
||||
}
|
||||
} else {
|
||||
$this->verifySingleMonth($output, $month);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证单个月份数据
|
||||
*/
|
||||
protected function verifySingleMonth(Output $output, $month)
|
||||
{
|
||||
$output->writeln("验证 {$month} 数据一致性...");
|
||||
$result = GiftDataMigrator::verifyMigration($month);
|
||||
|
||||
$output->writeln("原始表数据量: " . $result['source_count']);
|
||||
$output->writeln("目标表数据量: " . $result['target_count']);
|
||||
$output->writeln("数据量匹配: " . ($result['count_match'] ? '<info>是</info>' : '<error>否</error>'));
|
||||
|
||||
$output->writeln("原始表总价值: " . $result['source_total_price']);
|
||||
$output->writeln("目标表总价值: " . $result['target_total_price']);
|
||||
$output->writeln("总价值匹配: " . ($result['price_match'] ? '<info>是</info>' : '<error>否</error>'));
|
||||
|
||||
if ($result['count_match'] && $result['price_match']) {
|
||||
$output->writeln("<info>{$month} 数据验证通过</info>");
|
||||
} else {
|
||||
$output->writeln("<error>{$month} 数据验证失败</error>");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理旧数据(迁移完成后)
|
||||
*/
|
||||
protected function cleanOldData(Output $output)
|
||||
{
|
||||
$output->writeln("警告:此操作将清理原始表数据,请确认迁移已完成且验证通过!");
|
||||
$output->writeln("输入 'yes' 继续: ");
|
||||
|
||||
$handle = fopen("php://stdin", "r");
|
||||
$line = fgets($handle);
|
||||
fclose($handle);
|
||||
|
||||
if (trim($line) != 'yes') {
|
||||
$output->writeln("<comment>操作取消</comment>");
|
||||
return;
|
||||
}
|
||||
|
||||
// 备份原始表
|
||||
$output->writeln("正在备份原始表...");
|
||||
try {
|
||||
Db::execute("DROP TABLE IF EXISTS `fa_vs_give_gift_old`");
|
||||
Db::execute("CREATE TABLE `fa_vs_give_gift_old` LIKE `fa_vs_give_gift`");
|
||||
Db::execute("INSERT INTO `fa_vs_give_gift_old` SELECT * FROM `fa_vs_give_gift`");
|
||||
$output->writeln("<info>原始表备份完成</info>");
|
||||
} catch (\Exception $e) {
|
||||
$output->writeln("<error>备份原始表失败: " . $e->getMessage() . "</error>");
|
||||
return;
|
||||
}
|
||||
|
||||
// 清空原始表
|
||||
$output->writeln("正在清空原始表...");
|
||||
try {
|
||||
Db::execute("TRUNCATE TABLE `fa_vs_give_gift`");
|
||||
$output->writeln("<info>原始表已清空</info>");
|
||||
} catch (\Exception $e) {
|
||||
$output->writeln("<error>清空原始表失败: " . $e->getMessage() . "</error>");
|
||||
}
|
||||
}
|
||||
}
|
||||
111
application/common/command/ProcessGiftQueue.php
Normal file
111
application/common/command/ProcessGiftQueue.php
Normal file
@@ -0,0 +1,111 @@
|
||||
<?php
|
||||
// application/common/command/ProcessGiftQueue.php
|
||||
|
||||
namespace app\common\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\console\input\Argument;
|
||||
use think\console\input\Option;
|
||||
use think\facade\Log;
|
||||
use app\common\library\GiftQueue;
|
||||
|
||||
class ProcessGiftQueue extends Command
|
||||
{
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('process:gift_queue')
|
||||
->setDescription('处理送礼队列')
|
||||
->addArgument('mode', Argument::OPTIONAL, '运行模式: once, daemon', 'once')
|
||||
->addOption('batch-size', 'b', Option::VALUE_OPTIONAL, '每次处理数量', 100)
|
||||
->addOption('sleep-time', 's', Option::VALUE_OPTIONAL, '处理间隔(毫秒)', 100)
|
||||
->addOption('max-runtime', 't', Option::VALUE_OPTIONAL, '最大运行时间(秒)', 0);
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$mode = $input->getArgument('mode');
|
||||
$batchSize = $input->getOption('batch-size');
|
||||
$sleepTime = $input->getOption('sleep-time');
|
||||
$maxRuntime = $input->getOption('max-runtime');
|
||||
|
||||
$output->writeln("开始处理送礼队列...");
|
||||
$output->writeln("模式: {$mode}, 批量大小: {$batchSize}, 间隔: {$sleepTime}ms");
|
||||
|
||||
$startTime = time();
|
||||
$totalProcessed = 0;
|
||||
$totalSuccess = 0;
|
||||
$totalFailed = 0;
|
||||
|
||||
// 检查Redis连接
|
||||
try {
|
||||
$queueSize = GiftQueue::size();
|
||||
$output->writeln("当前队列大小: {$queueSize}");
|
||||
} catch (\Exception $e) {
|
||||
$output->writeln("<error>Redis连接失败: " . $e->getMessage() . "</error>");
|
||||
return;
|
||||
}
|
||||
|
||||
do {
|
||||
$queueSize = GiftQueue::size();
|
||||
|
||||
if ($queueSize == 0) {
|
||||
if ($mode == 'once') {
|
||||
$output->writeln("队列为空,处理完成");
|
||||
break;
|
||||
} else {
|
||||
$output->writeln("队列为空,等待新数据...");
|
||||
sleep(5);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
$output->writeln("当前队列大小: {$queueSize}");
|
||||
|
||||
// 处理队列
|
||||
$result = GiftQueue::process($batchSize);
|
||||
|
||||
$totalProcessed += $result['processed'];
|
||||
$totalSuccess += $result['success'];
|
||||
$totalFailed += $result['failed'];
|
||||
|
||||
$output->writeln(sprintf(
|
||||
"处理完成:已处理 %d,成功 %d,失败 %d",
|
||||
$result['processed'],
|
||||
$result['success'],
|
||||
$result['failed']
|
||||
));
|
||||
|
||||
// 休眠
|
||||
usleep($sleepTime * 1000);
|
||||
|
||||
// 检查运行时间限制
|
||||
if ($maxRuntime > 0 && (time() - $startTime) > $maxRuntime) {
|
||||
$output->writeln("达到最大运行时间,停止处理");
|
||||
break;
|
||||
}
|
||||
|
||||
// 单次模式处理完成后退出
|
||||
if ($mode == 'once' && $result['processed'] < $batchSize) {
|
||||
$output->writeln("队列处理完成");
|
||||
break;
|
||||
}
|
||||
|
||||
} while ($mode == 'daemon');
|
||||
|
||||
$output->writeln("队列处理统计:");
|
||||
$output->writeln("总计处理: {$totalProcessed}");
|
||||
$output->writeln("成功: {$totalSuccess}");
|
||||
$output->writeln("失败: {$totalFailed}");
|
||||
$output->writeln("剩余队列大小: " . GiftQueue::size());
|
||||
|
||||
Log::info(sprintf(
|
||||
"送礼队列处理统计: 总计 %d, 成功 %d, 失败 %d, 剩余 %d",
|
||||
$totalProcessed,
|
||||
$totalSuccess,
|
||||
$totalFailed,
|
||||
GiftQueue::size()
|
||||
));
|
||||
}
|
||||
}
|
||||
64
application/common/command/QueueMonitor.php
Normal file
64
application/common/command/QueueMonitor.php
Normal file
@@ -0,0 +1,64 @@
|
||||
<?php
|
||||
// application/common/command/QueueMonitor.php
|
||||
|
||||
namespace app\common\command;
|
||||
|
||||
use think\console\Command;
|
||||
use think\console\Input;
|
||||
use think\console\Output;
|
||||
use think\console\input\Option;
|
||||
use app\common\library\GiftQueue;
|
||||
|
||||
class QueueMonitor extends Command
|
||||
{
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName('queue:monitor')
|
||||
->setDescription('监控队列状态')
|
||||
->addOption('interval', 'i', Option::VALUE_OPTIONAL, '监控间隔(秒)', 10);
|
||||
}
|
||||
|
||||
protected function execute(Input $input, Output $output)
|
||||
{
|
||||
$interval = $input->getOption('interval');
|
||||
|
||||
$output->writeln("开始监控送礼队列状态,间隔: {$interval}秒");
|
||||
$output->writeln("按 Ctrl+C 停止监控");
|
||||
|
||||
$history = [];
|
||||
$maxHistory = 60;
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
$stats = GiftQueue::stats();
|
||||
$timestamp = date('H:i:s');
|
||||
|
||||
$output->writeln("[{$timestamp}] 队列大小: {$stats['queue_size']}, 失败队列: {$stats['failed_size']}, 状态: {$stats['status']}");
|
||||
|
||||
// 记录历史数据
|
||||
$history[] = [
|
||||
'time' => $timestamp,
|
||||
'size' => $stats['queue_size'],
|
||||
'status' => $stats['status']
|
||||
];
|
||||
|
||||
if (count($history) > $maxHistory) {
|
||||
array_shift($history);
|
||||
}
|
||||
|
||||
// 检查队列是否积压
|
||||
if ($stats['queue_size'] > 1000) {
|
||||
$output->writeln("<error>警告:队列积压严重!</error>");
|
||||
} elseif ($stats['queue_size'] > 100) {
|
||||
$output->writeln("<comment>注意:队列正在积压</comment>");
|
||||
}
|
||||
|
||||
sleep($interval);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
$output->writeln("<error>监控异常: " . $e->getMessage() . "</error>");
|
||||
sleep($interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user