| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
import glob
import json
import logging
import os
from datetime import datetime
from typing import Set

from apscheduler.triggers.cron import CronTrigger

from app.core.minio import minio_storage
from app.core.scheduler import scheduler

logger = logging.getLogger(__name__)

# Directory the database backup job writes into (the "backups" folder at the
# project root). When running inside Docker this directory must be mounted.
BACKUP_DIR = os.getenv("DB_BACKUP_DIR", "/app/backups")

# JSON manifest recording which backup files have already been uploaded.
UPLOADED_RECORD_FILE = os.path.join(BACKUP_DIR, ".uploaded_backups.json")
class DatabaseBackupUploadService:
    """Uploads finished database backup files (*.sql.gz) from BACKUP_DIR to MinIO.

    A JSON manifest at UPLOADED_RECORD_FILE remembers which filenames have
    already been uploaded, so each backup is transferred at most once. A
    periodic APScheduler job drives the check (see init_scheduler).
    """

    @staticmethod
    def _load_uploaded_records() -> Set[str]:
        """Return the set of backup filenames already uploaded.

        Returns an empty set when the manifest is missing or unreadable; a
        corrupt manifest only risks re-uploading a backup, never data loss.
        """
        if not os.path.exists(UPLOADED_RECORD_FILE):
            return set()

        try:
            with open(UPLOADED_RECORD_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return set(data.get('uploaded_files', []))
        except Exception as e:
            logger.error("Failed to load uploaded records: %s", e)
            return set()

    @staticmethod
    def _save_uploaded_record(filename: str) -> None:
        """Add *filename* to the uploaded-backups manifest (best effort).

        Errors are logged and swallowed: failing to persist the record only
        means the file may be uploaded again on a later run.
        """
        records = DatabaseBackupUploadService._load_uploaded_records()
        records.add(filename)

        try:
            os.makedirs(os.path.dirname(UPLOADED_RECORD_FILE), exist_ok=True)
            with open(UPLOADED_RECORD_FILE, 'w', encoding='utf-8') as f:
                # sorted() keeps the manifest deterministic between runs.
                json.dump({'uploaded_files': sorted(records)}, f, indent=2)
        except Exception as e:
            logger.error("Failed to save uploaded record: %s", e)

    @staticmethod
    def _get_backup_files() -> list:
        """Return all *.sql.gz files in BACKUP_DIR, newest (by mtime) first."""
        if not os.path.exists(BACKUP_DIR):
            logger.warning("Backup directory does not exist: %s", BACKUP_DIR)
            return []

        pattern = os.path.join(BACKUP_DIR, "*.sql.gz")
        files = glob.glob(pattern)
        # Most recently modified first.
        files.sort(key=os.path.getmtime, reverse=True)
        return files

    @staticmethod
    def _is_file_stable(file_path: str, wait_seconds: int = 60) -> bool:
        """Return True if *file_path* has not been modified for *wait_seconds*.

        Heuristic for "the backup job has finished writing this file".
        Returns False when the file cannot be stat'ed.
        """
        try:
            file_mtime = os.stat(file_path).st_mtime
            # Stable once the last modification is older than the grace window.
            return (datetime.now().timestamp() - file_mtime) >= wait_seconds
        except Exception as e:
            logger.error("Failed to check file stability: %s", e)
            return False

    @staticmethod
    def upload_new_backups():
        """Scan BACKUP_DIR and upload every new, fully-written backup to MinIO.

        Skips files already recorded in the manifest and files that still look
        like they are being written; per-file upload failures are logged and
        counted but do not abort the run.
        """
        logger.info("Starting database backup upload check...")

        if not minio_storage.client:
            logger.error("MinIO client not available, skipping upload")
            return

        uploaded_records = DatabaseBackupUploadService._load_uploaded_records()
        backup_files = DatabaseBackupUploadService._get_backup_files()

        uploaded_count = 0
        skipped_count = 0
        failed_count = 0

        for file_path in backup_files:
            filename = os.path.basename(file_path)

            # Already uploaded on a previous run.
            if filename in uploaded_records:
                skipped_count += 1
                continue

            # Backup may still be in progress; retry on the next run.
            if not DatabaseBackupUploadService._is_file_stable(file_path):
                logger.info("File %s is still being written, skipping...", filename)
                continue

            try:
                logger.info("Uploading database backup: %s", filename)
                object_name = minio_storage.upload_db_backup(file_path, filename)
                DatabaseBackupUploadService._save_uploaded_record(filename)
                uploaded_count += 1
                logger.info("Successfully uploaded %s to MinIO: %s", filename, object_name)
            except Exception as e:
                logger.error("Failed to upload %s: %s", filename, e)
                failed_count += 1

        logger.info(
            "Backup upload check completed. "
            "Uploaded: %s, Skipped: %s, Failed: %s",
            uploaded_count, skipped_count, failed_count,
        )

    @staticmethod
    def init_scheduler():
        """Register the recurring upload job with the shared scheduler."""
        # Run at minute 5 of every hour to avoid racing the backup job itself.
        trigger = CronTrigger(minute=5)

        scheduler.add_job(
            DatabaseBackupUploadService.upload_new_backups,
            trigger=trigger,
            id="db_backup_upload_job",
            replace_existing=True
        )
        logger.info("Database backup upload scheduler initialized (runs every hour at :05)")
|