import glob
import json
import logging
import os
from datetime import datetime
from typing import List, Set

from apscheduler.triggers.cron import CronTrigger

from app.core.minio import minio_storage
from app.core.scheduler import scheduler

logger = logging.getLogger(__name__)

# Backup directory path (the "backups" folder at the project root).
# When running inside a Docker container this directory must be mounted.
BACKUP_DIR = os.getenv("DB_BACKUP_DIR", "/app/backups")

# JSON file that persists the set of backup filenames already uploaded.
UPLOADED_RECORD_FILE = os.path.join(BACKUP_DIR, ".uploaded_backups.json")


class DatabaseBackupUploadService:
    """Periodically uploads finished database backups (*.sql.gz) to MinIO.

    A small JSON ledger (UPLOADED_RECORD_FILE) remembers which files were
    already uploaded so each backup is pushed at most once. All failures are
    logged and swallowed — this service is deliberately best-effort.
    """

    @staticmethod
    def _load_uploaded_records() -> Set[str]:
        """Load the set of already-uploaded backup filenames.

        Returns:
            The recorded filenames, or an empty set when the ledger is
            missing or unreadable (worst case: a file gets re-uploaded).
        """
        if not os.path.exists(UPLOADED_RECORD_FILE):
            return set()
        try:
            with open(UPLOADED_RECORD_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return set(data.get('uploaded_files', []))
        except Exception as e:
            logger.error(f"Failed to load uploaded records: {e}")
            return set()

    @staticmethod
    def _save_uploaded_record(filename: str) -> None:
        """Add *filename* to the persisted ledger of uploaded backups.

        Read-modify-write of the whole JSON file; errors are logged only,
        so a failed save never aborts the upload loop.
        """
        records = DatabaseBackupUploadService._load_uploaded_records()
        records.add(filename)
        try:
            os.makedirs(os.path.dirname(UPLOADED_RECORD_FILE), exist_ok=True)
            with open(UPLOADED_RECORD_FILE, 'w', encoding='utf-8') as f:
                # sorted() keeps the ledger deterministic and diff-friendly.
                json.dump({'uploaded_files': sorted(records)}, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save uploaded record: {e}")

    @staticmethod
    def _get_backup_files() -> List[str]:
        """Return all *.sql.gz files in BACKUP_DIR, newest first by mtime."""
        if not os.path.exists(BACKUP_DIR):
            logger.warning(f"Backup directory does not exist: {BACKUP_DIR}")
            return []
        pattern = os.path.join(BACKUP_DIR, "*.sql.gz")
        files = glob.glob(pattern)
        # Most recently modified first.
        files.sort(key=os.path.getmtime, reverse=True)
        return files

    @staticmethod
    def _is_file_stable(file_path: str, wait_seconds: int = 60) -> bool:
        """Heuristically check whether a backup file is fully written.

        The file is considered stable (backup finished) when its mtime is at
        least *wait_seconds* seconds in the past.

        Args:
            file_path: Path of the backup file to inspect.
            wait_seconds: Minimum age, in seconds, to count as stable.

        Returns:
            True when the file is old enough; False when it is still fresh
            or cannot be stat'ed (e.g. deleted concurrently).
        """
        try:
            file_mtime = os.stat(file_path).st_mtime
        except Exception as e:
            logger.error(f"Failed to check file stability: {e}")
            return False
        return (datetime.now().timestamp() - file_mtime) >= wait_seconds

    @staticmethod
    def upload_new_backups():
        """Scan the backup directory and upload every new, finished backup.

        Skips files already in the ledger and files still being written;
        per-file upload errors are logged and counted without stopping the
        loop. A summary line is logged at the end.
        """
        logger.info("Starting database backup upload check...")

        if not minio_storage.client:
            logger.error("MinIO client not available, skipping upload")
            return

        uploaded_records = DatabaseBackupUploadService._load_uploaded_records()
        backup_files = DatabaseBackupUploadService._get_backup_files()

        uploaded_count = 0
        skipped_count = 0
        failed_count = 0

        for file_path in backup_files:
            filename = os.path.basename(file_path)

            # Skip files uploaded in a previous run.
            if filename in uploaded_records:
                skipped_count += 1
                continue

            # Skip files the backup job may still be writing.
            if not DatabaseBackupUploadService._is_file_stable(file_path):
                logger.info(f"File {filename} is still being written, skipping...")
                continue

            try:
                logger.info(f"Uploading database backup: {filename}")
                object_name = minio_storage.upload_db_backup(file_path, filename)
                DatabaseBackupUploadService._save_uploaded_record(filename)
                uploaded_count += 1
                logger.info(f"Successfully uploaded {filename} to MinIO: {object_name}")
            except Exception as e:
                logger.error(f"Failed to upload {filename}: {e}")
                failed_count += 1

        logger.info(
            f"Backup upload check completed. "
            f"Uploaded: {uploaded_count}, Skipped: {skipped_count}, Failed: {failed_count}"
        )

    @staticmethod
    def init_scheduler():
        """Register the hourly upload job with the shared APScheduler instance.

        Runs at minute 5 of every hour to avoid overlapping with the backup
        job itself; replaces any existing job with the same id.
        """
        trigger = CronTrigger(minute=5)
        scheduler.add_job(
            DatabaseBackupUploadService.upload_new_backups,
            trigger=trigger,
            id="db_backup_upload_job",
            replace_existing=True
        )
        logger.info("Database backup upload scheduler initialized (runs every hour at :05)")