| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package services
- import (
- "ems-backend/db"
- "ems-backend/models"
- "ems-backend/utils"
- "encoding/json"
- "log"
- "strconv"
- "time"
- "github.com/robfig/cron/v3"
- )
- type CollectorService struct {
- cron *cron.Cron
- }
- func NewCollectorService() *CollectorService {
- return &CollectorService{
- cron: cron.New(cron.WithSeconds()),
- }
- }
- func (s *CollectorService) Start() {
- // Run every 1 minute
- // Spec: "0 * * * * *" -> Every minute at 00s
- // For testing, maybe every 10 seconds? "*/10 * * * * *"
- // Let's stick to every minute for production-like behavior, or 30s.
- _, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob)
- if err != nil {
- log.Fatalf("Failed to start collector cron: %v", err)
- }
- s.cron.Start()
- log.Println("Data Collector Service started (interval: 30s)")
- }
- func (s *CollectorService) Stop() {
- s.cron.Stop()
- }
- func (s *CollectorService) collectJob() {
- log.Println("Starting collection cycle...")
-
- // 1. Fetch all active devices with source
- var devices []models.Device
- if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
- log.Printf("Error fetching devices: %v\n", err)
- return
- }
- if len(devices) == 0 {
- return
- }
- // 2. Group by SourceID
- deviceGroups := make(map[string][]models.Device)
- for _, d := range devices {
- if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
- deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
- }
- }
- // 3. Process each group
- for sourceID, devs := range deviceGroups {
- go s.processSourceGroup(sourceID, devs)
- }
- }
- func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
- // Fetch Source Config
- var source models.IntegrationSource
- if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
- log.Printf("Source %s not found: %v\n", sourceID, err)
- return
- }
- if source.DriverType != "HOME_ASSISTANT" {
- // Only HA supported for now
- return
- }
- // Collect all Entity IDs
- entityIDs := make([]string, 0)
- // Map: EntityID -> []{DeviceID, MetricKey}
- // One entity might be used by multiple devices/metrics? Rare but possible.
- type Target struct {
- DeviceID string
- Metric string
- LocationID string
- }
- requestMap := make(map[string][]Target)
- for _, d := range devices {
- // Parse Attribute Mapping
- var mapping map[string]string
- // mapping is JSONB, stored as datatypes.JSON
- // Need to unmarshal
- b, _ := d.AttributeMapping.MarshalJSON()
- json.Unmarshal(b, &mapping)
- for metric, entityID := range mapping {
- // Skip formulas for now (keys ending in _formula)
- if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
- continue
- }
- if entityID == "" {
- continue
- }
- if _, exists := requestMap[entityID]; !exists {
- entityIDs = append(entityIDs, entityID)
- }
-
- locID := ""
- if d.LocationID != nil {
- locID = d.LocationID.String()
- }
- requestMap[entityID] = append(requestMap[entityID], Target{
- DeviceID: d.ID.String(),
- Metric: metric,
- LocationID: locID,
- })
- }
- }
- if len(entityIDs) == 0 {
- return
- }
- // Fetch Data
- // TODO: Handle large batches?
- states, err := utils.BatchFetchStates(source.Config, entityIDs)
- if err != nil {
- log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
- return
- }
- // Write to TSDB
- now := time.Now()
- count := 0
-
- for entityID, valStr := range states {
- // Parse value to float
- val, err := strconv.ParseFloat(valStr, 64)
- if err != nil {
- // Try to handle non-numeric states if necessary (e.g. "on"/"off")
- if valStr == "on" {
- val = 1
- } else if valStr == "off" {
- val = 0
- } else {
- continue // Skip non-numeric
- }
- }
- targets := requestMap[entityID]
- for _, target := range targets {
- // Insert
- err := db.InsertReading(target.DeviceID, target.Metric, val, target.LocationID, now)
- if err != nil {
- // Log occasional errors, but don't flood
- // log.Printf("TSDB Insert Error: %v\n", err)
- } else {
- count++
- }
- }
- }
- log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
- }
|