collector.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package services
  2. import (
  3. "ems-backend/db"
  4. "ems-backend/models"
  5. "ems-backend/utils"
  6. "encoding/json"
  7. "log"
  8. "strconv"
  9. "time"
  10. "github.com/robfig/cron/v3"
  11. )
  12. type CollectorService struct {
  13. cron *cron.Cron
  14. }
  15. func NewCollectorService() *CollectorService {
  16. return &CollectorService{
  17. cron: cron.New(cron.WithSeconds()),
  18. }
  19. }
  20. func (s *CollectorService) Start() {
  21. // Run every 1 minute
  22. // Spec: "0 * * * * *" -> Every minute at 00s
  23. // For testing, maybe every 10 seconds? "*/10 * * * * *"
  24. // Let's stick to every minute for production-like behavior, or 30s.
  25. _, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob)
  26. if err != nil {
  27. log.Fatalf("Failed to start collector cron: %v", err)
  28. }
  29. s.cron.Start()
  30. log.Println("Data Collector Service started (interval: 30s)")
  31. }
  32. func (s *CollectorService) Stop() {
  33. s.cron.Stop()
  34. }
  35. func (s *CollectorService) collectJob() {
  36. log.Println("Starting collection cycle...")
  37. // 1. Fetch all active devices with source
  38. var devices []models.Device
  39. if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
  40. log.Printf("Error fetching devices: %v\n", err)
  41. return
  42. }
  43. if len(devices) == 0 {
  44. return
  45. }
  46. // 2. Group by SourceID
  47. deviceGroups := make(map[string][]models.Device)
  48. for _, d := range devices {
  49. if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
  50. deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
  51. }
  52. }
  53. // 3. Process each group
  54. for sourceID, devs := range deviceGroups {
  55. go s.processSourceGroup(sourceID, devs)
  56. }
  57. }
  58. func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
  59. // Fetch Source Config
  60. var source models.IntegrationSource
  61. if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
  62. log.Printf("Source %s not found: %v\n", sourceID, err)
  63. return
  64. }
  65. if source.DriverType != "HOME_ASSISTANT" {
  66. // Only HA supported for now
  67. return
  68. }
  69. // Collect all Entity IDs
  70. entityIDs := make([]string, 0)
  71. // Map: EntityID -> []{DeviceID, MetricKey}
  72. // One entity might be used by multiple devices/metrics? Rare but possible.
  73. type Target struct {
  74. DeviceID string
  75. Metric string
  76. LocationID string
  77. }
  78. requestMap := make(map[string][]Target)
  79. for _, d := range devices {
  80. // Parse Attribute Mapping
  81. var mapping map[string]string
  82. // mapping is JSONB, stored as datatypes.JSON
  83. // Need to unmarshal
  84. b, _ := d.AttributeMapping.MarshalJSON()
  85. json.Unmarshal(b, &mapping)
  86. for metric, entityID := range mapping {
  87. // Skip formulas for now (keys ending in _formula)
  88. if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
  89. continue
  90. }
  91. if entityID == "" {
  92. continue
  93. }
  94. if _, exists := requestMap[entityID]; !exists {
  95. entityIDs = append(entityIDs, entityID)
  96. }
  97. locID := ""
  98. if d.LocationID != nil {
  99. locID = d.LocationID.String()
  100. }
  101. requestMap[entityID] = append(requestMap[entityID], Target{
  102. DeviceID: d.ID.String(),
  103. Metric: metric,
  104. LocationID: locID,
  105. })
  106. }
  107. }
  108. if len(entityIDs) == 0 {
  109. return
  110. }
  111. // Fetch Data
  112. // TODO: Handle large batches?
  113. states, err := utils.BatchFetchStates(source.Config, entityIDs)
  114. if err != nil {
  115. log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
  116. return
  117. }
  118. // Write to TSDB
  119. now := time.Now()
  120. count := 0
  121. for entityID, valStr := range states {
  122. // Parse value to float
  123. val, err := strconv.ParseFloat(valStr, 64)
  124. if err != nil {
  125. // Try to handle non-numeric states if necessary (e.g. "on"/"off")
  126. if valStr == "on" {
  127. val = 1
  128. } else if valStr == "off" {
  129. val = 0
  130. } else {
  131. continue // Skip non-numeric
  132. }
  133. }
  134. targets := requestMap[entityID]
  135. for _, target := range targets {
  136. // Insert
  137. err := db.InsertReading(target.DeviceID, target.Metric, val, target.LocationID, now)
  138. if err != nil {
  139. // Log occasional errors, but don't flood
  140. // log.Printf("TSDB Insert Error: %v\n", err)
  141. } else {
  142. count++
  143. }
  144. }
  145. }
  146. log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
  147. }