package services

import (
	"encoding/json"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/robfig/cron/v3"

	"ems-backend/db"
	"ems-backend/models"
	"ems-backend/utils"
)

// CollectorService periodically polls configured integration sources
// (currently only the HOME_ASSISTANT driver) and writes the resulting
// metric readings into the time-series database.
type CollectorService struct {
	cron *cron.Cron
}

// NewCollectorService creates a collector backed by a seconds-granularity
// cron scheduler. Call Start to begin collecting.
func NewCollectorService() *CollectorService {
	return &CollectorService{
		cron: cron.New(cron.WithSeconds()),
	}
}

// Start schedules the collection job every 30 seconds and starts the
// scheduler. The spec is a compile-time constant, so a parse failure is
// a programmer error and terminates the process.
func (s *CollectorService) Start() {
	// Six-field spec (seconds enabled): fire at :00 and :30 of every minute.
	if _, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob); err != nil {
		log.Fatalf("Failed to start collector cron: %v", err)
	}
	s.cron.Start()
	log.Println("Data Collector Service started (interval: 30s)")
}

// Stop halts the scheduler and blocks until any in-flight collection
// job has finished, so shutdown does not abandon a half-written cycle.
func (s *CollectorService) Stop() {
	// cron v3's Stop returns a context that is done once running jobs complete.
	<-s.cron.Stop().Done()
}

// collectJob runs one collection cycle: load all non-inactive devices,
// group them by integration source, and process each group concurrently.
// It waits for every group before returning so cycles cannot pile up
// without bound if a source is slow.
func (s *CollectorService) collectJob() {
	log.Println("Starting collection cycle...")

	// 1. Fetch all devices that are not explicitly inactive.
	var devices []models.Device
	if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
		log.Printf("Error fetching devices: %v\n", err)
		return
	}
	if len(devices) == 0 {
		return
	}

	// 2. Group devices by SourceID; devices without a real source
	// (empty or zero UUID) have nothing to poll and are skipped.
	const zeroUUID = "00000000-0000-0000-0000-000000000000"
	deviceGroups := make(map[string][]models.Device)
	for _, d := range devices {
		id := d.SourceID.String()
		if id != "" && id != zeroUUID {
			deviceGroups[id] = append(deviceGroups[id], d)
		}
	}

	// 3. Process each source group concurrently and wait for all of
	// them, so one cycle finishes before the next can overlap with it.
	var wg sync.WaitGroup
	for sourceID, devs := range deviceGroups {
		wg.Add(1)
		// Pass loop values as arguments: safe regardless of Go version
		// (pre-1.22 loop-variable capture).
		go func(sourceID string, devs []models.Device) {
			defer wg.Done()
			s.processSourceGroup(sourceID, devs)
		}(sourceID, devs)
	}
	wg.Wait()
}

// processSourceGroup fetches the current state of every entity mapped by
// the given devices from their shared integration source and writes the
// numeric readings to the TSDB. Sources with unsupported drivers are
// silently skipped.
func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
	// Fetch the source configuration.
	var source models.IntegrationSource
	if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
		log.Printf("Source %s not found: %v\n", sourceID, err)
		return
	}
	if source.DriverType != "HOME_ASSISTANT" {
		// Only Home Assistant is supported for now.
		return
	}

	// target identifies one (device, metric) pair consuming an entity.
	// One entity may feed several devices/metrics, hence the slice values.
	type target struct {
		DeviceID   string
		Metric     string
		LocationID string
	}
	entityIDs := make([]string, 0)
	requestMap := make(map[string][]target)

	for _, d := range devices {
		// AttributeMapping is stored as JSONB; decode it into
		// metric-name -> entity-ID pairs. A malformed mapping is logged
		// and the device skipped rather than silently yielding nothing.
		raw, err := d.AttributeMapping.MarshalJSON()
		if err != nil {
			log.Printf("Device %s: cannot read attribute mapping: %v\n", d.ID.String(), err)
			continue
		}
		var mapping map[string]string
		if err := json.Unmarshal(raw, &mapping); err != nil {
			log.Printf("Device %s: invalid attribute mapping: %v\n", d.ID.String(), err)
			continue
		}

		for metric, entityID := range mapping {
			// Derived metrics ("*_formula") are computed elsewhere, not polled.
			if strings.HasSuffix(metric, "_formula") {
				continue
			}
			if entityID == "" {
				continue
			}
			// Record each entity ID exactly once for the batch fetch.
			if _, exists := requestMap[entityID]; !exists {
				entityIDs = append(entityIDs, entityID)
			}
			locID := ""
			if d.LocationID != nil {
				locID = d.LocationID.String()
			}
			requestMap[entityID] = append(requestMap[entityID], target{
				DeviceID:   d.ID.String(),
				Metric:     metric,
				LocationID: locID,
			})
		}
	}
	if len(entityIDs) == 0 {
		return
	}

	// Fetch all entity states in one batched request.
	// TODO: split into chunks if entityIDs grows very large.
	states, err := utils.BatchFetchStates(source.Config, entityIDs)
	if err != nil {
		log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
		return
	}

	// Write readings to the TSDB, all stamped with the same cycle time.
	now := time.Now()
	count := 0
	failed := 0
	for entityID, valStr := range states {
		val, err := strconv.ParseFloat(valStr, 64)
		if err != nil {
			// Map common binary states to 1/0; skip anything else that
			// is not numeric (e.g. "unavailable", "unknown").
			switch valStr {
			case "on":
				val = 1
			case "off":
				val = 0
			default:
				continue
			}
		}
		for _, tgt := range requestMap[entityID] {
			if err := db.InsertReading(tgt.DeviceID, tgt.Metric, val, tgt.LocationID, now); err != nil {
				// Count failures and report once per cycle instead of
				// emitting one log line per failed point.
				failed++
			} else {
				count++
			}
		}
	}
	if failed > 0 {
		log.Printf("Source %s: %d TSDB insert(s) failed", source.Name, failed)
	}
	log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
}