package services import ( "archive/zip" "context" "encoding/json" "fmt" "log" "os" "os/exec" "path/filepath" "strings" "sync" "time" "ems-backend/models" "github.com/google/uuid" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/robfig/cron/v3" "github.com/xuri/excelize/v2" ) type BackupService struct { Cron *cron.Cron Job cron.EntryID Lock sync.Mutex } var GlobalBackupService *BackupService func InitBackupService() { GlobalBackupService = &BackupService{ Cron: cron.New(), } GlobalBackupService.Cron.Start() GlobalBackupService.LoadAndSchedule() } func (s *BackupService) LoadAndSchedule() { config, err := s.GetConfig() if err != nil { log.Printf("Failed to load backup config: %v", err) return } if (config.Enabled || config.ResourceEnabled) && config.Time != "" { s.ScheduleBackup(config.Time) } else { s.StopSchedule() } } func (s *BackupService) StopSchedule() { if s.Job != 0 { s.Cron.Remove(s.Job) s.Job = 0 } } func (s *BackupService) ScheduleBackup(timeStr string) { s.StopSchedule() // timeStr is "HH:mm" // cron spec: "minute hour * * *" t, err := time.Parse("15:04", timeStr) if err != nil { log.Printf("Invalid time format: %v", err) return } spec := fmt.Sprintf("%d %d * * *", t.Minute(), t.Hour()) id, err := s.Cron.AddFunc(spec, func() { log.Println("Starting scheduled backup...") config, _ := s.GetConfig() if config.Enabled { s.PerformBackup() } if config.ResourceEnabled { s.PerformResourceBackup() } }) if err != nil { log.Printf("Failed to schedule backup: %v", err) return } s.Job = id log.Printf("Backup scheduled for %s (spec: %s)", timeStr, spec) } func (s *BackupService) GetConfig() (*models.BackupConfig, error) { var config models.BackupConfig // Defaults config.KeepDays = 7 var sysConfig models.SysConfig // Load Schedule if err := models.DB.Where("config_key = ?", "sys.backup.config").First(&sysConfig).Error; err == nil { json.Unmarshal([]byte(sysConfig.ConfigValue), &config) } return &config, nil } func (s *BackupService) SaveConfig(config *models.BackupConfig) error { data, _ := json.Marshal(config) // Upsert var sysConfig models.SysConfig err := models.DB.Where("config_key = ?", "sys.backup.config").First(&sysConfig).Error if err != nil { sysConfig = models.SysConfig{ ConfigKey: "sys.backup.config", ConfigValue: string(data), ConfigType: "Y", Remark: "Backup Configuration", } if err := models.DB.Create(&sysConfig).Error; err != nil { return err } } else { sysConfig.ConfigValue = string(data) if err := models.DB.Save(&sysConfig).Error; err != nil { return err } } // Reschedule if config.Enabled || config.ResourceEnabled { s.ScheduleBackup(config.Time) } else { s.StopSchedule() } return nil } func (s *BackupService) PerformBackup() { s.Lock.Lock() defer s.Lock.Unlock() startTime := time.Now() backupLog := models.BackupLog{ ID: uuid.New(), // Explicitly generate UUID StartTime: startTime, Status: "RUNNING", UploadStatus: "PENDING", } if err := models.DB.Create(&backupLog).Error; err != nil { log.Printf("[ERROR] Failed to create backup log: %v", err) // Proceeding might be futile if we can't update status, but let's try } else { log.Printf("[INFO] Created backup log: %s", backupLog.ID) } // 1. Prepare Paths backupDir := "backups" if _, err := os.Stat(backupDir); os.IsNotExist(err) { os.MkdirAll(backupDir, 0755) } fileName := fmt.Sprintf("ems_backup_%s.sql", startTime.Format("20060102_150405")) filePath := filepath.Join(backupDir, fileName) // 2. Run pg_dump // Env vars for pg_dump pgUser := os.Getenv("POSTGRES_USER") pgHost := os.Getenv("POSTGRES_HOST") pgPort := os.Getenv("POSTGRES_PORT") pgDB := os.Getenv("POSTGRES_DB") pgPass := os.Getenv("POSTGRES_PASSWORD") // Or DB_PASSWORD // Ensure vars are set if pgHost == "" { pgHost = "postgres" } if pgUser == "" { pgUser = "ems" } if pgDB == "" { pgDB = "ems" } if pgPort == "" { pgPort = "5432" } // Construct command cmd := exec.Command("pg_dump", "-h", pgHost, "-p", pgPort, "-U", pgUser, "-d", pgDB, "-f", filePath) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", pgPass)) log.Printf("Executing backup command: %s", cmd.String()) output, err := cmd.CombinedOutput() if err != nil { errMsg := fmt.Sprintf("pg_dump failed: %s, output: %s", err, string(output)) log.Println(errMsg) backupLog.Status = "FAILED" backupLog.Message = errMsg backupLog.EndTime = time.Now() models.DB.Save(&backupLog) return } // Check file size info, err := os.Stat(filePath) if err == nil { backupLog.Size = info.Size() } backupLog.FileName = fileName backupLog.Status = "SUCCESS" backupLog.Message = "Backup created locally." models.DB.Save(&backupLog) // 3. Upload to MinIO config, _ := s.GetConfig() if config.Endpoint != "" && config.Bucket != "" { err := s.uploadToMinIO(filePath, fileName, config) if err != nil { log.Printf("MinIO upload failed: %v", err) backupLog.Message += fmt.Sprintf(" Upload failed: %v", err) backupLog.UploadStatus = "FAILED" } else { backupLog.UploadStatus = "UPLOADED" backupLog.Message += " Uploaded to MinIO." // Optional: Remove local file after successful upload to save space // os.Remove(filePath) } models.DB.Save(&backupLog) } } // Helper function to clean endpoint func cleanEndpoint(endpoint string) (string, bool) { useSSL := false if strings.HasPrefix(endpoint, "https://") { useSSL = true endpoint = strings.TrimPrefix(endpoint, "https://") } else { endpoint = strings.TrimPrefix(endpoint, "http://") } // Remove trailing slash if any endpoint = strings.TrimRight(endpoint, "/") return endpoint, useSSL } func (s *BackupService) uploadToMinIO(filePath, objectName string, config *models.BackupConfig) error { ctx := context.Background() endpoint, useSSL := cleanEndpoint(config.Endpoint) log.Printf("[DEBUG] Connecting to MinIO: Endpoint=%s, UseSSL=%v, Bucket=%s", endpoint, useSSL, config.Bucket) // Initialize minio client object. minioClient, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""), Secure: useSSL, }) if err != nil { log.Printf("[ERROR] Failed to create MinIO client: %v", err) return err } // Make bucket if not exists exists, err := minioClient.BucketExists(ctx, config.Bucket) if err != nil { log.Printf("[ERROR] Failed to check bucket existence: %v", err) return err } if !exists { log.Printf("[INFO] Bucket %s does not exist, creating it...", config.Bucket) err = minioClient.MakeBucket(ctx, config.Bucket, minio.MakeBucketOptions{}) if err != nil { log.Printf("[ERROR] Failed to create bucket: %v", err) return err } } // Upload the file log.Printf("[INFO] Uploading file %s to bucket %s object %s", filePath, config.Bucket, objectName) info, err := minioClient.FPutObject(ctx, config.Bucket, objectName, filePath, minio.PutObjectOptions{}) if err != nil { log.Printf("[ERROR] Failed to upload object: %v", err) return err } log.Printf("[SUCCESS] Successfully uploaded %s of size %d", objectName, info.Size) return nil } func (s *BackupService) GetLogs() ([]models.BackupLog, error) { logs := make([]models.BackupLog, 0) err := models.DB.Order("start_time desc").Limit(20).Find(&logs).Error if err != nil { log.Printf("[ERROR] GetLogs error: %v", err) } else { log.Printf("[DEBUG] GetLogs found %d records", len(logs)) } return logs, err } // TestConnection tests the MinIO connection with the provided config func (s *BackupService) TestConnection(config *models.BackupConfig) error { ctx := context.Background() endpoint, useSSL := cleanEndpoint(config.Endpoint) log.Printf("[DEBUG] Testing MinIO Connection: Endpoint=%s, UseSSL=%v, AccessKey=***%s", endpoint, useSSL, getLast4(config.AccessKey)) // Initialize minio client object. minioClient, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""), Secure: useSSL, }) if err != nil { log.Printf("[ERROR] Failed to create MinIO client during test: %v", err) return fmt.Errorf("failed to create minio client: %v", err) } // Try to list buckets to verify credentials and endpoint log.Println("[DEBUG] Listing buckets to verify connection...") buckets, err := minioClient.ListBuckets(ctx) if err != nil { log.Printf("[ERROR] Failed to list buckets: %v", err) return fmt.Errorf("failed to connect to minio: %v", err) } log.Printf("[DEBUG] List buckets success. Found %d buckets.", len(buckets)) // Also check if specific bucket exists if provided if config.Bucket != "" { log.Printf("[DEBUG] Checking specific bucket: %s", config.Bucket) exists, err := minioClient.BucketExists(ctx, config.Bucket) if err != nil { log.Printf("[ERROR] Failed to check bucket existence: %v", err) return fmt.Errorf("failed to check bucket existence: %v", err) } if !exists { log.Printf("[WARN] Bucket %s does not exist.", config.Bucket) // For test connection, maybe just returning "success but bucket missing" or "success" is enough. // Just return nil (success) if we can talk to MinIO. } else { log.Printf("[DEBUG] Bucket %s exists.", config.Bucket) } } return nil } func getLast4(s string) string { if len(s) > 4 { return s[len(s)-4:] } return s } // DownloadFile retrieves a file from MinIO and returns it as a stream func (s *BackupService) DownloadFile(logID string) (*minio.Object, string, error) { // 1. Get Log to find filename var backupLog models.BackupLog if err := models.DB.Where("id = ?", logID).First(&backupLog).Error; err != nil { return nil, "", fmt.Errorf("backup log not found: %v", err) } // 2. Get Config to find bucket/creds config, err := s.GetConfig() if err != nil { return nil, "", err } if config.Endpoint == "" || config.Bucket == "" { return nil, "", fmt.Errorf("minio not configured") } endpoint, useSSL := cleanEndpoint(config.Endpoint) // 3. Init Client minioClient, err := minio.New(endpoint, &minio.Options{ Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""), Secure: useSSL, }) if err != nil { return nil, "", err } // 4. Get Object object, err := minioClient.GetObject(context.Background(), config.Bucket, backupLog.FileName, minio.GetObjectOptions{}) if err != nil { return nil, "", err } // Verify object exists/readable _, err = object.Stat() if err != nil { return nil, "", fmt.Errorf("failed to stat object in minio: %v", err) } return object, backupLog.FileName, nil } func (s *BackupService) PerformResourceBackup() { s.Lock.Lock() defer s.Lock.Unlock() startTime := time.Now() // 使用 _resource_ 前缀区分数据库备份 fileName := fmt.Sprintf("ems_resource_%s.zip", startTime.Format("20060102_150405")) backupLog := models.BackupLog{ ID: uuid.New(), StartTime: startTime, Status: "RUNNING", UploadStatus: "PENDING", FileName: fileName, } if err := models.DB.Create(&backupLog).Error; err != nil { log.Printf("[ERROR] Failed to create resource backup log: %v", err) } // 1. 查询数据 (资源与物联中心的四个部分) var sources []models.IntegrationSource var devices []models.Device var templates []models.EquipmentCleaningFormulaTemplate var locations []models.SysLocation // 查询数据 models.DB.Find(&sources) models.DB.Find(&devices) models.DB.Find(&templates) models.DB.Find(&locations) // 填充 DeviceCount (IntegrationSource 需要此字段用于导出,虽然后端导入时不一定用) for i := range sources { var count int64 models.DB.Model(&models.Device{}).Where("source_id = ?", sources[i].ID).Count(&count) sources[i].DeviceCount = count } // 2. 创建 Zip 文件 backupDir := "backups" if _, err := os.Stat(backupDir); os.IsNotExist(err) { os.MkdirAll(backupDir, 0755) } filePath := filepath.Join(backupDir, fileName) zipFile, err := os.Create(filePath) if err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to create zip file: %v", err)) return } // defer will be called when function returns defer zipFile.Close() archive := zip.NewWriter(zipFile) defer archive.Close() // 辅助函数: 写入 Excel 到 Zip writeExcel := func(filename string, f *excelize.File) error { w, err := archive.Create(filename) if err != nil { return err } // WriteTo 将 Excel 文件内容写入 Zip 中的文件流 if _, err := f.WriteTo(w); err != nil { return err } return nil } // 3. 生成 Excel 文件并写入 Zip // 3.1 Integration Sources (数据源) // 格式参考前端: 名称, 驱动类型, 配置信息, 状态, 设备数量 fSources := excelize.NewFile() fSources.SetSheetName("Sheet1", "数据源列表") headers := []string{"名称", "驱动类型", "配置信息", "状态", "设备数量"} for i, h := range headers { cell, _ := excelize.CoordinatesToCellName(i+1, 1) fSources.SetCellValue("数据源列表", cell, h) } for i, src := range sources { row := i + 2 status := "离线" if src.Status == "ONLINE" { status = "在线" } fSources.SetCellValue("数据源列表", fmt.Sprintf("A%d", row), src.Name) fSources.SetCellValue("数据源列表", fmt.Sprintf("B%d", row), src.DriverType) fSources.SetCellValue("数据源列表", fmt.Sprintf("C%d", row), string(src.Config)) // JSON String fSources.SetCellValue("数据源列表", fmt.Sprintf("D%d", row), status) fSources.SetCellValue("数据源列表", fmt.Sprintf("E%d", row), src.DeviceCount) } if err := writeExcel("integration_sources.xlsx", fSources); err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to write sources excel: %v", err)) return } // 3.2 Devices (设备) // 格式参考前端: Name, DeviceType, ExternalID, LocationID, Status, SourceID, AttributeMapping fDevices := excelize.NewFile() fDevices.SetSheetName("Sheet1", "Devices") headersDev := []string{"Name", "DeviceType", "ExternalID", "LocationID", "Status", "SourceID", "AttributeMapping"} for i, h := range headersDev { cell, _ := excelize.CoordinatesToCellName(i+1, 1) fDevices.SetCellValue("Devices", cell, h) } for i, d := range devices { row := i + 2 // AttributeMapping 是 JSONB 类型,转为字符串 fDevices.SetCellValue("Devices", fmt.Sprintf("A%d", row), d.Name) fDevices.SetCellValue("Devices", fmt.Sprintf("B%d", row), d.DeviceType) fDevices.SetCellValue("Devices", fmt.Sprintf("C%d", row), d.ExternalID) fDevices.SetCellValue("Devices", fmt.Sprintf("D%d", row), d.LocationID) fDevices.SetCellValue("Devices", fmt.Sprintf("E%d", row), d.Status) fDevices.SetCellValue("Devices", fmt.Sprintf("F%d", row), d.SourceID) fDevices.SetCellValue("Devices", fmt.Sprintf("G%d", row), string(d.AttributeMapping)) } if err := writeExcel("devices.xlsx", fDevices); err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to write devices excel: %v", err)) return } // 3.3 Cleaning Templates (清洗模版) // 格式参考前端: 模板名称, 设备类型, 描述, 清洗公式配置 fTpl := excelize.NewFile() fTpl.SetSheetName("Sheet1", "Sheet1") headersTpl := []string{"模板名称", "设备类型", "描述", "清洗公式配置"} for i, h := range headersTpl { cell, _ := excelize.CoordinatesToCellName(i+1, 1) fTpl.SetCellValue("Sheet1", cell, h) } for i, t := range templates { row := i + 2 fTpl.SetCellValue("Sheet1", fmt.Sprintf("A%d", row), t.Name) fTpl.SetCellValue("Sheet1", fmt.Sprintf("B%d", row), t.EquipmentType) fTpl.SetCellValue("Sheet1", fmt.Sprintf("C%d", row), t.Description) fTpl.SetCellValue("Sheet1", fmt.Sprintf("D%d", row), string(t.Formula)) } if err := writeExcel("cleaning_templates.xlsx", fTpl); err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to write templates excel: %v", err)) return } // 3.4 Locations (位置) // 前端暂无明确导出,使用通用字段 fLoc := excelize.NewFile() fLoc.SetSheetName("Sheet1", "Locations") headersLoc := []string{"ID", "Name", "ParentID", "Type"} for i, h := range headersLoc { cell, _ := excelize.CoordinatesToCellName(i+1, 1) fLoc.SetCellValue("Locations", cell, h) } for i, l := range locations { row := i + 2 fLoc.SetCellValue("Locations", fmt.Sprintf("A%d", row), l.ID) fLoc.SetCellValue("Locations", fmt.Sprintf("B%d", row), l.Name) fLoc.SetCellValue("Locations", fmt.Sprintf("C%d", row), l.ParentID) fLoc.SetCellValue("Locations", fmt.Sprintf("D%d", row), l.Type) } if err := writeExcel("sys_locations.xlsx", fLoc); err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to write locations excel: %v", err)) return } // 关闭 Zip Writer 以确保所有数据写入文件 if err := archive.Close(); err != nil { s.logBackupError(&backupLog, fmt.Sprintf("Failed to close archive: %v", err)) return } // Note: zipFile.Close() is deferred // 3. 更新日志并上传 info, err := os.Stat(filePath) if err == nil { backupLog.Size = info.Size() } backupLog.FilePath = fileName // 保存文件名作为相对路径 backupLog.Status = "SUCCESS" backupLog.Message = "Resource backup (Excel) created successfully." backupLog.EndTime = time.Now() models.DB.Save(&backupLog) // 4. 上传到 MinIO (复用现有的上传逻辑) config, _ := s.GetConfig() if config.Endpoint != "" && config.Bucket != "" { err := s.uploadToMinIO(filePath, fileName, config) if err != nil { log.Printf("MinIO upload failed: %v", err) backupLog.Message += fmt.Sprintf(" Upload failed: %v", err) backupLog.UploadStatus = "FAILED" } else { backupLog.UploadStatus = "UPLOADED" backupLog.Message += " Uploaded to MinIO." } models.DB.Save(&backupLog) } } // 辅助方法: 记录错误 func (s *BackupService) logBackupError(log *models.BackupLog, msg string) { log.Status = "FAILED" log.Message = msg log.EndTime = time.Now() models.DB.Save(log) }