|
|
@@ -8,6 +8,7 @@ import (
|
|
|
"log"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
+ "strings"
|
|
|
|
|
|
"github.com/robfig/cron/v3"
|
|
|
)
|
|
|
@@ -88,8 +89,13 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
|
|
|
DeviceID string
|
|
|
Metric string
|
|
|
LocationID string
|
|
|
+ Formula string
|
|
|
}
|
|
|
requestMap := make(map[string][]Target)
|
|
|
+
|
|
|
+ // Map to store pre-fetched states from direct device queries
|
|
|
+ // EntityID -> State Value (string)
|
|
|
+ preFetchedStates := make(map[string]string)
|
|
|
|
|
|
for _, d := range devices {
|
|
|
// Parse Attribute Mapping
|
|
|
@@ -98,6 +104,11 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
|
|
|
// Need to unmarshal
|
|
|
b, _ := d.AttributeMapping.MarshalJSON()
|
|
|
json.Unmarshal(b, &mapping)
|
|
|
+
|
|
|
+ locID := ""
|
|
|
+ if d.LocationID != nil {
|
|
|
+ locID = d.LocationID.String()
|
|
|
+ }
|
|
|
|
|
|
for metric, entityID := range mapping {
|
|
|
// Skip formulas for now (keys ending in _formula)
|
|
|
@@ -112,29 +123,101 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
|
|
|
entityIDs = append(entityIDs, entityID)
|
|
|
}
|
|
|
|
|
|
- locID := ""
|
|
|
- if d.LocationID != nil {
|
|
|
- locID = d.LocationID.String()
|
|
|
- }
|
|
|
+ // Check for formula
|
|
|
+ formula := mapping[metric+"_formula"]
|
|
|
|
|
|
requestMap[entityID] = append(requestMap[entityID], Target{
|
|
|
DeviceID: d.ID.String(),
|
|
|
Metric: metric,
|
|
|
LocationID: locID,
|
|
|
+ Formula: formula,
|
|
|
})
|
|
|
}
|
|
|
+
|
|
|
+ // Automatic Discovery: fetch all entities if not mapped or simply enforce "all" policy
|
|
|
+ // Case A: ExternalID is an Entity ID (contains '.')
|
|
|
+ if strings.Contains(d.ExternalID, ".") {
|
|
|
+ entityID := d.ExternalID
|
|
|
+ // Add to fetch list if not already there
|
|
|
+ if _, exists := requestMap[entityID]; !exists {
|
|
|
+ entityIDs = append(entityIDs, entityID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Use sanitized entity ID as metric name
|
|
|
+ metric := strings.ReplaceAll(entityID, ".", "_")
|
|
|
+
|
|
|
+ // Avoid duplicate if already mapped
|
|
|
+ if len(requestMap[entityID]) == 0 {
|
|
|
+ requestMap[entityID] = append(requestMap[entityID], Target{
|
|
|
+ DeviceID: d.ID.String(),
|
|
|
+ Metric: metric,
|
|
|
+ LocationID: locID,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ } else if d.ExternalID != "" {
|
|
|
+ // Case B: ExternalID is a Device ID (UUID-like, no '.')
|
|
|
+ // Fetch all entities for this device
|
|
|
+ // Note: This adds overhead. In production, result should be cached.
|
|
|
+ haEntities, err := utils.FetchHAEntitiesByDevice(source.Config, d.ExternalID)
|
|
|
+ if err == nil {
|
|
|
+ for _, ent := range haEntities {
|
|
|
+ // Metric name: sanitized entity ID
|
|
|
+ metric := strings.ReplaceAll(ent.EntityID, ".", "_")
|
|
|
+
|
|
|
+ // Register target
|
|
|
+ if _, exists := requestMap[ent.EntityID]; !exists {
|
|
|
+ // Only add if not already requested (avoid dupes if mapped)
|
|
|
+ // But if it IS mapped, we might have added it above with a specific metric name.
|
|
|
+ // Here we add it with default metric name.
|
|
|
+ // To avoid double writing for mapped entities, we could check if ent.EntityID is in requestMap.
|
|
|
+ // However, user requirement is "add all". Double writing (one with alias, one with raw ID) is acceptable or even desired.
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(requestMap[ent.EntityID]) == 0 {
|
|
|
+ requestMap[ent.EntityID] = append(requestMap[ent.EntityID], Target{
|
|
|
+ DeviceID: d.ID.String(),
|
|
|
+ Metric: metric,
|
|
|
+ LocationID: locID,
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // Store value directly to avoid re-fetching
|
|
|
+ preFetchedStates[ent.EntityID] = ent.State
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Log error but continue
|
|
|
+ log.Printf("Failed to auto-discover entities for device %s (%s): %v", d.Name, d.ExternalID, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- if len(entityIDs) == 0 {
|
|
|
+ if len(entityIDs) == 0 && len(preFetchedStates) == 0 {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- // Fetch Data
|
|
|
+ // Fetch Data for explicit entityIDs
|
|
|
// TODO: Handle large batches?
|
|
|
- states, err := utils.BatchFetchStates(source.Config, entityIDs)
|
|
|
- if err != nil {
|
|
|
- log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
|
|
|
- return
|
|
|
+ var states map[string]string
|
|
|
+ var err error
|
|
|
+
|
|
|
+ if len(entityIDs) > 0 {
|
|
|
+ states, err = utils.BatchFetchStates(source.Config, entityIDs)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ states = make(map[string]string)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Merge preFetchedStates into states
|
|
|
+ for id, val := range preFetchedStates {
|
|
|
+ states[id] = val
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if len(states) < len(entityIDs) {
|
|
|
+ log.Printf("Source %s: Requested %d entities, but got states for only %d (some may be unavailable)", source.Name, len(entityIDs), len(states))
|
|
|
}
|
|
|
|
|
|
// Write to TSDB
|
|
|
@@ -142,26 +225,46 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
|
|
|
count := 0
|
|
|
|
|
|
for entityID, valStr := range states {
|
|
|
+ // Handle Switch Status (on/off) explicitly
|
|
|
+ if valStr == "on" || valStr == "off" {
|
|
|
+ valBool := valStr == "on"
|
|
|
+ targets := requestMap[entityID]
|
|
|
+ for _, target := range targets {
|
|
|
+ // Insert Switch Log
|
|
|
+ err := db.InsertSwitchLog(target.DeviceID, target.Metric, valBool, target.LocationID, now)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("TSDB Insert Switch Log Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
|
|
|
+ } else {
|
|
|
+ count++
|
|
|
+ }
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
// Parse value to float
|
|
|
val, err := strconv.ParseFloat(valStr, 64)
|
|
|
if err != nil {
|
|
|
- // Try to handle non-numeric states if necessary (e.g. "on"/"off")
|
|
|
- if valStr == "on" {
|
|
|
- val = 1
|
|
|
- } else if valStr == "off" {
|
|
|
- val = 0
|
|
|
- } else {
|
|
|
- continue // Skip non-numeric
|
|
|
- }
|
|
|
+ log.Printf("Skipping non-numeric value for entity %s: %s", entityID, valStr)
|
|
|
+ continue // Skip non-numeric
|
|
|
}
|
|
|
|
|
|
targets := requestMap[entityID]
|
|
|
for _, target := range targets {
|
|
|
+
|
|
|
+ finalVal := val
|
|
|
+ if target.Formula != "" {
|
|
|
+ res, err := utils.EvaluateFormula(target.Formula, val)
|
|
|
+ if err == nil {
|
|
|
+ finalVal = res
|
|
|
+ } else {
|
|
|
+ log.Printf("Formula error for device %s metric %s: %v", target.DeviceID, target.Metric, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Insert
|
|
|
- err := db.InsertReading(target.DeviceID, target.Metric, val, target.LocationID, now)
|
|
|
+ err := db.InsertReading(target.DeviceID, target.Metric, finalVal, target.LocationID, now)
|
|
|
if err != nil {
|
|
|
- // Log occasional errors, but don't flood
|
|
|
- // log.Printf("TSDB Insert Error: %v\n", err)
|
|
|
+ log.Printf("TSDB Insert Error for device %s (metric: %s, entity: %s): %v", target.DeviceID, target.Metric, entityID, err)
|
|
|
} else {
|
|
|
count++
|
|
|
}
|