collector.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. package services
  2. import (
  3. "ems-backend/db"
  4. "ems-backend/models"
  5. "ems-backend/utils"
  6. "encoding/json"
  7. "log"
  8. "strconv"
  9. "time"
  10. "strings"
  11. "sync"
  12. "github.com/robfig/cron/v3"
  13. )
// GlobalCollector is the package-level collector instance. It is nil until
// NewCollectorService is called, and every call to NewCollectorService
// overwrites it with the newly created service.
var GlobalCollector *CollectorService
// CollectorService owns the cron scheduler that triggers periodic data
// collection, together with the runtime settings the collection job reads.
type CollectorService struct {
	// cron is the scheduler that runs collectJob; Restart replaces it with a
	// fresh instance.
	cron *cron.Cron
	// DebugMode enables verbose per-metric logging. Use SetDebugMode and
	// IsDebug, which access it under mu.
	DebugMode bool
	// mu guards DebugMode and is also held by Restart while it swaps the
	// scheduler and updates IntervalSeconds.
	mu sync.RWMutex
	// IntervalSeconds is the nominal number of seconds between collection
	// cycles. It is derived from the cron spec and is used to integrate power
	// into energy and to flag slow cycles.
	IntervalSeconds float64
	// runningSources maps a source ID to true while a collection for that
	// source is in flight, so overlapping cycles for one source are skipped.
	runningSources sync.Map
}
  22. func NewCollectorService() *CollectorService {
  23. GlobalCollector = &CollectorService{
  24. cron: cron.New(cron.WithSeconds()),
  25. IntervalSeconds: 5.0,
  26. }
  27. return GlobalCollector
  28. }
  29. func (s *CollectorService) SetDebugMode(enabled bool) {
  30. s.mu.Lock()
  31. defer s.mu.Unlock()
  32. s.DebugMode = enabled
  33. log.Printf("Collector Debug Mode set to: %v", enabled)
  34. }
  35. func (s *CollectorService) IsDebug() bool {
  36. s.mu.RLock()
  37. defer s.mu.RUnlock()
  38. return s.DebugMode
  39. }
  40. func (s *CollectorService) Start() {
  41. // 1. 尝试从数据库加载配置
  42. var config models.SysConfig
  43. spec := "*/5 * * * * *" // 默认 5秒
  44. if err := models.DB.Where("config_key = ?", "collection_frequency").First(&config).Error; err == nil {
  45. if config.ConfigValue != "" {
  46. spec = config.ConfigValue
  47. }
  48. } else {
  49. // 如果不存在,初始化默认值到数据库
  50. models.DB.Create(&models.SysConfig{
  51. ConfigKey: "collection_frequency",
  52. ConfigValue: spec,
  53. ConfigType: "Y",
  54. Remark: "数据采集频率 (Cron表达式, 默认: */5 * * * * *)",
  55. })
  56. }
  57. s.updateIntervalFromSpec(spec)
  58. _, err := s.cron.AddFunc(spec, s.collectJob)
  59. if err != nil {
  60. log.Printf("配置的频率格式错误 '%s', 使用默认值. Error: %v", spec, err)
  61. spec = "*/5 * * * * *"
  62. s.updateIntervalFromSpec(spec)
  63. s.cron.AddFunc(spec, s.collectJob)
  64. }
  65. s.cron.Start()
  66. log.Printf("Data Collector Service started (spec: %s, interval: %.1fs)", spec, s.IntervalSeconds)
  67. }
  68. // 新增: 重启服务以应用新频率
  69. func (s *CollectorService) Restart(spec string) error {
  70. s.mu.Lock()
  71. defer s.mu.Unlock()
  72. s.cron.Stop()
  73. // 重新创建 Cron 实例以清除旧任务
  74. s.cron = cron.New(cron.WithSeconds())
  75. _, err := s.cron.AddFunc(spec, s.collectJob)
  76. if err != nil {
  77. // 如果新规则错误,恢复默认并返回错误
  78. log.Printf("Invalid cron spec '%s', reverting to default: %v", spec, err)
  79. defaultSpec := "*/5 * * * * *"
  80. s.updateIntervalFromSpec(defaultSpec)
  81. s.cron.AddFunc(defaultSpec, s.collectJob)
  82. s.cron.Start()
  83. return err
  84. }
  85. s.updateIntervalFromSpec(spec)
  86. s.cron.Start()
  87. log.Printf("Data Collector restarted with new spec: %s, interval: %.1fs", spec, s.IntervalSeconds)
  88. return nil
  89. }
  90. func (s *CollectorService) updateIntervalFromSpec(spec string) {
  91. // 简单解析 */N,如果失败则保留默认 5.0
  92. // 仅支持 */N * * * * * 格式的秒级解析,其他情况默认5s (或者不变更)
  93. if strings.HasPrefix(spec, "*/") {
  94. parts := strings.Split(spec, " ")
  95. if len(parts) > 0 {
  96. divParts := strings.Split(parts[0], "/")
  97. if len(divParts) == 2 {
  98. if val, err := strconv.ParseFloat(divParts[1], 64); err == nil {
  99. s.IntervalSeconds = val
  100. return
  101. }
  102. }
  103. }
  104. }
  105. // 如果无法解析,或者是 "0 * ..." 等格式,暂时默认5s,或者保持不变
  106. // 为了安全起见,如果不确定,设置为 5.0
  107. // 实际生产中应该记录上次采集时间来计算 Delta
  108. s.IntervalSeconds = 5.0
  109. }
  110. func (s *CollectorService) Stop() {
  111. s.cron.Stop()
  112. }
  113. func (s *CollectorService) collectJob() {
  114. log.Println("Starting collection cycle...")
  115. // 1. Fetch all active devices with source
  116. var devices []models.Device
  117. if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
  118. log.Printf("Error fetching devices: %v\n", err)
  119. return
  120. }
  121. if len(devices) == 0 {
  122. return
  123. }
  124. // 2. Group by SourceID
  125. deviceGroups := make(map[string][]models.Device)
  126. for _, d := range devices {
  127. if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
  128. deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
  129. }
  130. }
  131. // 3. Process each group
  132. for sourceID, devs := range deviceGroups {
  133. // 并发控制:如果该源的上一次采集还在运行,则跳过本次
  134. if _, loaded := s.runningSources.LoadOrStore(sourceID, true); loaded {
  135. log.Printf("Warning: Collection for source %s is still running, skipping this cycle.", sourceID)
  136. continue
  137. }
  138. go s.processSourceGroup(sourceID, devs)
  139. }
  140. }
  141. func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
  142. // 任务结束时移除标记
  143. defer s.runningSources.Delete(sourceID)
  144. // 记录开始时间
  145. start := time.Now()
  146. // Fetch Source Config
  147. var source models.IntegrationSource
  148. if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
  149. log.Printf("Source %s not found: %v\n", sourceID, err)
  150. return
  151. }
  152. if source.DriverType != "HOME_ASSISTANT" {
  153. return
  154. }
  155. // 1. Prepare Request
  156. entityIDs := make([]string, 0)
  157. // Helper structs
  158. type Target struct {
  159. DeviceID string
  160. Metric string
  161. LocationID string
  162. Formula string
  163. }
  164. // deviceID -> metric -> value (Cache for calculation)
  165. deviceMetricValues := make(map[string]map[string]float64)
  166. requestMap := make(map[string][]Target)
  167. // deviceID -> hasSwitch (Track if device has a switch configured)
  168. deviceHasSwitch := make(map[string]bool)
  169. // Virtual calculation tasks
  170. type CalcTask struct {
  171. DeviceID string
  172. Metric string
  173. LocationID string
  174. Formula string // For pure formula virtual values
  175. }
  176. calcTasks := make([]CalcTask, 0)
  177. for _, d := range devices {
  178. // Init cache
  179. deviceMetricValues[d.ID.String()] = make(map[string]float64)
  180. var mapping map[string]string
  181. b, _ := d.AttributeMapping.MarshalJSON()
  182. json.Unmarshal(b, &mapping)
  183. locID := ""
  184. if d.LocationID != nil {
  185. locID = d.LocationID.String()
  186. }
  187. // --- Rule Validation & Task Generation ---
  188. // 1. Handle Explicit Mappings (Switch, Voltage, Current, etc.)
  189. for metric, entityID := range mapping {
  190. // Skip formula definition keys
  191. if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
  192. continue
  193. }
  194. // Switch must have bound entity
  195. if metric == "switch" && entityID == "" {
  196. // log.Printf("Warning: Device %s has 'switch' metric but no entity bound.", d.Name)
  197. continue
  198. }
  199. // Record if switch is mapped
  200. if metric == "switch" && entityID != "" {
  201. deviceHasSwitch[d.ID.String()] = true
  202. }
  203. // If configured with entity ID, add to collection queue
  204. if entityID != "" {
  205. if _, exists := requestMap[entityID]; !exists {
  206. entityIDs = append(entityIDs, entityID)
  207. }
  208. requestMap[entityID] = append(requestMap[entityID], Target{
  209. DeviceID: d.ID.String(),
  210. Metric: metric,
  211. LocationID: locID,
  212. Formula: mapping[metric+"_formula"], // Optional correction formula
  213. })
  214. }
  215. }
  216. // 2. Handle Electric Device Logic (Voltage/Current -> Power -> Energy)
  217. // We use "ELECTRIC" string literal matching the schema comment
  218. // Note: Schema says "ELECTRIC", user said "电力设备" (Electric Equipment)
  219. if d.DeviceType == "ELECTRIC" || d.DeviceType == "METER" || d.DeviceType == "Electric" {
  220. // Check Voltage
  221. hasVoltageEntity := mapping["voltage"] != ""
  222. voltageFormula := mapping["voltage_formula"]
  223. if !hasVoltageEntity {
  224. // If no entity, must have formula (Virtual Value)
  225. if voltageFormula != "" {
  226. calcTasks = append(calcTasks, CalcTask{
  227. DeviceID: d.ID.String(), Metric: "voltage", LocationID: locID, Formula: voltageFormula,
  228. })
  229. } else {
  230. // log.Printf("Warning: Device %s (Electric) missing 'voltage' source (Entity or Formula required).", d.Name)
  231. }
  232. }
  233. // Check Current
  234. hasCurrentEntity := mapping["current"] != ""
  235. currentFormula := mapping["current_formula"]
  236. if !hasCurrentEntity {
  237. if currentFormula != "" {
  238. calcTasks = append(calcTasks, CalcTask{
  239. DeviceID: d.ID.String(), Metric: "current", LocationID: locID, Formula: currentFormula,
  240. })
  241. } else {
  242. // log.Printf("Warning: Device %s (Electric) missing 'current' source.", d.Name)
  243. }
  244. }
  245. // Auto Add Power and Energy Calculation Tasks (Always try to calculate if missing or as shadow)
  246. // Strategy: If user mapped "power", we fetched it above.
  247. // If we also add a calc task here, we might overwrite it or fill it if fetch failed.
  248. // Let's only add calc task if NO entity mapped, OR if we want to enforce consistency.
  249. // User query: "active_power and cumulative_power are repetitive... user calculates power and energy"
  250. // Implicitly, if they have an entity for power, they map it.
  251. // If they don't, we calculate.
  252. if mapping["power"] == "" {
  253. calcTasks = append(calcTasks, CalcTask{
  254. DeviceID: d.ID.String(), Metric: "power", LocationID: locID,
  255. })
  256. }
  257. if mapping["energy"] == "" {
  258. calcTasks = append(calcTasks, CalcTask{
  259. DeviceID: d.ID.String(), Metric: "energy", LocationID: locID,
  260. })
  261. }
  262. }
  263. }
  264. if len(entityIDs) == 0 && len(calcTasks) == 0 {
  265. log.Printf("数据源 %s: 未找到采集任务 (未映射实体且无虚拟任务)", source.Name)
  266. return
  267. }
  268. // 2. Fetch Data (Real World)
  269. var states map[string]string
  270. var err error
  271. if len(entityIDs) > 0 {
  272. log.Printf("数据源 %s: 正在请求 %d 个实体: %v", source.Name, len(entityIDs), entityIDs)
  273. states, err = utils.BatchFetchStates(source.Config, entityIDs)
  274. if err != nil {
  275. log.Printf("数据源 %s 获取状态失败: %v\n", source.Name, err)
  276. return
  277. }
  278. log.Printf("数据源 %s: 从 HA 获取到 %d 个状态: %v", source.Name, len(states), states)
  279. } else {
  280. states = make(map[string]string)
  281. }
  282. // Update Device Status
  283. for _, d := range devices {
  284. isOnline := false
  285. var mapping map[string]string
  286. b, _ := d.AttributeMapping.MarshalJSON()
  287. json.Unmarshal(b, &mapping)
  288. hasBoundEntity := false
  289. for k, entityID := range mapping {
  290. if len(k) > 8 && k[len(k)-8:] == "_formula" {
  291. continue
  292. }
  293. if entityID != "" {
  294. hasBoundEntity = true
  295. if _, ok := states[entityID]; ok {
  296. isOnline = true
  297. break
  298. }
  299. }
  300. }
  301. targetStatus := d.Status
  302. if hasBoundEntity {
  303. if isOnline {
  304. targetStatus = "ONLINE"
  305. } else {
  306. targetStatus = "OFFLINE"
  307. }
  308. } else {
  309. targetStatus = "ONLINE"
  310. }
  311. if d.Status != targetStatus {
  312. models.DB.Model(&d).Update("Status", targetStatus)
  313. }
  314. }
  315. now := time.Now()
  316. var toInsert []db.ReadingRecord
  317. // Helper to collect (batch insert at end of cycle)
  318. saveMetric := func(deviceID, metric string, val float64, locID string) {
  319. // Update Cache
  320. if deviceMetricValues[deviceID] == nil {
  321. deviceMetricValues[deviceID] = make(map[string]float64)
  322. }
  323. deviceMetricValues[deviceID][metric] = val
  324. toInsert = append(toInsert, db.ReadingRecord{
  325. DeviceID: deviceID, Metric: metric, Value: val, LocationID: locID, Ts: now,
  326. })
  327. if s.IsDebug() {
  328. log.Printf("调试: 已收集 设备=%s 指标=%s 值=%.2f", deviceID, metric, val)
  329. }
  330. // Check Alarms Async (before batch write)
  331. if GlobalAlarmService != nil {
  332. GlobalAlarmService.CheckRules(deviceID, metric, val)
  333. }
  334. }
  335. // 3. Process Real Entities
  336. for entityID, valStr := range states {
  337. // Parse
  338. val, err := strconv.ParseFloat(valStr, 64)
  339. if err != nil {
  340. // Boolean handling for Switch
  341. lower := strings.ToLower(valStr)
  342. if lower == "on" || lower == "true" {
  343. val = 1.0
  344. } else if lower == "off" || lower == "false" {
  345. val = 0.0
  346. } else {
  347. if s.IsDebug() {
  348. log.Printf("调试: 实体 %s 解析数值失败 (原始值=%s): %v. 跳过.", entityID, valStr, err)
  349. }
  350. continue
  351. }
  352. }
  353. targets := requestMap[entityID]
  354. for _, t := range targets {
  355. finalVal := val
  356. // Apply Formula (e.g. calibration)
  357. if t.Formula != "" {
  358. if res, err := utils.EvaluateFormula(t.Formula, val); err == nil {
  359. if s.IsDebug() {
  360. log.Printf("调试: 应用公式 [%s] 设备=%s 指标=%s: %.2f -> %.2f", t.Formula, t.DeviceID, t.Metric, val, res)
  361. }
  362. finalVal = res
  363. } else {
  364. log.Printf("调试: 公式计算错误 [%s] 设备=%s 指标=%s: %v. 使用原始值: %.2f", t.Formula, t.DeviceID, t.Metric, err, val)
  365. }
  366. }
  367. saveMetric(t.DeviceID, t.Metric, finalVal, t.LocationID)
  368. }
  369. }
  370. // 4. Process Virtual/Calculated Tasks (Ordered)
  371. // Pass 1: Base Virtual Values (Fx only, e.g. Virtual Voltage)
  372. for _, task := range calcTasks {
  373. if task.Formula != "" && (task.Metric == "voltage" || task.Metric == "current") {
  374. // Formula with x=0 (constant or time-based if supported, currently constant)
  375. if res, err := utils.EvaluateFormula(task.Formula, 0); err == nil {
  376. if s.IsDebug() {
  377. log.Printf("调试: 虚拟指标 [%s] 设备=%s: 公式 [%s] (x=0) -> %.2f", task.Metric, task.DeviceID, task.Formula, res)
  378. }
  379. saveMetric(task.DeviceID, task.Metric, res, task.LocationID)
  380. } else {
  381. log.Printf("调试: 虚拟指标计算错误 [%s] 设备=%s: 公式 [%s] 失败: %v", task.Metric, task.DeviceID, task.Formula, err)
  382. }
  383. }
  384. }
  385. // Pass 2: Power (P = V * I)
  386. for _, task := range calcTasks {
  387. if task.Metric == "power" {
  388. // Check Switch Status
  389. if deviceHasSwitch[task.DeviceID] {
  390. swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
  391. // If switch is OFF (0) or Unknown (!ok), skip calculation
  392. if !ok || swVal != 1 {
  393. continue
  394. }
  395. }
  396. metrics := deviceMetricValues[task.DeviceID]
  397. v, hasV := metrics["voltage"]
  398. c, hasC := metrics["current"]
  399. if hasV && hasC {
  400. power := v * c
  401. saveMetric(task.DeviceID, "power", power, task.LocationID)
  402. }
  403. }
  404. }
  405. // Pass 3: Energy (E += P * t)
  406. for _, task := range calcTasks {
  407. if task.Metric == "energy" {
  408. // Check Switch Status
  409. if deviceHasSwitch[task.DeviceID] {
  410. swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
  411. // If switch is OFF (0) or Unknown (!ok), skip calculation
  412. if !ok || swVal != 1 {
  413. continue
  414. }
  415. }
  416. metrics := deviceMetricValues[task.DeviceID]
  417. p, hasP := metrics["power"]
  418. if hasP {
  419. // Interval is dynamic
  420. hours := s.IntervalSeconds / 3600.0
  421. increment := (p / 1000.0) * hours // W -> kWh
  422. // Get previous value
  423. lastVal := 0.0
  424. lastData, err := db.GetLatestDeviceData(task.DeviceID)
  425. if err == nil {
  426. for _, ld := range lastData {
  427. if ld.Metric == "energy" {
  428. lastVal = ld.Value
  429. break
  430. }
  431. }
  432. }
  433. newEnergy := lastVal + increment
  434. saveMetric(task.DeviceID, "energy", newEnergy, task.LocationID)
  435. }
  436. }
  437. }
  438. // Async write to TDengine (non-blocking when GlobalTDengineWriter is available)
  439. count := 0
  440. if len(toInsert) > 0 {
  441. if GlobalTDengineWriter != nil {
  442. GlobalTDengineWriter.Write(toInsert)
  443. count = len(toInsert)
  444. } else {
  445. if err := db.BatchInsertReadings(toInsert); err != nil {
  446. log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err)
  447. } else {
  448. count = len(toInsert)
  449. }
  450. }
  451. }
  452. duration := time.Since(start)
  453. log.Printf("Source %s: Collected %d data points for %d devices in %v", source.Name, count, len(devices), duration)
  454. if duration.Seconds() > s.IntervalSeconds {
  455. log.Printf("PERFORMANCE WARNING: Collection for source %s took %v, which is longer than interval %.1fs", source.Name, duration, s.IntervalSeconds)
  456. }
  457. }