| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- package services
import (
	"encoding/json"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"ems-backend/db"
	"ems-backend/models"
	"ems-backend/utils"

	"github.com/robfig/cron/v3"
)
- type CollectorService struct {
- cron *cron.Cron
- }
- func NewCollectorService() *CollectorService {
- return &CollectorService{
- cron: cron.New(cron.WithSeconds()),
- }
- }
- func (s *CollectorService) Start() {
- // Run every 1 minute
- // Spec: "0 * * * * *" -> Every minute at 00s
- // For testing, maybe every 10 seconds? "*/10 * * * * *"
- // Let's stick to every minute for production-like behavior, or 30s.
- _, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob)
- if err != nil {
- log.Fatalf("Failed to start collector cron: %v", err)
- }
- s.cron.Start()
- log.Println("Data Collector Service started (interval: 30s)")
- }
- func (s *CollectorService) Stop() {
- s.cron.Stop()
- }
- func (s *CollectorService) collectJob() {
- log.Println("Starting collection cycle...")
-
- // 1. Fetch all active devices with source
- var devices []models.Device
- if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
- log.Printf("Error fetching devices: %v\n", err)
- return
- }
- if len(devices) == 0 {
- return
- }
- // 2. Group by SourceID
- deviceGroups := make(map[string][]models.Device)
- for _, d := range devices {
- if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
- deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
- }
- }
- // 3. Process each group
- for sourceID, devs := range deviceGroups {
- go s.processSourceGroup(sourceID, devs)
- }
- }
- func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
- // Fetch Source Config
- var source models.IntegrationSource
- if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
- log.Printf("Source %s not found: %v\n", sourceID, err)
- return
- }
- if source.DriverType != "HOME_ASSISTANT" {
- // Only HA supported for now
- return
- }
- // Collect all Entity IDs
- entityIDs := make([]string, 0)
- // Map: EntityID -> []{DeviceID, MetricKey}
- // One entity might be used by multiple devices/metrics? Rare but possible.
- type Target struct {
- DeviceID string
- Metric string
- LocationID string
- Formula string
- }
- requestMap := make(map[string][]Target)
-
- // Map to store pre-fetched states from direct device queries
- // EntityID -> State Value (string)
- preFetchedStates := make(map[string]string)
- for _, d := range devices {
- // Parse Attribute Mapping
- var mapping map[string]string
- // mapping is JSONB, stored as datatypes.JSON
- // Need to unmarshal
- b, _ := d.AttributeMapping.MarshalJSON()
- json.Unmarshal(b, &mapping)
-
- locID := ""
- if d.LocationID != nil {
- locID = d.LocationID.String()
- }
- for metric, entityID := range mapping {
- // Skip formulas for now (keys ending in _formula)
- if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
- continue
- }
- if entityID == "" {
- continue
- }
- if _, exists := requestMap[entityID]; !exists {
- entityIDs = append(entityIDs, entityID)
- }
-
- // Check for formula
- formula := mapping[metric+"_formula"]
- requestMap[entityID] = append(requestMap[entityID], Target{
- DeviceID: d.ID.String(),
- Metric: metric,
- LocationID: locID,
- Formula: formula,
- })
- }
-
- // Automatic Discovery: fetch all entities if not mapped or simply enforce "all" policy
- // Case A: ExternalID is an Entity ID (contains '.')
- if strings.Contains(d.ExternalID, ".") {
- entityID := d.ExternalID
- // Add to fetch list if not already there
- if _, exists := requestMap[entityID]; !exists {
- entityIDs = append(entityIDs, entityID)
- }
-
- // Use sanitized entity ID as metric name
- metric := strings.ReplaceAll(entityID, ".", "_")
-
- // Avoid duplicate if already mapped
- if len(requestMap[entityID]) == 0 {
- requestMap[entityID] = append(requestMap[entityID], Target{
- DeviceID: d.ID.String(),
- Metric: metric,
- LocationID: locID,
- })
- }
- } else if d.ExternalID != "" {
- // Case B: ExternalID is a Device ID (UUID-like, no '.')
- // Fetch all entities for this device
- // Note: This adds overhead. In production, result should be cached.
- haEntities, err := utils.FetchHAEntitiesByDevice(source.Config, d.ExternalID)
- if err == nil {
- for _, ent := range haEntities {
- // Metric name: sanitized entity ID
- metric := strings.ReplaceAll(ent.EntityID, ".", "_")
-
- // Register target
- if _, exists := requestMap[ent.EntityID]; !exists {
- // Only add if not already requested (avoid dupes if mapped)
- // But if it IS mapped, we might have added it above with a specific metric name.
- // Here we add it with default metric name.
- // To avoid double writing for mapped entities, we could check if ent.EntityID is in requestMap.
- // However, user requirement is "add all". Double writing (one with alias, one with raw ID) is acceptable or even desired.
- }
-
- if len(requestMap[ent.EntityID]) == 0 {
- requestMap[ent.EntityID] = append(requestMap[ent.EntityID], Target{
- DeviceID: d.ID.String(),
- Metric: metric,
- LocationID: locID,
- })
- }
-
- // Store value directly to avoid re-fetching
- preFetchedStates[ent.EntityID] = ent.State
- }
- } else {
- // Log error but continue
- log.Printf("Failed to auto-discover entities for device %s (%s): %v", d.Name, d.ExternalID, err)
- }
- }
- }
- if len(entityIDs) == 0 && len(preFetchedStates) == 0 {
- return
- }
- // Fetch Data for explicit entityIDs
- // TODO: Handle large batches?
- var states map[string]string
- var err error
-
- if len(entityIDs) > 0 {
- states, err = utils.BatchFetchStates(source.Config, entityIDs)
- if err != nil {
- log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
- return
- }
- } else {
- states = make(map[string]string)
- }
- // Merge preFetchedStates into states
- for id, val := range preFetchedStates {
- states[id] = val
- }
- if len(states) < len(entityIDs) {
- log.Printf("Source %s: Requested %d entities, but got states for only %d (some may be unavailable)", source.Name, len(entityIDs), len(states))
- }
- // Write to TSDB
- now := time.Now()
- count := 0
-
- for entityID, valStr := range states {
- // Handle Switch Status (on/off) explicitly
- if valStr == "on" || valStr == "off" {
- valBool := valStr == "on"
- targets := requestMap[entityID]
- for _, target := range targets {
- // Insert Switch Log
- err := db.InsertSwitchLog(target.DeviceID, target.Metric, valBool, target.LocationID, now)
- if err != nil {
- log.Printf("TSDB Insert Switch Log Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
- } else {
- count++
- }
- }
- continue
- }
- // Parse value to float
- val, err := strconv.ParseFloat(valStr, 64)
- if err != nil {
- log.Printf("Skipping non-numeric value for entity %s: %s", entityID, valStr)
- continue // Skip non-numeric
- }
- targets := requestMap[entityID]
- for _, target := range targets {
-
- finalVal := val
- if target.Formula != "" {
- res, err := utils.EvaluateFormula(target.Formula, val)
- if err == nil {
- finalVal = res
- } else {
- log.Printf("Formula error for device %s metric %s: %v", target.DeviceID, target.Metric, err)
- }
- }
- // Insert
- err := db.InsertReading(target.DeviceID, target.Metric, finalVal, target.LocationID, now)
- if err != nil {
- log.Printf("TSDB Insert Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
- } else {
- count++
- }
- }
- }
- log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
- }
|