| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- package services
- import (
- "ems-backend/db"
- "ems-backend/models"
- "ems-backend/utils"
- "encoding/json"
- "log"
- "strconv"
- "time"
- "strings"
- "sync"
- "github.com/robfig/cron/v3"
- )
- var GlobalCollector *CollectorService
- type CollectorService struct {
- cron *cron.Cron
- DebugMode bool
- mu sync.RWMutex
- IntervalSeconds float64
- runningSources sync.Map
- }
- func NewCollectorService() *CollectorService {
- GlobalCollector = &CollectorService{
- cron: cron.New(cron.WithSeconds()),
- IntervalSeconds: 5.0,
- }
- return GlobalCollector
- }
- func (s *CollectorService) SetDebugMode(enabled bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.DebugMode = enabled
- log.Printf("Collector Debug Mode set to: %v", enabled)
- }
- func (s *CollectorService) IsDebug() bool {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.DebugMode
- }
- func (s *CollectorService) Start() {
- // 1. 尝试从数据库加载配置
- var config models.SysConfig
- spec := "*/5 * * * * *" // 默认 5秒
- if err := models.DB.Where("config_key = ?", "collection_frequency").First(&config).Error; err == nil {
- if config.ConfigValue != "" {
- spec = config.ConfigValue
- }
- } else {
- // 如果不存在,初始化默认值到数据库
- models.DB.Create(&models.SysConfig{
- ConfigKey: "collection_frequency",
- ConfigValue: spec,
- ConfigType: "Y",
- Remark: "数据采集频率 (Cron表达式, 默认: */5 * * * * *)",
- })
- }
- s.updateIntervalFromSpec(spec)
- _, err := s.cron.AddFunc(spec, s.collectJob)
- if err != nil {
- log.Printf("配置的频率格式错误 '%s', 使用默认值. Error: %v", spec, err)
- spec = "*/5 * * * * *"
- s.updateIntervalFromSpec(spec)
- s.cron.AddFunc(spec, s.collectJob)
- }
- s.cron.Start()
- log.Printf("Data Collector Service started (spec: %s, interval: %.1fs)", spec, s.IntervalSeconds)
- }
- // 新增: 重启服务以应用新频率
- func (s *CollectorService) Restart(spec string) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.cron.Stop()
- // 重新创建 Cron 实例以清除旧任务
- s.cron = cron.New(cron.WithSeconds())
- _, err := s.cron.AddFunc(spec, s.collectJob)
- if err != nil {
- // 如果新规则错误,恢复默认并返回错误
- log.Printf("Invalid cron spec '%s', reverting to default: %v", spec, err)
- defaultSpec := "*/5 * * * * *"
- s.updateIntervalFromSpec(defaultSpec)
- s.cron.AddFunc(defaultSpec, s.collectJob)
- s.cron.Start()
- return err
- }
- s.updateIntervalFromSpec(spec)
- s.cron.Start()
- log.Printf("Data Collector restarted with new spec: %s, interval: %.1fs", spec, s.IntervalSeconds)
- return nil
- }
- func (s *CollectorService) updateIntervalFromSpec(spec string) {
- // 简单解析 */N,如果失败则保留默认 5.0
- // 仅支持 */N * * * * * 格式的秒级解析,其他情况默认5s (或者不变更)
- if strings.HasPrefix(spec, "*/") {
- parts := strings.Split(spec, " ")
- if len(parts) > 0 {
- divParts := strings.Split(parts[0], "/")
- if len(divParts) == 2 {
- if val, err := strconv.ParseFloat(divParts[1], 64); err == nil {
- s.IntervalSeconds = val
- return
- }
- }
- }
- }
- // 如果无法解析,或者是 "0 * ..." 等格式,暂时默认5s,或者保持不变
- // 为了安全起见,如果不确定,设置为 5.0
- // 实际生产中应该记录上次采集时间来计算 Delta
- s.IntervalSeconds = 5.0
- }
- func (s *CollectorService) Stop() {
- s.cron.Stop()
- }
- func (s *CollectorService) collectJob() {
- log.Println("Starting collection cycle...")
-
- // 1. Fetch all active devices with source
- var devices []models.Device
- if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
- log.Printf("Error fetching devices: %v\n", err)
- return
- }
- if len(devices) == 0 {
- return
- }
- // 2. Group by SourceID
- deviceGroups := make(map[string][]models.Device)
- for _, d := range devices {
- if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
- deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
- }
- }
- // 3. Process each group
- for sourceID, devs := range deviceGroups {
- // 并发控制:如果该源的上一次采集还在运行,则跳过本次
- if _, loaded := s.runningSources.LoadOrStore(sourceID, true); loaded {
- log.Printf("Warning: Collection for source %s is still running, skipping this cycle.", sourceID)
- continue
- }
- go s.processSourceGroup(sourceID, devs)
- }
- }
- func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
- // 任务结束时移除标记
- defer s.runningSources.Delete(sourceID)
- // 记录开始时间
- start := time.Now()
- // Fetch Source Config
- var source models.IntegrationSource
- if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
- log.Printf("Source %s not found: %v\n", sourceID, err)
- return
- }
- if source.DriverType != "HOME_ASSISTANT" {
- return
- }
- // 1. Prepare Request
- entityIDs := make([]string, 0)
-
- // Helper structs
- type Target struct {
- DeviceID string
- Metric string
- LocationID string
- Formula string
- }
-
- // deviceID -> metric -> value (Cache for calculation)
- deviceMetricValues := make(map[string]map[string]float64)
- requestMap := make(map[string][]Target)
-
- // deviceID -> hasSwitch (Track if device has a switch configured)
- deviceHasSwitch := make(map[string]bool)
-
- // Virtual calculation tasks
- type CalcTask struct {
- DeviceID string
- Metric string
- LocationID string
- Formula string // For pure formula virtual values
- }
- calcTasks := make([]CalcTask, 0)
- for _, d := range devices {
- // Init cache
- deviceMetricValues[d.ID.String()] = make(map[string]float64)
-
- var mapping map[string]string
- b, _ := d.AttributeMapping.MarshalJSON()
- json.Unmarshal(b, &mapping)
- locID := ""
- if d.LocationID != nil {
- locID = d.LocationID.String()
- }
- // --- Rule Validation & Task Generation ---
-
- // 1. Handle Explicit Mappings (Switch, Voltage, Current, etc.)
- for metric, entityID := range mapping {
- // Skip formula definition keys
- if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
- continue
- }
- // Switch must have bound entity
- if metric == "switch" && entityID == "" {
- // log.Printf("Warning: Device %s has 'switch' metric but no entity bound.", d.Name)
- continue
- }
- // Record if switch is mapped
- if metric == "switch" && entityID != "" {
- deviceHasSwitch[d.ID.String()] = true
- }
- // If configured with entity ID, add to collection queue
- if entityID != "" {
- if _, exists := requestMap[entityID]; !exists {
- entityIDs = append(entityIDs, entityID)
- }
- requestMap[entityID] = append(requestMap[entityID], Target{
- DeviceID: d.ID.String(),
- Metric: metric,
- LocationID: locID,
- Formula: mapping[metric+"_formula"], // Optional correction formula
- })
- }
- }
- // 2. Handle Electric Device Logic (Voltage/Current -> Power -> Energy)
- // We use "ELECTRIC" string literal matching the schema comment
- // Note: Schema says "ELECTRIC", user said "电力设备" (Electric Equipment)
- if d.DeviceType == "ELECTRIC" || d.DeviceType == "METER" || d.DeviceType == "Electric" {
- // Check Voltage
- hasVoltageEntity := mapping["voltage"] != ""
- voltageFormula := mapping["voltage_formula"]
-
- if !hasVoltageEntity {
- // If no entity, must have formula (Virtual Value)
- if voltageFormula != "" {
- calcTasks = append(calcTasks, CalcTask{
- DeviceID: d.ID.String(), Metric: "voltage", LocationID: locID, Formula: voltageFormula,
- })
- } else {
- // log.Printf("Warning: Device %s (Electric) missing 'voltage' source (Entity or Formula required).", d.Name)
- }
- }
- // Check Current
- hasCurrentEntity := mapping["current"] != ""
- currentFormula := mapping["current_formula"]
- if !hasCurrentEntity {
- if currentFormula != "" {
- calcTasks = append(calcTasks, CalcTask{
- DeviceID: d.ID.String(), Metric: "current", LocationID: locID, Formula: currentFormula,
- })
- } else {
- // log.Printf("Warning: Device %s (Electric) missing 'current' source.", d.Name)
- }
- }
- // Auto Add Power and Energy Calculation Tasks (Always try to calculate if missing or as shadow)
- // Strategy: If user mapped "power", we fetched it above.
- // If we also add a calc task here, we might overwrite it or fill it if fetch failed.
- // Let's only add calc task if NO entity mapped, OR if we want to enforce consistency.
- // User query: "active_power and cumulative_power are repetitive... user calculates power and energy"
- // Implicitly, if they have an entity for power, they map it.
- // If they don't, we calculate.
-
- if mapping["power"] == "" {
- calcTasks = append(calcTasks, CalcTask{
- DeviceID: d.ID.String(), Metric: "power", LocationID: locID,
- })
- }
-
- if mapping["energy"] == "" {
- calcTasks = append(calcTasks, CalcTask{
- DeviceID: d.ID.String(), Metric: "energy", LocationID: locID,
- })
- }
- }
- }
- if len(entityIDs) == 0 && len(calcTasks) == 0 {
- log.Printf("数据源 %s: 未找到采集任务 (未映射实体且无虚拟任务)", source.Name)
- return
- }
- // 2. Fetch Data (Real World)
- var states map[string]string
- var err error
-
- if len(entityIDs) > 0 {
- log.Printf("数据源 %s: 正在请求 %d 个实体: %v", source.Name, len(entityIDs), entityIDs)
- states, err = utils.BatchFetchStates(source.Config, entityIDs)
- if err != nil {
- log.Printf("数据源 %s 获取状态失败: %v\n", source.Name, err)
- return
- }
- log.Printf("数据源 %s: 从 HA 获取到 %d 个状态: %v", source.Name, len(states), states)
- } else {
- states = make(map[string]string)
- }
- // Update Device Status
- for _, d := range devices {
- isOnline := false
- var mapping map[string]string
- b, _ := d.AttributeMapping.MarshalJSON()
- json.Unmarshal(b, &mapping)
- hasBoundEntity := false
- for k, entityID := range mapping {
- if len(k) > 8 && k[len(k)-8:] == "_formula" {
- continue
- }
- if entityID != "" {
- hasBoundEntity = true
- if _, ok := states[entityID]; ok {
- isOnline = true
- break
- }
- }
- }
- targetStatus := d.Status
- if hasBoundEntity {
- if isOnline {
- targetStatus = "ONLINE"
- } else {
- targetStatus = "OFFLINE"
- }
- } else {
- targetStatus = "ONLINE"
- }
- if d.Status != targetStatus {
- models.DB.Model(&d).Update("Status", targetStatus)
- }
- }
- now := time.Now()
- var toInsert []db.ReadingRecord
- // Helper to collect (batch insert at end of cycle)
- saveMetric := func(deviceID, metric string, val float64, locID string) {
- // Update Cache
- if deviceMetricValues[deviceID] == nil {
- deviceMetricValues[deviceID] = make(map[string]float64)
- }
- deviceMetricValues[deviceID][metric] = val
- toInsert = append(toInsert, db.ReadingRecord{
- DeviceID: deviceID, Metric: metric, Value: val, LocationID: locID, Ts: now,
- })
- if s.IsDebug() {
- log.Printf("调试: 已收集 设备=%s 指标=%s 值=%.2f", deviceID, metric, val)
- }
- // Check Alarms Async (before batch write)
- if GlobalAlarmService != nil {
- GlobalAlarmService.CheckRules(deviceID, metric, val)
- }
- }
- // 3. Process Real Entities
- for entityID, valStr := range states {
- // Parse
- val, err := strconv.ParseFloat(valStr, 64)
- if err != nil {
- // Boolean handling for Switch
- lower := strings.ToLower(valStr)
- if lower == "on" || lower == "true" {
- val = 1.0
- } else if lower == "off" || lower == "false" {
- val = 0.0
- } else {
- if s.IsDebug() {
- log.Printf("调试: 实体 %s 解析数值失败 (原始值=%s): %v. 跳过.", entityID, valStr, err)
- }
- continue
- }
- }
- targets := requestMap[entityID]
- for _, t := range targets {
- finalVal := val
- // Apply Formula (e.g. calibration)
- if t.Formula != "" {
- if res, err := utils.EvaluateFormula(t.Formula, val); err == nil {
- if s.IsDebug() {
- log.Printf("调试: 应用公式 [%s] 设备=%s 指标=%s: %.2f -> %.2f", t.Formula, t.DeviceID, t.Metric, val, res)
- }
- finalVal = res
- } else {
- log.Printf("调试: 公式计算错误 [%s] 设备=%s 指标=%s: %v. 使用原始值: %.2f", t.Formula, t.DeviceID, t.Metric, err, val)
- }
- }
- saveMetric(t.DeviceID, t.Metric, finalVal, t.LocationID)
- }
- }
- // 4. Process Virtual/Calculated Tasks (Ordered)
-
- // Pass 1: Base Virtual Values (Fx only, e.g. Virtual Voltage)
- for _, task := range calcTasks {
- if task.Formula != "" && (task.Metric == "voltage" || task.Metric == "current") {
- // Formula with x=0 (constant or time-based if supported, currently constant)
- if res, err := utils.EvaluateFormula(task.Formula, 0); err == nil {
- if s.IsDebug() {
- log.Printf("调试: 虚拟指标 [%s] 设备=%s: 公式 [%s] (x=0) -> %.2f", task.Metric, task.DeviceID, task.Formula, res)
- }
- saveMetric(task.DeviceID, task.Metric, res, task.LocationID)
- } else {
- log.Printf("调试: 虚拟指标计算错误 [%s] 设备=%s: 公式 [%s] 失败: %v", task.Metric, task.DeviceID, task.Formula, err)
- }
- }
- }
- // Pass 2: Power (P = V * I)
- for _, task := range calcTasks {
- if task.Metric == "power" {
- // Check Switch Status
- if deviceHasSwitch[task.DeviceID] {
- swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
- // If switch is OFF (0) or Unknown (!ok), skip calculation
- if !ok || swVal != 1 {
- continue
- }
- }
- metrics := deviceMetricValues[task.DeviceID]
- v, hasV := metrics["voltage"]
- c, hasC := metrics["current"]
-
- if hasV && hasC {
- power := v * c
- saveMetric(task.DeviceID, "power", power, task.LocationID)
- }
- }
- }
- // Pass 3: Energy (E += P * t)
- for _, task := range calcTasks {
- if task.Metric == "energy" {
- // Check Switch Status
- if deviceHasSwitch[task.DeviceID] {
- swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
- // If switch is OFF (0) or Unknown (!ok), skip calculation
- if !ok || swVal != 1 {
- continue
- }
- }
- metrics := deviceMetricValues[task.DeviceID]
- p, hasP := metrics["power"]
-
- if hasP {
- // Interval is dynamic
- hours := s.IntervalSeconds / 3600.0
- increment := (p / 1000.0) * hours // W -> kWh
-
- // Get previous value
- lastVal := 0.0
- lastData, err := db.GetLatestDeviceData(task.DeviceID)
- if err == nil {
- for _, ld := range lastData {
- if ld.Metric == "energy" {
- lastVal = ld.Value
- break
- }
- }
- }
-
- newEnergy := lastVal + increment
- saveMetric(task.DeviceID, "energy", newEnergy, task.LocationID)
- }
- }
- }
- // Async write to TDengine (non-blocking when GlobalTDengineWriter is available)
- count := 0
- if len(toInsert) > 0 {
- if GlobalTDengineWriter != nil {
- GlobalTDengineWriter.Write(toInsert)
- count = len(toInsert)
- } else {
- if err := db.BatchInsertReadings(toInsert); err != nil {
- log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err)
- } else {
- count = len(toInsert)
- }
- }
- }
- duration := time.Since(start)
- log.Printf("Source %s: Collected %d data points for %d devices in %v", source.Name, count, len(devices), duration)
-
- if duration.Seconds() > s.IntervalSeconds {
- log.Printf("PERFORMANCE WARNING: Collection for source %s took %v, which is longer than interval %.1fs", source.Name, duration, s.IntervalSeconds)
- }
- }
|