| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- package services
- import (
- "archive/zip"
- "context"
- "encoding/json"
- "fmt"
- "log"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "ems-backend/models"
- "github.com/google/uuid"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "github.com/robfig/cron/v3"
- "github.com/xuri/excelize/v2"
- )
- type BackupService struct {
- Cron *cron.Cron
- Job cron.EntryID
- Lock sync.Mutex
- }
- var GlobalBackupService *BackupService
- func InitBackupService() {
- GlobalBackupService = &BackupService{
- Cron: cron.New(),
- }
- GlobalBackupService.Cron.Start()
- GlobalBackupService.LoadAndSchedule()
- }
- func (s *BackupService) LoadAndSchedule() {
- config, err := s.GetConfig()
- if err != nil {
- log.Printf("Failed to load backup config: %v", err)
- return
- }
- if (config.Enabled || config.ResourceEnabled) && config.Time != "" {
- s.ScheduleBackup(config.Time)
- } else {
- s.StopSchedule()
- }
- }
- func (s *BackupService) StopSchedule() {
- if s.Job != 0 {
- s.Cron.Remove(s.Job)
- s.Job = 0
- }
- }
- func (s *BackupService) ScheduleBackup(timeStr string) {
- s.StopSchedule()
- // timeStr is "HH:mm"
- // cron spec: "minute hour * * *"
- t, err := time.Parse("15:04", timeStr)
- if err != nil {
- log.Printf("Invalid time format: %v", err)
- return
- }
- spec := fmt.Sprintf("%d %d * * *", t.Minute(), t.Hour())
- id, err := s.Cron.AddFunc(spec, func() {
- log.Println("Starting scheduled backup...")
- config, _ := s.GetConfig()
- if config.Enabled {
- s.PerformBackup()
- }
- if config.ResourceEnabled {
- s.PerformResourceBackup()
- }
- })
- if err != nil {
- log.Printf("Failed to schedule backup: %v", err)
- return
- }
- s.Job = id
- log.Printf("Backup scheduled for %s (spec: %s)", timeStr, spec)
- }
- func (s *BackupService) GetConfig() (*models.BackupConfig, error) {
- var config models.BackupConfig
- // Defaults
- config.KeepDays = 7
- var sysConfig models.SysConfig
- // Load Schedule
- if err := models.DB.Where("config_key = ?", "sys.backup.config").First(&sysConfig).Error; err == nil {
- json.Unmarshal([]byte(sysConfig.ConfigValue), &config)
- }
- return &config, nil
- }
- func (s *BackupService) SaveConfig(config *models.BackupConfig) error {
- data, _ := json.Marshal(config)
-
- // Upsert
- var sysConfig models.SysConfig
- err := models.DB.Where("config_key = ?", "sys.backup.config").First(&sysConfig).Error
- if err != nil {
- sysConfig = models.SysConfig{
- ConfigKey: "sys.backup.config",
- ConfigValue: string(data),
- ConfigType: "Y",
- Remark: "Backup Configuration",
- }
- if err := models.DB.Create(&sysConfig).Error; err != nil {
- return err
- }
- } else {
- sysConfig.ConfigValue = string(data)
- if err := models.DB.Save(&sysConfig).Error; err != nil {
- return err
- }
- }
- // Reschedule
- if config.Enabled || config.ResourceEnabled {
- s.ScheduleBackup(config.Time)
- } else {
- s.StopSchedule()
- }
- return nil
- }
- func (s *BackupService) PerformBackup() {
- s.Lock.Lock()
- defer s.Lock.Unlock()
- startTime := time.Now()
- backupLog := models.BackupLog{
- ID: uuid.New(), // Explicitly generate UUID
- StartTime: startTime,
- Status: "RUNNING",
- UploadStatus: "PENDING",
- }
- if err := models.DB.Create(&backupLog).Error; err != nil {
- log.Printf("[ERROR] Failed to create backup log: %v", err)
- // Proceeding might be futile if we can't update status, but let's try
- } else {
- log.Printf("[INFO] Created backup log: %s", backupLog.ID)
- }
- // 1. Prepare Paths
- backupDir := "backups"
- if _, err := os.Stat(backupDir); os.IsNotExist(err) {
- os.MkdirAll(backupDir, 0755)
- }
- fileName := fmt.Sprintf("ems_backup_%s.sql", startTime.Format("20060102_150405"))
- filePath := filepath.Join(backupDir, fileName)
- // 2. Run pg_dump
- // Env vars for pg_dump
- pgUser := os.Getenv("POSTGRES_USER")
- pgHost := os.Getenv("POSTGRES_HOST")
- pgPort := os.Getenv("POSTGRES_PORT")
- pgDB := os.Getenv("POSTGRES_DB")
- pgPass := os.Getenv("POSTGRES_PASSWORD") // Or DB_PASSWORD
- // Ensure vars are set
- if pgHost == "" { pgHost = "postgres" }
- if pgUser == "" { pgUser = "ems" }
- if pgDB == "" { pgDB = "ems" }
- if pgPort == "" { pgPort = "5432" }
- // Construct command
- cmd := exec.Command("pg_dump", "-h", pgHost, "-p", pgPort, "-U", pgUser, "-d", pgDB, "-f", filePath)
- cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", pgPass))
- log.Printf("Executing backup command: %s", cmd.String())
- output, err := cmd.CombinedOutput()
- if err != nil {
- errMsg := fmt.Sprintf("pg_dump failed: %s, output: %s", err, string(output))
- log.Println(errMsg)
- backupLog.Status = "FAILED"
- backupLog.Message = errMsg
- backupLog.EndTime = time.Now()
- models.DB.Save(&backupLog)
- return
- }
- // Check file size
- info, err := os.Stat(filePath)
- if err == nil {
- backupLog.Size = info.Size()
- }
- backupLog.FileName = fileName
- backupLog.Status = "SUCCESS"
- backupLog.Message = "Backup created locally."
- models.DB.Save(&backupLog)
- // 3. Upload to MinIO
- config, _ := s.GetConfig()
- if config.Endpoint != "" && config.Bucket != "" {
- err := s.uploadToMinIO(filePath, fileName, config)
- if err != nil {
- log.Printf("MinIO upload failed: %v", err)
- backupLog.Message += fmt.Sprintf(" Upload failed: %v", err)
- backupLog.UploadStatus = "FAILED"
- } else {
- backupLog.UploadStatus = "UPLOADED"
- backupLog.Message += " Uploaded to MinIO."
-
- // Optional: Remove local file after successful upload to save space
- // os.Remove(filePath)
- }
- models.DB.Save(&backupLog)
- }
- }
- // Helper function to clean endpoint
- func cleanEndpoint(endpoint string) (string, bool) {
- useSSL := false
- if strings.HasPrefix(endpoint, "https://") {
- useSSL = true
- endpoint = strings.TrimPrefix(endpoint, "https://")
- } else {
- endpoint = strings.TrimPrefix(endpoint, "http://")
- }
- // Remove trailing slash if any
- endpoint = strings.TrimRight(endpoint, "/")
- return endpoint, useSSL
- }
- func (s *BackupService) uploadToMinIO(filePath, objectName string, config *models.BackupConfig) error {
- ctx := context.Background()
-
- endpoint, useSSL := cleanEndpoint(config.Endpoint)
- log.Printf("[DEBUG] Connecting to MinIO: Endpoint=%s, UseSSL=%v, Bucket=%s", endpoint, useSSL, config.Bucket)
- // Initialize minio client object.
- minioClient, err := minio.New(endpoint, &minio.Options{
- Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""),
- Secure: useSSL,
- })
- if err != nil {
- log.Printf("[ERROR] Failed to create MinIO client: %v", err)
- return err
- }
- // Make bucket if not exists
- exists, err := minioClient.BucketExists(ctx, config.Bucket)
- if err != nil {
- log.Printf("[ERROR] Failed to check bucket existence: %v", err)
- return err
- }
- if !exists {
- log.Printf("[INFO] Bucket %s does not exist, creating it...", config.Bucket)
- err = minioClient.MakeBucket(ctx, config.Bucket, minio.MakeBucketOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed to create bucket: %v", err)
- return err
- }
- }
- // Upload the file
- log.Printf("[INFO] Uploading file %s to bucket %s object %s", filePath, config.Bucket, objectName)
- info, err := minioClient.FPutObject(ctx, config.Bucket, objectName, filePath, minio.PutObjectOptions{})
- if err != nil {
- log.Printf("[ERROR] Failed to upload object: %v", err)
- return err
- }
- log.Printf("[SUCCESS] Successfully uploaded %s of size %d", objectName, info.Size)
- return nil
- }
- func (s *BackupService) GetLogs() ([]models.BackupLog, error) {
- logs := make([]models.BackupLog, 0)
- err := models.DB.Order("start_time desc").Limit(20).Find(&logs).Error
- if err != nil {
- log.Printf("[ERROR] GetLogs error: %v", err)
- } else {
- log.Printf("[DEBUG] GetLogs found %d records", len(logs))
- }
- return logs, err
- }
- // TestConnection tests the MinIO connection with the provided config
- func (s *BackupService) TestConnection(config *models.BackupConfig) error {
- ctx := context.Background()
-
- endpoint, useSSL := cleanEndpoint(config.Endpoint)
- log.Printf("[DEBUG] Testing MinIO Connection: Endpoint=%s, UseSSL=%v, AccessKey=***%s", endpoint, useSSL, getLast4(config.AccessKey))
- // Initialize minio client object.
- minioClient, err := minio.New(endpoint, &minio.Options{
- Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""),
- Secure: useSSL,
- })
- if err != nil {
- log.Printf("[ERROR] Failed to create MinIO client during test: %v", err)
- return fmt.Errorf("failed to create minio client: %v", err)
- }
- // Try to list buckets to verify credentials and endpoint
- log.Println("[DEBUG] Listing buckets to verify connection...")
- buckets, err := minioClient.ListBuckets(ctx)
- if err != nil {
- log.Printf("[ERROR] Failed to list buckets: %v", err)
- return fmt.Errorf("failed to connect to minio: %v", err)
- }
- log.Printf("[DEBUG] List buckets success. Found %d buckets.", len(buckets))
- // Also check if specific bucket exists if provided
- if config.Bucket != "" {
- log.Printf("[DEBUG] Checking specific bucket: %s", config.Bucket)
- exists, err := minioClient.BucketExists(ctx, config.Bucket)
- if err != nil {
- log.Printf("[ERROR] Failed to check bucket existence: %v", err)
- return fmt.Errorf("failed to check bucket existence: %v", err)
- }
- if !exists {
- log.Printf("[WARN] Bucket %s does not exist.", config.Bucket)
- // For test connection, maybe just returning "success but bucket missing" or "success" is enough.
- // Just return nil (success) if we can talk to MinIO.
- } else {
- log.Printf("[DEBUG] Bucket %s exists.", config.Bucket)
- }
- }
- return nil
- }
- func getLast4(s string) string {
- if len(s) > 4 {
- return s[len(s)-4:]
- }
- return s
- }
- // DownloadFile retrieves a file from MinIO and returns it as a stream
- func (s *BackupService) DownloadFile(logID string) (*minio.Object, string, error) {
- // 1. Get Log to find filename
- var backupLog models.BackupLog
- if err := models.DB.Where("id = ?", logID).First(&backupLog).Error; err != nil {
- return nil, "", fmt.Errorf("backup log not found: %v", err)
- }
- // 2. Get Config to find bucket/creds
- config, err := s.GetConfig()
- if err != nil {
- return nil, "", err
- }
- if config.Endpoint == "" || config.Bucket == "" {
- return nil, "", fmt.Errorf("minio not configured")
- }
- endpoint, useSSL := cleanEndpoint(config.Endpoint)
-
- // 3. Init Client
- minioClient, err := minio.New(endpoint, &minio.Options{
- Creds: credentials.NewStaticV4(config.AccessKey, config.SecretKey, ""),
- Secure: useSSL,
- })
- if err != nil {
- return nil, "", err
- }
- // 4. Get Object
- object, err := minioClient.GetObject(context.Background(), config.Bucket, backupLog.FileName, minio.GetObjectOptions{})
- if err != nil {
- return nil, "", err
- }
- // Verify object exists/readable
- _, err = object.Stat()
- if err != nil {
- return nil, "", fmt.Errorf("failed to stat object in minio: %v", err)
- }
- return object, backupLog.FileName, nil
- }
- func (s *BackupService) PerformResourceBackup() {
- s.Lock.Lock()
- defer s.Lock.Unlock()
- startTime := time.Now()
- // 使用 _resource_ 前缀区分数据库备份
- fileName := fmt.Sprintf("ems_resource_%s.zip", startTime.Format("20060102_150405"))
-
- backupLog := models.BackupLog{
- ID: uuid.New(),
- StartTime: startTime,
- Status: "RUNNING",
- UploadStatus: "PENDING",
- FileName: fileName,
- }
- if err := models.DB.Create(&backupLog).Error; err != nil {
- log.Printf("[ERROR] Failed to create resource backup log: %v", err)
- }
- // 1. 查询数据 (资源与物联中心的四个部分)
- var sources []models.IntegrationSource
- var devices []models.Device
- var templates []models.EquipmentCleaningFormulaTemplate
- var locations []models.SysLocation
- // 查询数据
- models.DB.Find(&sources)
- models.DB.Find(&devices)
- models.DB.Find(&templates)
- models.DB.Find(&locations)
- // 填充 DeviceCount (IntegrationSource 需要此字段用于导出,虽然后端导入时不一定用)
- for i := range sources {
- var count int64
- models.DB.Model(&models.Device{}).Where("source_id = ?", sources[i].ID).Count(&count)
- sources[i].DeviceCount = count
- }
- // 2. 创建 Zip 文件
- backupDir := "backups"
- if _, err := os.Stat(backupDir); os.IsNotExist(err) {
- os.MkdirAll(backupDir, 0755)
- }
- filePath := filepath.Join(backupDir, fileName)
- zipFile, err := os.Create(filePath)
- if err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to create zip file: %v", err))
- return
- }
- // defer will be called when function returns
- defer zipFile.Close()
- archive := zip.NewWriter(zipFile)
- defer archive.Close()
- // 辅助函数: 写入 Excel 到 Zip
- writeExcel := func(filename string, f *excelize.File) error {
- w, err := archive.Create(filename)
- if err != nil {
- return err
- }
- // WriteTo 将 Excel 文件内容写入 Zip 中的文件流
- if _, err := f.WriteTo(w); err != nil {
- return err
- }
- return nil
- }
- // 3. 生成 Excel 文件并写入 Zip
- // 3.1 Integration Sources (数据源)
- // 格式参考前端: 名称, 驱动类型, 配置信息, 状态, 设备数量
- fSources := excelize.NewFile()
- fSources.SetSheetName("Sheet1", "数据源列表")
- headers := []string{"名称", "驱动类型", "配置信息", "状态", "设备数量"}
- for i, h := range headers {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- fSources.SetCellValue("数据源列表", cell, h)
- }
- for i, src := range sources {
- row := i + 2
- status := "离线"
- if src.Status == "ONLINE" {
- status = "在线"
- }
-
- fSources.SetCellValue("数据源列表", fmt.Sprintf("A%d", row), src.Name)
- fSources.SetCellValue("数据源列表", fmt.Sprintf("B%d", row), src.DriverType)
- fSources.SetCellValue("数据源列表", fmt.Sprintf("C%d", row), string(src.Config)) // JSON String
- fSources.SetCellValue("数据源列表", fmt.Sprintf("D%d", row), status)
- fSources.SetCellValue("数据源列表", fmt.Sprintf("E%d", row), src.DeviceCount)
- }
- if err := writeExcel("integration_sources.xlsx", fSources); err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to write sources excel: %v", err))
- return
- }
- // 3.2 Devices (设备)
- // 格式参考前端: Name, DeviceType, ExternalID, LocationID, Status, SourceID, AttributeMapping
- fDevices := excelize.NewFile()
- fDevices.SetSheetName("Sheet1", "Devices")
- headersDev := []string{"Name", "DeviceType", "ExternalID", "LocationID", "Status", "SourceID", "AttributeMapping"}
- for i, h := range headersDev {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- fDevices.SetCellValue("Devices", cell, h)
- }
- for i, d := range devices {
- row := i + 2
- // AttributeMapping 是 JSONB 类型,转为字符串
- fDevices.SetCellValue("Devices", fmt.Sprintf("A%d", row), d.Name)
- fDevices.SetCellValue("Devices", fmt.Sprintf("B%d", row), d.DeviceType)
- fDevices.SetCellValue("Devices", fmt.Sprintf("C%d", row), d.ExternalID)
- fDevices.SetCellValue("Devices", fmt.Sprintf("D%d", row), d.LocationID)
- fDevices.SetCellValue("Devices", fmt.Sprintf("E%d", row), d.Status)
- fDevices.SetCellValue("Devices", fmt.Sprintf("F%d", row), d.SourceID)
- fDevices.SetCellValue("Devices", fmt.Sprintf("G%d", row), string(d.AttributeMapping))
- }
- if err := writeExcel("devices.xlsx", fDevices); err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to write devices excel: %v", err))
- return
- }
- // 3.3 Cleaning Templates (清洗模版)
- // 格式参考前端: 模板名称, 设备类型, 描述, 清洗公式配置
- fTpl := excelize.NewFile()
- fTpl.SetSheetName("Sheet1", "Sheet1")
- headersTpl := []string{"模板名称", "设备类型", "描述", "清洗公式配置"}
- for i, h := range headersTpl {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- fTpl.SetCellValue("Sheet1", cell, h)
- }
- for i, t := range templates {
- row := i + 2
- fTpl.SetCellValue("Sheet1", fmt.Sprintf("A%d", row), t.Name)
- fTpl.SetCellValue("Sheet1", fmt.Sprintf("B%d", row), t.EquipmentType)
- fTpl.SetCellValue("Sheet1", fmt.Sprintf("C%d", row), t.Description)
- fTpl.SetCellValue("Sheet1", fmt.Sprintf("D%d", row), string(t.Formula))
- }
- if err := writeExcel("cleaning_templates.xlsx", fTpl); err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to write templates excel: %v", err))
- return
- }
- // 3.4 Locations (位置)
- // 前端暂无明确导出,使用通用字段
- fLoc := excelize.NewFile()
- fLoc.SetSheetName("Sheet1", "Locations")
- headersLoc := []string{"ID", "Name", "ParentID", "Type"}
- for i, h := range headersLoc {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- fLoc.SetCellValue("Locations", cell, h)
- }
- for i, l := range locations {
- row := i + 2
- fLoc.SetCellValue("Locations", fmt.Sprintf("A%d", row), l.ID)
- fLoc.SetCellValue("Locations", fmt.Sprintf("B%d", row), l.Name)
- fLoc.SetCellValue("Locations", fmt.Sprintf("C%d", row), l.ParentID)
- fLoc.SetCellValue("Locations", fmt.Sprintf("D%d", row), l.Type)
- }
- if err := writeExcel("sys_locations.xlsx", fLoc); err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to write locations excel: %v", err))
- return
- }
- // 关闭 Zip Writer 以确保所有数据写入文件
- if err := archive.Close(); err != nil {
- s.logBackupError(&backupLog, fmt.Sprintf("Failed to close archive: %v", err))
- return
- }
- // Note: zipFile.Close() is deferred
- // 3. 更新日志并上传
- info, err := os.Stat(filePath)
- if err == nil {
- backupLog.Size = info.Size()
- }
- backupLog.FilePath = fileName // 保存文件名作为相对路径
- backupLog.Status = "SUCCESS"
- backupLog.Message = "Resource backup (Excel) created successfully."
- backupLog.EndTime = time.Now()
- models.DB.Save(&backupLog)
- // 4. 上传到 MinIO (复用现有的上传逻辑)
- config, _ := s.GetConfig()
- if config.Endpoint != "" && config.Bucket != "" {
- err := s.uploadToMinIO(filePath, fileName, config)
- if err != nil {
- log.Printf("MinIO upload failed: %v", err)
- backupLog.Message += fmt.Sprintf(" Upload failed: %v", err)
- backupLog.UploadStatus = "FAILED"
- } else {
- backupLog.UploadStatus = "UPLOADED"
- backupLog.Message += " Uploaded to MinIO."
- }
- models.DB.Save(&backupLog)
- }
- }
- // 辅助方法: 记录错误
- func (s *BackupService) logBackupError(log *models.BackupLog, msg string) {
- log.Status = "FAILED"
- log.Message = msg
- log.EndTime = time.Now()
- models.DB.Save(log)
- }
|