db_backup_upload_service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
import os
import glob
import json
import logging
from datetime import datetime
from typing import Optional, Set

from apscheduler.triggers.cron import CronTrigger

from app.core.minio import minio_storage
from app.core.scheduler import scheduler
  10. logger = logging.getLogger(__name__)
  11. # 备份目录路径(项目根目录的 backups 文件夹)
  12. # 在 Docker 容器中,需要挂载这个目录
  13. BACKUP_DIR = os.getenv("DB_BACKUP_DIR", "/app/backups")
  14. # 记录已上传文件的文件路径
  15. UPLOADED_RECORD_FILE = os.path.join(BACKUP_DIR, ".uploaded_backups.json")
  16. class DatabaseBackupUploadService:
  17. @staticmethod
  18. def _load_uploaded_records() -> Set[str]:
  19. """加载已上传的文件记录"""
  20. if not os.path.exists(UPLOADED_RECORD_FILE):
  21. return set()
  22. try:
  23. with open(UPLOADED_RECORD_FILE, 'r', encoding='utf-8') as f:
  24. data = json.load(f)
  25. return set(data.get('uploaded_files', []))
  26. except Exception as e:
  27. logger.error(f"Failed to load uploaded records: {e}")
  28. return set()
  29. @staticmethod
  30. def _save_uploaded_record(filename: str):
  31. """保存已上传的文件记录"""
  32. records = DatabaseBackupUploadService._load_uploaded_records()
  33. records.add(filename)
  34. try:
  35. os.makedirs(os.path.dirname(UPLOADED_RECORD_FILE), exist_ok=True)
  36. with open(UPLOADED_RECORD_FILE, 'w', encoding='utf-8') as f:
  37. json.dump({'uploaded_files': list(records)}, f, indent=2)
  38. except Exception as e:
  39. logger.error(f"Failed to save uploaded record: {e}")
  40. @staticmethod
  41. def _get_backup_files() -> list:
  42. """获取备份目录中的所有 .sql.gz 文件"""
  43. if not os.path.exists(BACKUP_DIR):
  44. logger.warning(f"Backup directory does not exist: {BACKUP_DIR}")
  45. return []
  46. pattern = os.path.join(BACKUP_DIR, "*.sql.gz")
  47. files = glob.glob(pattern)
  48. # 按修改时间排序,最新的在前
  49. files.sort(key=os.path.getmtime, reverse=True)
  50. return files
  51. @staticmethod
  52. def _is_file_stable(file_path: str, wait_seconds: int = 60) -> bool:
  53. """
  54. 检查文件是否稳定(不再变化)
  55. 如果文件在 wait_seconds 秒内没有变化,认为备份已完成
  56. """
  57. try:
  58. stat = os.stat(file_path)
  59. current_time = datetime.now().timestamp()
  60. file_mtime = stat.st_mtime
  61. # 如果文件最后修改时间距离现在超过 wait_seconds 秒,认为稳定
  62. return (current_time - file_mtime) >= wait_seconds
  63. except Exception as e:
  64. logger.error(f"Failed to check file stability: {e}")
  65. return False
  66. @staticmethod
  67. def upload_new_backups():
  68. """检查并上传新的备份文件"""
  69. logger.info("Starting database backup upload check...")
  70. if not minio_storage.client:
  71. logger.error("MinIO client not available, skipping upload")
  72. return
  73. uploaded_records = DatabaseBackupUploadService._load_uploaded_records()
  74. backup_files = DatabaseBackupUploadService._get_backup_files()
  75. uploaded_count = 0
  76. skipped_count = 0
  77. failed_count = 0
  78. for file_path in backup_files:
  79. filename = os.path.basename(file_path)
  80. # 跳过已上传的文件
  81. if filename in uploaded_records:
  82. skipped_count += 1
  83. continue
  84. # 检查文件是否稳定(备份是否完成)
  85. if not DatabaseBackupUploadService._is_file_stable(file_path):
  86. logger.info(f"File {filename} is still being written, skipping...")
  87. continue
  88. # 上传文件
  89. try:
  90. logger.info(f"Uploading database backup: {filename}")
  91. object_name = minio_storage.upload_db_backup(file_path, filename)
  92. DatabaseBackupUploadService._save_uploaded_record(filename)
  93. uploaded_count += 1
  94. logger.info(f"Successfully uploaded {filename} to MinIO: {object_name}")
  95. except Exception as e:
  96. logger.error(f"Failed to upload {filename}: {e}")
  97. failed_count += 1
  98. logger.info(
  99. f"Backup upload check completed. "
  100. f"Uploaded: {uploaded_count}, Skipped: {skipped_count}, Failed: {failed_count}"
  101. )
  102. @staticmethod
  103. def init_scheduler():
  104. """初始化定时任务,每小时检查一次"""
  105. # 每小时的第5分钟执行(避免与备份时间冲突)
  106. trigger = CronTrigger(minute=5)
  107. scheduler.add_job(
  108. DatabaseBackupUploadService.upload_new_backups,
  109. trigger=trigger,
  110. id="db_backup_upload_job",
  111. replace_existing=True
  112. )
  113. logger.info("Database backup upload scheduler initialized (runs every hour at :05)")