collector.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. package services
  2. import (
  3. "ems-backend/db"
  4. "ems-backend/models"
  5. "ems-backend/utils"
  6. "encoding/json"
  7. "log"
  8. "strconv"
  9. "time"
  10. "strings"
  11. "sync"
  12. "github.com/robfig/cron/v3"
  13. )
// GlobalCollector is the package-level collector instance. It is assigned by
// NewCollectorService.
var GlobalCollector *CollectorService

// CollectorService runs the periodic data-collection job on a cron schedule.
type CollectorService struct {
	// cron is the scheduler that triggers collectJob; Restart replaces it.
	cron *cron.Cron
	// DebugMode enables the per-reading debug log lines; access it through
	// SetDebugMode / IsDebug, which take mu.
	DebugMode bool
	// mu is held by SetDebugMode, IsDebug and Restart.
	mu sync.RWMutex
	// IntervalSeconds is the collection period in seconds as derived by
	// updateIntervalFromSpec; it is the time step used when integrating
	// power into energy.
	IntervalSeconds float64
}
  21. func NewCollectorService() *CollectorService {
  22. GlobalCollector = &CollectorService{
  23. cron: cron.New(cron.WithSeconds()),
  24. IntervalSeconds: 5.0,
  25. }
  26. return GlobalCollector
  27. }
  28. func (s *CollectorService) SetDebugMode(enabled bool) {
  29. s.mu.Lock()
  30. defer s.mu.Unlock()
  31. s.DebugMode = enabled
  32. log.Printf("Collector Debug Mode set to: %v", enabled)
  33. }
  34. func (s *CollectorService) IsDebug() bool {
  35. s.mu.RLock()
  36. defer s.mu.RUnlock()
  37. return s.DebugMode
  38. }
  39. func (s *CollectorService) Start() {
  40. // 1. 尝试从数据库加载配置
  41. var config models.SysConfig
  42. spec := "*/5 * * * * *" // 默认 5秒
  43. if err := models.DB.Where("config_key = ?", "collection_frequency").First(&config).Error; err == nil {
  44. if config.ConfigValue != "" {
  45. spec = config.ConfigValue
  46. }
  47. } else {
  48. // 如果不存在,初始化默认值到数据库
  49. models.DB.Create(&models.SysConfig{
  50. ConfigKey: "collection_frequency",
  51. ConfigValue: spec,
  52. ConfigType: "Y",
  53. Remark: "数据采集频率 (Cron表达式, 默认: */5 * * * * *)",
  54. })
  55. }
  56. s.updateIntervalFromSpec(spec)
  57. _, err := s.cron.AddFunc(spec, s.collectJob)
  58. if err != nil {
  59. log.Printf("配置的频率格式错误 '%s', 使用默认值. Error: %v", spec, err)
  60. spec = "*/5 * * * * *"
  61. s.updateIntervalFromSpec(spec)
  62. s.cron.AddFunc(spec, s.collectJob)
  63. }
  64. s.cron.Start()
  65. log.Printf("Data Collector Service started (spec: %s, interval: %.1fs)", spec, s.IntervalSeconds)
  66. }
  67. // 新增: 重启服务以应用新频率
  68. func (s *CollectorService) Restart(spec string) error {
  69. s.mu.Lock()
  70. defer s.mu.Unlock()
  71. s.cron.Stop()
  72. // 重新创建 Cron 实例以清除旧任务
  73. s.cron = cron.New(cron.WithSeconds())
  74. _, err := s.cron.AddFunc(spec, s.collectJob)
  75. if err != nil {
  76. // 如果新规则错误,恢复默认并返回错误
  77. log.Printf("Invalid cron spec '%s', reverting to default: %v", spec, err)
  78. defaultSpec := "*/5 * * * * *"
  79. s.updateIntervalFromSpec(defaultSpec)
  80. s.cron.AddFunc(defaultSpec, s.collectJob)
  81. s.cron.Start()
  82. return err
  83. }
  84. s.updateIntervalFromSpec(spec)
  85. s.cron.Start()
  86. log.Printf("Data Collector restarted with new spec: %s, interval: %.1fs", spec, s.IntervalSeconds)
  87. return nil
  88. }
  89. func (s *CollectorService) updateIntervalFromSpec(spec string) {
  90. // 简单解析 */N,如果失败则保留默认 5.0
  91. // 仅支持 */N * * * * * 格式的秒级解析,其他情况默认5s (或者不变更)
  92. if strings.HasPrefix(spec, "*/") {
  93. parts := strings.Split(spec, " ")
  94. if len(parts) > 0 {
  95. divParts := strings.Split(parts[0], "/")
  96. if len(divParts) == 2 {
  97. if val, err := strconv.ParseFloat(divParts[1], 64); err == nil {
  98. s.IntervalSeconds = val
  99. return
  100. }
  101. }
  102. }
  103. }
  104. // 如果无法解析,或者是 "0 * ..." 等格式,暂时默认5s,或者保持不变
  105. // 为了安全起见,如果不确定,设置为 5.0
  106. // 实际生产中应该记录上次采集时间来计算 Delta
  107. s.IntervalSeconds = 5.0
  108. }
  109. func (s *CollectorService) Stop() {
  110. s.cron.Stop()
  111. }
  112. func (s *CollectorService) collectJob() {
  113. log.Println("Starting collection cycle...")
  114. // 1. Fetch all active devices with source
  115. var devices []models.Device
  116. if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
  117. log.Printf("Error fetching devices: %v\n", err)
  118. return
  119. }
  120. if len(devices) == 0 {
  121. return
  122. }
  123. // 2. Group by SourceID
  124. deviceGroups := make(map[string][]models.Device)
  125. for _, d := range devices {
  126. if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
  127. deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
  128. }
  129. }
  130. // 3. Process each group
  131. for sourceID, devs := range deviceGroups {
  132. go s.processSourceGroup(sourceID, devs)
  133. }
  134. }
  135. func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
  136. // Fetch Source Config
  137. var source models.IntegrationSource
  138. if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
  139. log.Printf("Source %s not found: %v\n", sourceID, err)
  140. return
  141. }
  142. if source.DriverType != "HOME_ASSISTANT" {
  143. return
  144. }
  145. // 1. Prepare Request
  146. entityIDs := make([]string, 0)
  147. // Helper structs
  148. type Target struct {
  149. DeviceID string
  150. Metric string
  151. LocationID string
  152. Formula string
  153. }
  154. // deviceID -> metric -> value (Cache for calculation)
  155. deviceMetricValues := make(map[string]map[string]float64)
  156. requestMap := make(map[string][]Target)
  157. // deviceID -> hasSwitch (Track if device has a switch configured)
  158. deviceHasSwitch := make(map[string]bool)
  159. // Virtual calculation tasks
  160. type CalcTask struct {
  161. DeviceID string
  162. Metric string
  163. LocationID string
  164. Formula string // For pure formula virtual values
  165. }
  166. calcTasks := make([]CalcTask, 0)
  167. for _, d := range devices {
  168. // Init cache
  169. deviceMetricValues[d.ID.String()] = make(map[string]float64)
  170. var mapping map[string]string
  171. b, _ := d.AttributeMapping.MarshalJSON()
  172. json.Unmarshal(b, &mapping)
  173. locID := ""
  174. if d.LocationID != nil {
  175. locID = d.LocationID.String()
  176. }
  177. // --- Rule Validation & Task Generation ---
  178. // 1. Handle Explicit Mappings (Switch, Voltage, Current, etc.)
  179. for metric, entityID := range mapping {
  180. // Skip formula definition keys
  181. if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
  182. continue
  183. }
  184. // Switch must have bound entity
  185. if metric == "switch" && entityID == "" {
  186. // log.Printf("Warning: Device %s has 'switch' metric but no entity bound.", d.Name)
  187. continue
  188. }
  189. // Record if switch is mapped
  190. if metric == "switch" && entityID != "" {
  191. deviceHasSwitch[d.ID.String()] = true
  192. }
  193. // If configured with entity ID, add to collection queue
  194. if entityID != "" {
  195. if _, exists := requestMap[entityID]; !exists {
  196. entityIDs = append(entityIDs, entityID)
  197. }
  198. requestMap[entityID] = append(requestMap[entityID], Target{
  199. DeviceID: d.ID.String(),
  200. Metric: metric,
  201. LocationID: locID,
  202. Formula: mapping[metric+"_formula"], // Optional correction formula
  203. })
  204. }
  205. }
  206. // 2. Handle Electric Device Logic (Voltage/Current -> Power -> Energy)
  207. // We use "ELECTRIC" string literal matching the schema comment
  208. // Note: Schema says "ELECTRIC", user said "电力设备" (Electric Equipment)
  209. if d.DeviceType == "ELECTRIC" || d.DeviceType == "METER" || d.DeviceType == "Electric" {
  210. // Check Voltage
  211. hasVoltageEntity := mapping["voltage"] != ""
  212. voltageFormula := mapping["voltage_formula"]
  213. if !hasVoltageEntity {
  214. // If no entity, must have formula (Virtual Value)
  215. if voltageFormula != "" {
  216. calcTasks = append(calcTasks, CalcTask{
  217. DeviceID: d.ID.String(), Metric: "voltage", LocationID: locID, Formula: voltageFormula,
  218. })
  219. } else {
  220. // log.Printf("Warning: Device %s (Electric) missing 'voltage' source (Entity or Formula required).", d.Name)
  221. }
  222. }
  223. // Check Current
  224. hasCurrentEntity := mapping["current"] != ""
  225. currentFormula := mapping["current_formula"]
  226. if !hasCurrentEntity {
  227. if currentFormula != "" {
  228. calcTasks = append(calcTasks, CalcTask{
  229. DeviceID: d.ID.String(), Metric: "current", LocationID: locID, Formula: currentFormula,
  230. })
  231. } else {
  232. // log.Printf("Warning: Device %s (Electric) missing 'current' source.", d.Name)
  233. }
  234. }
  235. // Auto Add Power and Energy Calculation Tasks (Always try to calculate if missing or as shadow)
  236. // Strategy: If user mapped "power", we fetched it above.
  237. // If we also add a calc task here, we might overwrite it or fill it if fetch failed.
  238. // Let's only add calc task if NO entity mapped, OR if we want to enforce consistency.
  239. // User query: "active_power and cumulative_power are repetitive... user calculates power and energy"
  240. // Implicitly, if they have an entity for power, they map it.
  241. // If they don't, we calculate.
  242. if mapping["power"] == "" {
  243. calcTasks = append(calcTasks, CalcTask{
  244. DeviceID: d.ID.String(), Metric: "power", LocationID: locID,
  245. })
  246. }
  247. if mapping["energy"] == "" {
  248. calcTasks = append(calcTasks, CalcTask{
  249. DeviceID: d.ID.String(), Metric: "energy", LocationID: locID,
  250. })
  251. }
  252. }
  253. }
  254. if len(entityIDs) == 0 && len(calcTasks) == 0 {
  255. log.Printf("数据源 %s: 未找到采集任务 (未映射实体且无虚拟任务)", source.Name)
  256. return
  257. }
  258. // 2. Fetch Data (Real World)
  259. var states map[string]string
  260. var err error
  261. if len(entityIDs) > 0 {
  262. log.Printf("数据源 %s: 正在请求 %d 个实体: %v", source.Name, len(entityIDs), entityIDs)
  263. states, err = utils.BatchFetchStates(source.Config, entityIDs)
  264. if err != nil {
  265. log.Printf("数据源 %s 获取状态失败: %v\n", source.Name, err)
  266. return
  267. }
  268. log.Printf("数据源 %s: 从 HA 获取到 %d 个状态: %v", source.Name, len(states), states)
  269. } else {
  270. states = make(map[string]string)
  271. }
  272. // Update Device Status
  273. for _, d := range devices {
  274. isOnline := false
  275. var mapping map[string]string
  276. b, _ := d.AttributeMapping.MarshalJSON()
  277. json.Unmarshal(b, &mapping)
  278. hasBoundEntity := false
  279. for k, entityID := range mapping {
  280. if len(k) > 8 && k[len(k)-8:] == "_formula" {
  281. continue
  282. }
  283. if entityID != "" {
  284. hasBoundEntity = true
  285. if _, ok := states[entityID]; ok {
  286. isOnline = true
  287. break
  288. }
  289. }
  290. }
  291. targetStatus := d.Status
  292. if hasBoundEntity {
  293. if isOnline {
  294. targetStatus = "ONLINE"
  295. } else {
  296. targetStatus = "OFFLINE"
  297. }
  298. } else {
  299. targetStatus = "ONLINE"
  300. }
  301. if d.Status != targetStatus {
  302. models.DB.Model(&d).Update("Status", targetStatus)
  303. }
  304. }
  305. now := time.Now()
  306. count := 0
  307. // Helper to Save
  308. saveMetric := func(deviceID, metric string, val float64, locID string) {
  309. // Update Cache
  310. if deviceMetricValues[deviceID] == nil {
  311. deviceMetricValues[deviceID] = make(map[string]float64)
  312. }
  313. deviceMetricValues[deviceID][metric] = val
  314. // DB Insert
  315. if err := db.InsertReading(deviceID, metric, val, locID, now); err == nil {
  316. count++
  317. if s.IsDebug() {
  318. log.Printf("调试: 已保存 设备=%s 指标=%s 值=%.2f", deviceID, metric, val)
  319. }
  320. // Check Alarms Async
  321. if GlobalAlarmService != nil {
  322. go GlobalAlarmService.CheckRules(deviceID, metric, val)
  323. }
  324. } else {
  325. log.Printf("数据库错误: 保存失败 设备=%s 指标=%s 值=%.2f: %v", deviceID, metric, val, err)
  326. }
  327. }
  328. // 3. Process Real Entities
  329. for entityID, valStr := range states {
  330. // Parse
  331. val, err := strconv.ParseFloat(valStr, 64)
  332. if err != nil {
  333. // Boolean handling for Switch
  334. lower := strings.ToLower(valStr)
  335. if lower == "on" || lower == "true" {
  336. val = 1.0
  337. } else if lower == "off" || lower == "false" {
  338. val = 0.0
  339. } else {
  340. if s.IsDebug() {
  341. log.Printf("调试: 实体 %s 解析数值失败 (原始值=%s): %v. 跳过.", entityID, valStr, err)
  342. }
  343. continue
  344. }
  345. }
  346. targets := requestMap[entityID]
  347. for _, t := range targets {
  348. finalVal := val
  349. // Apply Formula (e.g. calibration)
  350. if t.Formula != "" {
  351. if res, err := utils.EvaluateFormula(t.Formula, val); err == nil {
  352. if s.IsDebug() {
  353. log.Printf("调试: 应用公式 [%s] 设备=%s 指标=%s: %.2f -> %.2f", t.Formula, t.DeviceID, t.Metric, val, res)
  354. }
  355. finalVal = res
  356. } else {
  357. log.Printf("调试: 公式计算错误 [%s] 设备=%s 指标=%s: %v. 使用原始值: %.2f", t.Formula, t.DeviceID, t.Metric, err, val)
  358. }
  359. }
  360. saveMetric(t.DeviceID, t.Metric, finalVal, t.LocationID)
  361. }
  362. }
  363. // 4. Process Virtual/Calculated Tasks (Ordered)
  364. // Pass 1: Base Virtual Values (Fx only, e.g. Virtual Voltage)
  365. for _, task := range calcTasks {
  366. if task.Formula != "" && (task.Metric == "voltage" || task.Metric == "current") {
  367. // Formula with x=0 (constant or time-based if supported, currently constant)
  368. if res, err := utils.EvaluateFormula(task.Formula, 0); err == nil {
  369. if s.IsDebug() {
  370. log.Printf("调试: 虚拟指标 [%s] 设备=%s: 公式 [%s] (x=0) -> %.2f", task.Metric, task.DeviceID, task.Formula, res)
  371. }
  372. saveMetric(task.DeviceID, task.Metric, res, task.LocationID)
  373. } else {
  374. log.Printf("调试: 虚拟指标计算错误 [%s] 设备=%s: 公式 [%s] 失败: %v", task.Metric, task.DeviceID, task.Formula, err)
  375. }
  376. }
  377. }
  378. // Pass 2: Power (P = V * I)
  379. for _, task := range calcTasks {
  380. if task.Metric == "power" {
  381. // Check Switch Status
  382. if deviceHasSwitch[task.DeviceID] {
  383. swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
  384. // If switch is OFF (0) or Unknown (!ok), skip calculation
  385. if !ok || swVal != 1 {
  386. continue
  387. }
  388. }
  389. metrics := deviceMetricValues[task.DeviceID]
  390. v, hasV := metrics["voltage"]
  391. c, hasC := metrics["current"]
  392. if hasV && hasC {
  393. power := v * c
  394. saveMetric(task.DeviceID, "power", power, task.LocationID)
  395. }
  396. }
  397. }
  398. // Pass 3: Energy (E += P * t)
  399. for _, task := range calcTasks {
  400. if task.Metric == "energy" {
  401. // Check Switch Status
  402. if deviceHasSwitch[task.DeviceID] {
  403. swVal, ok := deviceMetricValues[task.DeviceID]["switch"]
  404. // If switch is OFF (0) or Unknown (!ok), skip calculation
  405. if !ok || swVal != 1 {
  406. continue
  407. }
  408. }
  409. metrics := deviceMetricValues[task.DeviceID]
  410. p, hasP := metrics["power"]
  411. if hasP {
  412. // Interval is dynamic
  413. hours := s.IntervalSeconds / 3600.0
  414. increment := (p / 1000.0) * hours // W -> kWh
  415. // Get previous value
  416. lastVal := 0.0
  417. lastData, err := db.GetLatestDeviceData(task.DeviceID)
  418. if err == nil {
  419. for _, ld := range lastData {
  420. if ld.Metric == "energy" {
  421. lastVal = ld.Value
  422. break
  423. }
  424. }
  425. }
  426. newEnergy := lastVal + increment
  427. saveMetric(task.DeviceID, "energy", newEnergy, task.LocationID)
  428. }
  429. }
  430. }
  431. log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
  432. }