collector.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package services
  2. import (
  3. "ems-backend/db"
  4. "ems-backend/models"
  5. "ems-backend/utils"
  6. "encoding/json"
  7. "log"
  8. "strconv"
  9. "time"
  10. "strings"
  11. "github.com/robfig/cron/v3"
  12. )
  13. type CollectorService struct {
  14. cron *cron.Cron
  15. }
  16. func NewCollectorService() *CollectorService {
  17. return &CollectorService{
  18. cron: cron.New(cron.WithSeconds()),
  19. }
  20. }
  21. func (s *CollectorService) Start() {
  22. // Run every 1 minute
  23. // Spec: "0 * * * * *" -> Every minute at 00s
  24. // For testing, maybe every 10 seconds? "*/10 * * * * *"
  25. // Let's stick to every minute for production-like behavior, or 30s.
  26. _, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob)
  27. if err != nil {
  28. log.Fatalf("Failed to start collector cron: %v", err)
  29. }
  30. s.cron.Start()
  31. log.Println("Data Collector Service started (interval: 30s)")
  32. }
  33. func (s *CollectorService) Stop() {
  34. s.cron.Stop()
  35. }
  36. func (s *CollectorService) collectJob() {
  37. log.Println("Starting collection cycle...")
  38. // 1. Fetch all active devices with source
  39. var devices []models.Device
  40. if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
  41. log.Printf("Error fetching devices: %v\n", err)
  42. return
  43. }
  44. if len(devices) == 0 {
  45. return
  46. }
  47. // 2. Group by SourceID
  48. deviceGroups := make(map[string][]models.Device)
  49. for _, d := range devices {
  50. if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
  51. deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
  52. }
  53. }
  54. // 3. Process each group
  55. for sourceID, devs := range deviceGroups {
  56. go s.processSourceGroup(sourceID, devs)
  57. }
  58. }
  59. func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
  60. // Fetch Source Config
  61. var source models.IntegrationSource
  62. if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
  63. log.Printf("Source %s not found: %v\n", sourceID, err)
  64. return
  65. }
  66. if source.DriverType != "HOME_ASSISTANT" {
  67. // Only HA supported for now
  68. return
  69. }
  70. // Collect all Entity IDs
  71. entityIDs := make([]string, 0)
  72. // Map: EntityID -> []{DeviceID, MetricKey}
  73. // One entity might be used by multiple devices/metrics? Rare but possible.
  74. type Target struct {
  75. DeviceID string
  76. Metric string
  77. LocationID string
  78. Formula string
  79. }
  80. requestMap := make(map[string][]Target)
  81. // Map to store pre-fetched states from direct device queries
  82. // EntityID -> State Value (string)
  83. preFetchedStates := make(map[string]string)
  84. for _, d := range devices {
  85. // Parse Attribute Mapping
  86. var mapping map[string]string
  87. // mapping is JSONB, stored as datatypes.JSON
  88. // Need to unmarshal
  89. b, _ := d.AttributeMapping.MarshalJSON()
  90. json.Unmarshal(b, &mapping)
  91. locID := ""
  92. if d.LocationID != nil {
  93. locID = d.LocationID.String()
  94. }
  95. for metric, entityID := range mapping {
  96. // Skip formulas for now (keys ending in _formula)
  97. if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
  98. continue
  99. }
  100. if entityID == "" {
  101. continue
  102. }
  103. if _, exists := requestMap[entityID]; !exists {
  104. entityIDs = append(entityIDs, entityID)
  105. }
  106. // Check for formula
  107. formula := mapping[metric+"_formula"]
  108. requestMap[entityID] = append(requestMap[entityID], Target{
  109. DeviceID: d.ID.String(),
  110. Metric: metric,
  111. LocationID: locID,
  112. Formula: formula,
  113. })
  114. }
  115. // Automatic Discovery: fetch all entities if not mapped or simply enforce "all" policy
  116. // Case A: ExternalID is an Entity ID (contains '.')
  117. if strings.Contains(d.ExternalID, ".") {
  118. entityID := d.ExternalID
  119. // Add to fetch list if not already there
  120. if _, exists := requestMap[entityID]; !exists {
  121. entityIDs = append(entityIDs, entityID)
  122. }
  123. // Use sanitized entity ID as metric name
  124. metric := strings.ReplaceAll(entityID, ".", "_")
  125. // Avoid duplicate if already mapped
  126. if len(requestMap[entityID]) == 0 {
  127. requestMap[entityID] = append(requestMap[entityID], Target{
  128. DeviceID: d.ID.String(),
  129. Metric: metric,
  130. LocationID: locID,
  131. })
  132. }
  133. } else if d.ExternalID != "" {
  134. // Case B: ExternalID is a Device ID (UUID-like, no '.')
  135. // Fetch all entities for this device
  136. // Note: This adds overhead. In production, result should be cached.
  137. haEntities, err := utils.FetchHAEntitiesByDevice(source.Config, d.ExternalID)
  138. if err == nil {
  139. for _, ent := range haEntities {
  140. // Metric name: sanitized entity ID
  141. metric := strings.ReplaceAll(ent.EntityID, ".", "_")
  142. // Register target
  143. if _, exists := requestMap[ent.EntityID]; !exists {
  144. // Only add if not already requested (avoid dupes if mapped)
  145. // But if it IS mapped, we might have added it above with a specific metric name.
  146. // Here we add it with default metric name.
  147. // To avoid double writing for mapped entities, we could check if ent.EntityID is in requestMap.
  148. // However, user requirement is "add all". Double writing (one with alias, one with raw ID) is acceptable or even desired.
  149. }
  150. if len(requestMap[ent.EntityID]) == 0 {
  151. requestMap[ent.EntityID] = append(requestMap[ent.EntityID], Target{
  152. DeviceID: d.ID.String(),
  153. Metric: metric,
  154. LocationID: locID,
  155. })
  156. }
  157. // Store value directly to avoid re-fetching
  158. preFetchedStates[ent.EntityID] = ent.State
  159. }
  160. } else {
  161. // Log error but continue
  162. log.Printf("Failed to auto-discover entities for device %s (%s): %v", d.Name, d.ExternalID, err)
  163. }
  164. }
  165. }
  166. if len(entityIDs) == 0 && len(preFetchedStates) == 0 {
  167. return
  168. }
  169. // Fetch Data for explicit entityIDs
  170. // TODO: Handle large batches?
  171. var states map[string]string
  172. var err error
  173. if len(entityIDs) > 0 {
  174. states, err = utils.BatchFetchStates(source.Config, entityIDs)
  175. if err != nil {
  176. log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
  177. return
  178. }
  179. } else {
  180. states = make(map[string]string)
  181. }
  182. // Merge preFetchedStates into states
  183. for id, val := range preFetchedStates {
  184. states[id] = val
  185. }
  186. if len(states) < len(entityIDs) {
  187. log.Printf("Source %s: Requested %d entities, but got states for only %d (some may be unavailable)", source.Name, len(entityIDs), len(states))
  188. }
  189. // Write to TSDB
  190. now := time.Now()
  191. count := 0
  192. for entityID, valStr := range states {
  193. // Handle Switch Status (on/off) explicitly
  194. if valStr == "on" || valStr == "off" {
  195. valBool := valStr == "on"
  196. targets := requestMap[entityID]
  197. for _, target := range targets {
  198. // Insert Switch Log
  199. err := db.InsertSwitchLog(target.DeviceID, target.Metric, valBool, target.LocationID, now)
  200. if err != nil {
  201. log.Printf("TSDB Insert Switch Log Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
  202. } else {
  203. count++
  204. }
  205. }
  206. continue
  207. }
  208. // Parse value to float
  209. val, err := strconv.ParseFloat(valStr, 64)
  210. if err != nil {
  211. log.Printf("Skipping non-numeric value for entity %s: %s", entityID, valStr)
  212. continue // Skip non-numeric
  213. }
  214. targets := requestMap[entityID]
  215. for _, target := range targets {
  216. finalVal := val
  217. if target.Formula != "" {
  218. res, err := utils.EvaluateFormula(target.Formula, val)
  219. if err == nil {
  220. finalVal = res
  221. } else {
  222. log.Printf("Formula error for device %s metric %s: %v", target.DeviceID, target.Metric, err)
  223. }
  224. }
  225. // Insert
  226. err := db.InsertReading(target.DeviceID, target.Metric, finalVal, target.LocationID, now)
  227. if err != nil {
  228. log.Printf("TSDB Insert Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
  229. } else {
  230. count++
  231. }
  232. }
  233. }
  234. log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
  235. }