package services import ( "ems-backend/db" "ems-backend/models" "ems-backend/utils" "encoding/json" "log" "strconv" "time" "strings" "sync" "github.com/robfig/cron/v3" ) var GlobalCollector *CollectorService type CollectorService struct { cron *cron.Cron DebugMode bool mu sync.RWMutex IntervalSeconds float64 runningSources sync.Map } func NewCollectorService() *CollectorService { GlobalCollector = &CollectorService{ cron: cron.New(cron.WithSeconds()), IntervalSeconds: 5.0, } return GlobalCollector } func (s *CollectorService) SetDebugMode(enabled bool) { s.mu.Lock() defer s.mu.Unlock() s.DebugMode = enabled log.Printf("Collector Debug Mode set to: %v", enabled) } func (s *CollectorService) IsDebug() bool { s.mu.RLock() defer s.mu.RUnlock() return s.DebugMode } func (s *CollectorService) Start() { // 1. 尝试从数据库加载配置 var config models.SysConfig spec := "*/5 * * * * *" // 默认 5秒 if err := models.DB.Where("config_key = ?", "collection_frequency").First(&config).Error; err == nil { if config.ConfigValue != "" { spec = config.ConfigValue } } else { // 如果不存在,初始化默认值到数据库 models.DB.Create(&models.SysConfig{ ConfigKey: "collection_frequency", ConfigValue: spec, ConfigType: "Y", Remark: "数据采集频率 (Cron表达式, 默认: */5 * * * * *)", }) } s.updateIntervalFromSpec(spec) _, err := s.cron.AddFunc(spec, s.collectJob) if err != nil { log.Printf("配置的频率格式错误 '%s', 使用默认值. Error: %v", spec, err) spec = "*/5 * * * * *" s.updateIntervalFromSpec(spec) s.cron.AddFunc(spec, s.collectJob) } s.cron.Start() log.Printf("Data Collector Service started (spec: %s, interval: %.1fs)", spec, s.IntervalSeconds) } // 新增: 重启服务以应用新频率 func (s *CollectorService) Restart(spec string) error { s.mu.Lock() defer s.mu.Unlock() s.cron.Stop() // 重新创建 Cron 实例以清除旧任务 s.cron = cron.New(cron.WithSeconds()) _, err := s.cron.AddFunc(spec, s.collectJob) if err != nil { // 如果新规则错误,恢复默认并返回错误 log.Printf("Invalid cron spec '%s', reverting to default: %v", spec, err) defaultSpec := "*/5 * * * * *" s.updateIntervalFromSpec(defaultSpec) s.cron.AddFunc(defaultSpec, s.collectJob) s.cron.Start() return err } s.updateIntervalFromSpec(spec) s.cron.Start() log.Printf("Data Collector restarted with new spec: %s, interval: %.1fs", spec, s.IntervalSeconds) return nil } func (s *CollectorService) updateIntervalFromSpec(spec string) { // 简单解析 */N,如果失败则保留默认 5.0 // 仅支持 */N * * * * * 格式的秒级解析,其他情况默认5s (或者不变更) if strings.HasPrefix(spec, "*/") { parts := strings.Split(spec, " ") if len(parts) > 0 { divParts := strings.Split(parts[0], "/") if len(divParts) == 2 { if val, err := strconv.ParseFloat(divParts[1], 64); err == nil { s.IntervalSeconds = val return } } } } // 如果无法解析,或者是 "0 * ..." 等格式,暂时默认5s,或者保持不变 // 为了安全起见,如果不确定,设置为 5.0 // 实际生产中应该记录上次采集时间来计算 Delta s.IntervalSeconds = 5.0 } func (s *CollectorService) Stop() { s.cron.Stop() } func (s *CollectorService) collectJob() { log.Println("Starting collection cycle...") // 1. Fetch all active devices with source var devices []models.Device if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil { log.Printf("Error fetching devices: %v\n", err) return } if len(devices) == 0 { return } // 2. Group by SourceID deviceGroups := make(map[string][]models.Device) for _, d := range devices { if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" { deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d) } } // 3. Process each group for sourceID, devs := range deviceGroups { // 并发控制:如果该源的上一次采集还在运行,则跳过本次 if _, loaded := s.runningSources.LoadOrStore(sourceID, true); loaded { log.Printf("Warning: Collection for source %s is still running, skipping this cycle.", sourceID) continue } go s.processSourceGroup(sourceID, devs) } } func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) { // 任务结束时移除标记 defer s.runningSources.Delete(sourceID) // 记录开始时间 start := time.Now() // Fetch Source Config var source models.IntegrationSource if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil { log.Printf("Source %s not found: %v\n", sourceID, err) return } if source.DriverType != "HOME_ASSISTANT" { return } // 1. Prepare Request entityIDs := make([]string, 0) // Helper structs type Target struct { DeviceID string Metric string LocationID string Formula string } // deviceID -> metric -> value (Cache for calculation) deviceMetricValues := make(map[string]map[string]float64) requestMap := make(map[string][]Target) // deviceID -> hasSwitch (Track if device has a switch configured) deviceHasSwitch := make(map[string]bool) // Virtual calculation tasks type CalcTask struct { DeviceID string Metric string LocationID string Formula string // For pure formula virtual values } calcTasks := make([]CalcTask, 0) for _, d := range devices { // Init cache deviceMetricValues[d.ID.String()] = make(map[string]float64) var mapping map[string]string b, _ := d.AttributeMapping.MarshalJSON() json.Unmarshal(b, &mapping) locID := "" if d.LocationID != nil { locID = d.LocationID.String() } // --- Rule Validation & Task Generation --- // 1. Handle Explicit Mappings (Switch, Voltage, Current, etc.) for metric, entityID := range mapping { // Skip formula definition keys if len(metric) > 8 && metric[len(metric)-8:] == "_formula" { continue } // Switch must have bound entity if metric == "switch" && entityID == "" { // log.Printf("Warning: Device %s has 'switch' metric but no entity bound.", d.Name) continue } // Record if switch is mapped if metric == "switch" && entityID != "" { deviceHasSwitch[d.ID.String()] = true } // If configured with entity ID, add to collection queue if entityID != "" { if _, exists := requestMap[entityID]; !exists { entityIDs = append(entityIDs, entityID) } requestMap[entityID] = append(requestMap[entityID], Target{ DeviceID: d.ID.String(), Metric: metric, LocationID: locID, Formula: mapping[metric+"_formula"], // Optional correction formula }) } } // 2. Handle Electric Device Logic (Voltage/Current -> Power -> Energy) // We use "ELECTRIC" string literal matching the schema comment // Note: Schema says "ELECTRIC", user said "电力设备" (Electric Equipment) if d.DeviceType == "ELECTRIC" || d.DeviceType == "METER" || d.DeviceType == "Electric" { // Check Voltage hasVoltageEntity := mapping["voltage"] != "" voltageFormula := mapping["voltage_formula"] if !hasVoltageEntity { // If no entity, must have formula (Virtual Value) if voltageFormula != "" { calcTasks = append(calcTasks, CalcTask{ DeviceID: d.ID.String(), Metric: "voltage", LocationID: locID, Formula: voltageFormula, }) } else { // log.Printf("Warning: Device %s (Electric) missing 'voltage' source (Entity or Formula required).", d.Name) } } // Check Current hasCurrentEntity := mapping["current"] != "" currentFormula := mapping["current_formula"] if !hasCurrentEntity { if currentFormula != "" { calcTasks = append(calcTasks, CalcTask{ DeviceID: d.ID.String(), Metric: "current", LocationID: locID, Formula: currentFormula, }) } else { // log.Printf("Warning: Device %s (Electric) missing 'current' source.", d.Name) } } // Auto Add Power and Energy Calculation Tasks (Always try to calculate if missing or as shadow) // Strategy: If user mapped "power", we fetched it above. // If we also add a calc task here, we might overwrite it or fill it if fetch failed. // Let's only add calc task if NO entity mapped, OR if we want to enforce consistency. // User query: "active_power and cumulative_power are repetitive... user calculates power and energy" // Implicitly, if they have an entity for power, they map it. // If they don't, we calculate. if mapping["power"] == "" { calcTasks = append(calcTasks, CalcTask{ DeviceID: d.ID.String(), Metric: "power", LocationID: locID, }) } if mapping["energy"] == "" { calcTasks = append(calcTasks, CalcTask{ DeviceID: d.ID.String(), Metric: "energy", LocationID: locID, }) } } } if len(entityIDs) == 0 && len(calcTasks) == 0 { log.Printf("数据源 %s: 未找到采集任务 (未映射实体且无虚拟任务)", source.Name) return } // 2. Fetch Data (Real World) var states map[string]string var err error if len(entityIDs) > 0 { log.Printf("数据源 %s: 正在请求 %d 个实体: %v", source.Name, len(entityIDs), entityIDs) states, err = utils.BatchFetchStates(source.Config, entityIDs) if err != nil { log.Printf("数据源 %s 获取状态失败: %v\n", source.Name, err) return } log.Printf("数据源 %s: 从 HA 获取到 %d 个状态: %v", source.Name, len(states), states) } else { states = make(map[string]string) } // Update Device Status for _, d := range devices { isOnline := false var mapping map[string]string b, _ := d.AttributeMapping.MarshalJSON() json.Unmarshal(b, &mapping) hasBoundEntity := false for k, entityID := range mapping { if len(k) > 8 && k[len(k)-8:] == "_formula" { continue } if entityID != "" { hasBoundEntity = true if _, ok := states[entityID]; ok { isOnline = true break } } } targetStatus := d.Status if hasBoundEntity { if isOnline { targetStatus = "ONLINE" } else { targetStatus = "OFFLINE" } } else { targetStatus = "ONLINE" } if d.Status != targetStatus { models.DB.Model(&d).Update("Status", targetStatus) } } now := time.Now() var toInsert []db.ReadingRecord // Helper to collect (batch insert at end of cycle) saveMetric := func(deviceID, metric string, val float64, locID string) { // Update Cache if deviceMetricValues[deviceID] == nil { deviceMetricValues[deviceID] = make(map[string]float64) } deviceMetricValues[deviceID][metric] = val toInsert = append(toInsert, db.ReadingRecord{ DeviceID: deviceID, Metric: metric, Value: val, LocationID: locID, Ts: now, }) if s.IsDebug() { log.Printf("调试: 已收集 设备=%s 指标=%s 值=%.2f", deviceID, metric, val) } // Check Alarms Async (before batch write) if GlobalAlarmService != nil { GlobalAlarmService.CheckRules(deviceID, metric, val) } } // 3. Process Real Entities for entityID, valStr := range states { // Parse val, err := strconv.ParseFloat(valStr, 64) if err != nil { // Boolean handling for Switch lower := strings.ToLower(valStr) if lower == "on" || lower == "true" { val = 1.0 } else if lower == "off" || lower == "false" { val = 0.0 } else { if s.IsDebug() { log.Printf("调试: 实体 %s 解析数值失败 (原始值=%s): %v. 跳过.", entityID, valStr, err) } continue } } targets := requestMap[entityID] for _, t := range targets { finalVal := val // Apply Formula (e.g. calibration) if t.Formula != "" { if res, err := utils.EvaluateFormula(t.Formula, val); err == nil { if s.IsDebug() { log.Printf("调试: 应用公式 [%s] 设备=%s 指标=%s: %.2f -> %.2f", t.Formula, t.DeviceID, t.Metric, val, res) } finalVal = res } else { log.Printf("调试: 公式计算错误 [%s] 设备=%s 指标=%s: %v. 使用原始值: %.2f", t.Formula, t.DeviceID, t.Metric, err, val) } } saveMetric(t.DeviceID, t.Metric, finalVal, t.LocationID) } } // 4. Process Virtual/Calculated Tasks (Ordered) // Pass 1: Base Virtual Values (Fx only, e.g. Virtual Voltage) for _, task := range calcTasks { if task.Formula != "" && (task.Metric == "voltage" || task.Metric == "current") { // Formula with x=0 (constant or time-based if supported, currently constant) if res, err := utils.EvaluateFormula(task.Formula, 0); err == nil { if s.IsDebug() { log.Printf("调试: 虚拟指标 [%s] 设备=%s: 公式 [%s] (x=0) -> %.2f", task.Metric, task.DeviceID, task.Formula, res) } saveMetric(task.DeviceID, task.Metric, res, task.LocationID) } else { log.Printf("调试: 虚拟指标计算错误 [%s] 设备=%s: 公式 [%s] 失败: %v", task.Metric, task.DeviceID, task.Formula, err) } } } // Pass 2: Power (P = V * I) for _, task := range calcTasks { if task.Metric == "power" { // Check Switch Status if deviceHasSwitch[task.DeviceID] { swVal, ok := deviceMetricValues[task.DeviceID]["switch"] // If switch is OFF (0) or Unknown (!ok), skip calculation if !ok || swVal != 1 { continue } } metrics := deviceMetricValues[task.DeviceID] v, hasV := metrics["voltage"] c, hasC := metrics["current"] if hasV && hasC { power := v * c saveMetric(task.DeviceID, "power", power, task.LocationID) } } } // Pass 3: Energy (E += P * t) for _, task := range calcTasks { if task.Metric == "energy" { // Check Switch Status if deviceHasSwitch[task.DeviceID] { swVal, ok := deviceMetricValues[task.DeviceID]["switch"] // If switch is OFF (0) or Unknown (!ok), skip calculation if !ok || swVal != 1 { continue } } metrics := deviceMetricValues[task.DeviceID] p, hasP := metrics["power"] if hasP { // Interval is dynamic hours := s.IntervalSeconds / 3600.0 increment := (p / 1000.0) * hours // W -> kWh // Get previous value lastVal := 0.0 lastData, err := db.GetLatestDeviceData(task.DeviceID) if err == nil { for _, ld := range lastData { if ld.Metric == "energy" { lastVal = ld.Value break } } } newEnergy := lastVal + increment saveMetric(task.DeviceID, "energy", newEnergy, task.LocationID) } } } // Batch insert to TDengine (one REST call per ~500 records) count := 0 if len(toInsert) > 0 { if err := db.BatchInsertReadings(toInsert); err != nil { log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err) } else { count = len(toInsert) } } duration := time.Since(start) log.Printf("Source %s: Collected %d data points for %d devices in %v", source.Name, count, len(devices), duration) if duration.Seconds() > s.IntervalSeconds { log.Printf("PERFORMANCE WARNING: Collection for source %s took %v, which is longer than interval %.1fs", source.Name, duration, s.IntervalSeconds) } }