Explorar o código

解决时序数据库不存数据的问题

liuq hai 2 meses
pai
achega
65608ff9ab

+ 51 - 0
backend/controllers/resource_controller.go

@@ -2,6 +2,7 @@ package controllers
 
 import (
 	"fmt"
+	"ems-backend/db"
 	"ems-backend/models"
 	"encoding/json"
 	"net/http"
@@ -696,3 +697,53 @@ func DeleteLocation(c *gin.Context) {
 	c.JSON(http.StatusOK, gin.H{"message": "Location deleted"})
 }
 
+// GetDeviceHistory fetches historical data for devices
+func GetDeviceHistory(c *gin.Context) {
+	// Parse Query Params
+	// device_ids (comma separated)
+	// metric (default power)
+	// start, end (timestamps or ISO strings)
+	// interval (e.g. 1m, 1h)
+
+	deviceIDsStr := c.Query("device_ids")
+	if deviceIDsStr == "" {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "device_ids is required"})
+		return
+	}
+	deviceIDs := strings.Split(deviceIDsStr, ",")
+
+	metric := c.Query("metric")
+	if metric == "" {
+		metric = "power"
+	}
+
+	startStr := c.Query("start")
+	endStr := c.Query("end")
+	interval := c.Query("interval")
+
+	// Default time range: last 24h
+	end := time.Now()
+	start := end.Add(-24 * time.Hour)
+
+	if startStr != "" {
+		if t, err := time.Parse(time.RFC3339, startStr); err == nil {
+			start = t
+		} else if t, err := time.Parse("2006-01-02 15:04:05", startStr); err == nil {
+			start = t
+		}
+	}
+	if endStr != "" {
+		if t, err := time.Parse(time.RFC3339, endStr); err == nil {
+			end = t
+		} else if t, err := time.Parse("2006-01-02 15:04:05", endStr); err == nil {
+			end = t
+		}
+	}
+
+	data, err := db.GetReadings(deviceIDs, metric, start, end, interval)
+	if err != nil {
+		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+		return
+	}
+	c.JSON(http.StatusOK, data)
+}

+ 196 - 0
backend/db/tdengine.go

@@ -0,0 +1,196 @@
+package db
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	_ "github.com/taosdata/driver-go/v3/taosRestful"
+)
+
+var TD *sql.DB
+
+func InitTDengine() {
+	dsn := os.Getenv("TD_DSN")
+	if dsn == "" {
+		log.Println("TD_DSN not set, skipping TDengine initialization")
+		return
+	}
+
+	// Connect to TDengine using RESTful driver (CGO_ENABLED=0 friendly)
+	var err error
+	TD, err = sql.Open("taosRestful", dsn)
+	if err != nil {
+		log.Printf("Failed to open TDengine connection: %v\n", err)
+		return
+	}
+
+	if err := TD.Ping(); err != nil {
+		log.Printf("Failed to ping TDengine: %v\n", err)
+		return
+	}
+
+	log.Println("Connected to TDengine")
+
+	// Initialize Schema
+	initSchema()
+}
+
+func initSchema() {
+	// Create Database
+	_, err := TD.Exec("CREATE DATABASE IF NOT EXISTS power_db KEEP 365 DURATION 10 BUFFER 16")
+	if err != nil {
+		log.Printf("Failed to create database: %v\n", err)
+		return
+	}
+
+	// Switch to database
+	_, err = TD.Exec("USE power_db")
+	if err != nil {
+		log.Printf("Failed to use database: %v\n", err)
+		return
+	}
+
+	// Create Super Table for standard metrics
+	// Using a "Single Value" model for flexibility:
+	// ts: timestamp
+	// val: double value
+	// Tags: device_id, metric (e.g., 'power', 'voltage'), location_id
+	
+	stSql := `CREATE STABLE IF NOT EXISTS readings (
+		ts TIMESTAMP,
+		val DOUBLE
+	) TAGS (
+		device_id BINARY(64),
+		metric BINARY(32),
+		location_id BINARY(64)
+	)`
+
+	_, err = TD.Exec(stSql)
+	if err != nil {
+		log.Printf("Failed to create stable readings: %v\n", err)
+	} else {
+		log.Println("TDengine schema initialized")
+	}
+}
+
+// InsertReading inserts a single reading
+func InsertReading(deviceID string, metric string, val float64, locationID string, ts time.Time) error {
+	if TD == nil {
+		return fmt.Errorf("TDengine not initialized")
+	}
+
+	// Sanitize table name: d_{device_id}_{metric}
+	// We need to ensure device_id and metric are safe for table names.
+	// UUIDs are safe (with hyphens replaced by underscores).
+	// Metrics should be alphanumeric.
+	
+	// Actually, using automatic table creation via INSERT ... USING is best.
+	// Table name: t_{device_id}_{metric} (hash or sanitized)
+	// But simple string concatenation is risky if not sanitized.
+	// Let's rely on prepared statements or careful construction.
+	
+	// Construct table name
+	// deviceID: usually UUID
+	// metric: e.g. "power", "voltage"
+	// locationID: UUID or empty
+	
+	// Safe table name construction
+	// Replace '-' with '_'
+	safeDID := fmt.Sprintf("d_%s_%s", deviceID, metric)
+	// Remove special chars if any (simplified)
+	
+	// SQL: INSERT INTO {table_name} USING readings TAGS (...) VALUES (?, ?)
+	// Note: taosSql driver might not support prepared statement for Table Name or Tags in USING clause well in all versions.
+	// Standard approach: fmt.Sprintf for the SQL structure.
+	
+	query := fmt.Sprintf("INSERT INTO `%s` USING readings TAGS ('%s', '%s', '%s') VALUES (?, ?)",
+		safeDID, deviceID, metric, locationID)
+	
+	_, err := TD.Exec(query, ts, val)
+	return err
+}
+
+// ReadingHistory represents aggregated historical data
+type ReadingHistory struct {
+	Ts       string  `json:"ts"`
+	Open     float64 `json:"open"`
+	High     float64 `json:"high"`
+	Low      float64 `json:"low"`
+	Close    float64 `json:"close"`
+	Avg      float64 `json:"avg"`
+	DeviceID string  `json:"device_id"`
+}
+
+// GetReadings fetches aggregated historical data
+func GetReadings(deviceIDs []string, metric string, start, end time.Time, interval string) ([]ReadingHistory, error) {
+	if TD == nil {
+		return nil, fmt.Errorf("TDengine not initialized")
+	}
+
+	if len(deviceIDs) == 0 {
+		return []ReadingHistory{}, nil
+	}
+
+	// Construct device_id IN clause
+	// device_id is BINARY in TDengine, so we quote them
+	inClause := ""
+	for i, id := range deviceIDs {
+		if i > 0 {
+			inClause += ", "
+		}
+		inClause += fmt.Sprintf("'%s'", id)
+	}
+
+	// Validate Interval (simple check)
+	// e.g. 1m, 1h, 1d
+	if interval == "" {
+		interval = "1h"
+	}
+	// TDengine might not support '1y' directly in all versions, map to days
+	if interval == "1y" {
+		interval = "365d"
+	}
+
+	// Query
+	// Note: ts output format depends on driver, usually time.Time or string.
+	// Using generic sql driver, it might be time.Time.
+	// Group by device_id to separate lines.
+	query := fmt.Sprintf(`SELECT 
+		FIRST(val), MAX(val), MIN(val), LAST(val), AVG(val), device_id
+		FROM readings 
+		WHERE device_id IN (%s) AND metric = '%s' AND ts >= '%s' AND ts <= '%s' 
+		INTERVAL(%s) 
+		GROUP BY device_id 
+		ORDER BY ts ASC`,
+		inClause, metric, start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), interval)
+
+	rows, err := TD.Query(query)
+	if err != nil {
+		log.Printf("TDengine Query Error: %v\nQuery: %s", err, query)
+		return nil, err
+	}
+	defer rows.Close()
+
+	var results []ReadingHistory
+	for rows.Next() {
+		var r ReadingHistory
+		var ts time.Time
+		var did sql.NullString // device_id might be grouped?
+
+		// Note: TDengine RESTful driver behavior on INTERVAL + GROUP BY:
+		// Result columns: ts, first, max, min, last, avg, device_id
+		if err := rows.Scan(&ts, &r.Open, &r.High, &r.Low, &r.Close, &r.Avg, &did); err != nil {
+			log.Printf("Scan error: %v", err)
+			continue
+		}
+		r.Ts = ts.Format("2006-01-02 15:04:05")
+		if did.Valid {
+			r.DeviceID = did.String
+		}
+		results = append(results, r)
+	}
+	return results, nil
+}

+ 2 - 0
backend/go.mod

@@ -8,6 +8,8 @@ require (
 	github.com/golang-migrate/migrate/v4 v4.17.0
 	github.com/google/uuid v1.6.0
 	github.com/rogpeppe/go-internal v1.13.1
+	github.com/taosdata/driver-go/v3 v3.5.0
+	github.com/robfig/cron/v3 v3.0.1
 	gorm.io/datatypes v1.2.0
 	gorm.io/driver/postgres v1.5.4
 	gorm.io/gorm v1.25.5

+ 11 - 1
backend/main.go

@@ -12,8 +12,10 @@ import (
 	_ "github.com/golang-migrate/migrate/v4/database/postgres"
 	_ "github.com/golang-migrate/migrate/v4/source/file"
 
+	"ems-backend/db"
 	"ems-backend/models"
 	"ems-backend/routes"
+	"ems-backend/services"
 )
 
 func initSchema() {
@@ -66,6 +68,15 @@ func initSchema() {
 func main() {
 	initSchema()
 	models.InitDB()
+	
+	// Initialize Time Series Database (TDengine)
+	db.InitTDengine()
+	
+	// Start Data Collector Service
+	collector := services.NewCollectorService()
+	collector.Start()
+	// Note: collector.Stop() is not strictly needed as container kill handles cleanup,
+	// but can be added if we implement graceful shutdown.
 
 	r := gin.Default()
 
@@ -99,4 +110,3 @@ func main() {
 	fmt.Printf("Server starting on port %s...\n", port)
 	r.Run(":" + port)
 }
-

+ 1 - 1
backend/models/sys_menu.go

@@ -93,7 +93,7 @@ func InitSysMenuData(db *gorm.DB) {
 					{Name: "修改", Perms: "resource:datasource:edit", Type: "F", OrderNum: 2},
 					{Name: "删除", Perms: "resource:datasource:remove", Type: "F", OrderNum: 3},
 				}},
-				{Name: "智能导入与清洗", Path: "import", Component: "resource/ImportClean", Perms: "resource:import:list", Type: "C", OrderNum: 2},
+				{Name: "设备管理", Path: "import", Component: "resource/ImportClean", Perms: "resource:import:list", Type: "C", OrderNum: 2},
 				{Name: "空间拓扑管理", Path: "topology", Component: "resource/Topology", Perms: "resource:topology:list", Type: "C", OrderNum: 3},
 			},
 		},

+ 1 - 0
backend/routes/routes.go

@@ -32,6 +32,7 @@ func SetupRoutes(r *gin.Engine) {
 
 		// Devices
 		api.GET("/devices", controllers.GetDevices)
+		api.GET("/devices/history", controllers.GetDeviceHistory)
 		api.POST("/devices", controllers.CreateDevice)
 		api.PUT("/devices/:id", controllers.UpdateDevice)
 		api.DELETE("/devices/:id", controllers.DeleteDevice)

+ 173 - 0
backend/services/collector.go

@@ -0,0 +1,173 @@
+package services
+
+import (
+	"ems-backend/db"
+	"ems-backend/models"
+	"ems-backend/utils"
+	"encoding/json"
+	"log"
+	"strconv"
+	"time"
+
+	"github.com/robfig/cron/v3"
+)
+
+type CollectorService struct {
+	cron *cron.Cron
+}
+
+func NewCollectorService() *CollectorService {
+	return &CollectorService{
+		cron: cron.New(cron.WithSeconds()),
+	}
+}
+
+func (s *CollectorService) Start() {
+	// Run every 1 minute
+	// Spec: "0 * * * * *" -> Every minute at 00s
+	// For testing, maybe every 10 seconds? "*/10 * * * * *"
+	// Let's stick to every minute for production-like behavior, or 30s.
+	_, err := s.cron.AddFunc("*/30 * * * * *", s.collectJob)
+	if err != nil {
+		log.Fatalf("Failed to start collector cron: %v", err)
+	}
+	s.cron.Start()
+	log.Println("Data Collector Service started (interval: 30s)")
+}
+
+func (s *CollectorService) Stop() {
+	s.cron.Stop()
+}
+
+func (s *CollectorService) collectJob() {
+	log.Println("Starting collection cycle...")
+	
+	// 1. Fetch all active devices with source
+	var devices []models.Device
+	if err := models.DB.Where("status != ?", "INACTIVE").Find(&devices).Error; err != nil {
+		log.Printf("Error fetching devices: %v\n", err)
+		return
+	}
+
+	if len(devices) == 0 {
+		return
+	}
+
+	// 2. Group by SourceID
+	deviceGroups := make(map[string][]models.Device)
+	for _, d := range devices {
+		if d.SourceID.String() != "" && d.SourceID.String() != "00000000-0000-0000-0000-000000000000" {
+			deviceGroups[d.SourceID.String()] = append(deviceGroups[d.SourceID.String()], d)
+		}
+	}
+
+	// 3. Process each group
+	for sourceID, devs := range deviceGroups {
+		go s.processSourceGroup(sourceID, devs)
+	}
+}
+
+func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
+	// Fetch Source Config
+	var source models.IntegrationSource
+	if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
+		log.Printf("Source %s not found: %v\n", sourceID, err)
+		return
+	}
+
+	if source.DriverType != "HOME_ASSISTANT" {
+		// Only HA supported for now
+		return
+	}
+
+	// Collect all Entity IDs
+	entityIDs := make([]string, 0)
+	// Map: EntityID -> []{DeviceID, MetricKey}
+	// One entity might be used by multiple devices/metrics? Rare but possible.
+	type Target struct {
+		DeviceID   string
+		Metric     string
+		LocationID string
+	}
+	requestMap := make(map[string][]Target)
+
+	for _, d := range devices {
+		// Parse Attribute Mapping
+		var mapping map[string]string
+		// mapping is JSONB, stored as datatypes.JSON
+		// Need to unmarshal
+		b, _ := d.AttributeMapping.MarshalJSON()
+		json.Unmarshal(b, &mapping)
+
+		for metric, entityID := range mapping {
+			// Skip formulas for now (keys ending in _formula)
+			if len(metric) > 8 && metric[len(metric)-8:] == "_formula" {
+				continue
+			}
+			if entityID == "" {
+				continue
+			}
+
+			if _, exists := requestMap[entityID]; !exists {
+				entityIDs = append(entityIDs, entityID)
+			}
+			
+			locID := ""
+			if d.LocationID != nil {
+				locID = d.LocationID.String()
+			}
+
+			requestMap[entityID] = append(requestMap[entityID], Target{
+				DeviceID:   d.ID.String(),
+				Metric:     metric,
+				LocationID: locID,
+			})
+		}
+	}
+
+	if len(entityIDs) == 0 {
+		return
+	}
+
+	// Fetch Data
+	// TODO: Handle large batches?
+	states, err := utils.BatchFetchStates(source.Config, entityIDs)
+	if err != nil {
+		log.Printf("Failed to fetch states for source %s: %v\n", source.Name, err)
+		return
+	}
+
+	// Write to TSDB
+	now := time.Now()
+	count := 0
+	
+	for entityID, valStr := range states {
+		// Parse value to float
+		val, err := strconv.ParseFloat(valStr, 64)
+		if err != nil {
+			// Try to handle non-numeric states if necessary (e.g. "on"/"off")
+			if valStr == "on" {
+				val = 1
+			} else if valStr == "off" {
+				val = 0
+			} else {
+				continue // Skip non-numeric
+			}
+		}
+
+		targets := requestMap[entityID]
+		for _, target := range targets {
+			// Insert
+			err := db.InsertReading(target.DeviceID, target.Metric, val, target.LocationID, now)
+			if err != nil {
+				// Log occasional errors, but don't flood
+				// log.Printf("TSDB Insert Error: %v\n", err)
+			} else {
+				count++
+			}
+		}
+	}
+
+	log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
+}
+

+ 251 - 0
backend/utils/ha_client.go

@@ -0,0 +1,251 @@
+package utils
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"sort"
+	"strings"
+	"time"
+
+	"gorm.io/datatypes"
+)
+
+type HAConfig struct {
+	URL   string `json:"url"`
+	Token string `json:"token"`
+}
+
+type HAEntity struct {
+	EntityID    string                 `json:"entity_id"`
+	State       string                 `json:"state"`
+	Attributes  map[string]interface{} `json:"attributes"`
+	LastChanged time.Time              `json:"last_changed"`
+	LastUpdated time.Time              `json:"last_updated"`
+	DeviceID    string                 `json:"device_id"`
+	DeviceName  string                 `json:"device_name"`
+}
+
+type HATemplateReq struct {
+	Template string `json:"template"`
+}
+
+type HATemplateResult struct {
+	ID    string `json:"id"`
+	State string `json:"s"`
+	Name  string `json:"n"`
+	DID   string `json:"did"`
+	DName string `json:"dn"`
+}
+
+// FetchHAEntitiesByDevice fetches entities for a specific device using HA Template API
+func FetchHAEntitiesByDevice(config datatypes.JSON, deviceID string) ([]HAEntity, error) {
+	haConfig, err := parseHAConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := normalizeURL(haConfig.URL)
+
+	rawTemplate := `
+{% set ns = namespace(result=[]) %}
+{% set device_entities = device_entities('__DEVICE_ID__') %}
+{% for entity_id in device_entities %}
+  {% set state = states[entity_id] %}
+  {% if state %}
+    {% set name = state.attributes.friendly_name %}
+    {% if name is not defined or name is none %}
+      {% set name = entity_id %}
+    {% endif %}
+    {% set entry = {
+      "id": entity_id,
+      "s": state.state,
+      "n": name,
+      "did": '__DEVICE_ID__',
+      "dn": ''
+    } %}
+    {% set ns.result = ns.result + [entry] %}
+  {% endif %}
+{% endfor %}
+{{ ns.result | to_json }}
+`
+	template := strings.ReplaceAll(rawTemplate, "__DEVICE_ID__", deviceID)
+
+	return executeTemplateQuery(client, url, haConfig.Token, template)
+}
+
+// FetchEntityState fetches a single entity state
+func FetchEntityState(config datatypes.JSON, entityID string) (*HAEntity, error) {
+	haConfig, err := parseHAConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{Timeout: 5 * time.Second}
+	url := normalizeURL(haConfig.URL)
+	
+	req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/states/%s", url, entityID), nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Authorization", "Bearer "+haConfig.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("HA returned status: %s", resp.Status)
+	}
+
+	var entity HAEntity
+	if err := json.NewDecoder(resp.Body).Decode(&entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+// BatchFetchStates fetches multiple entities in one go using template
+func BatchFetchStates(config datatypes.JSON, entityIDs []string) (map[string]string, error) {
+	if len(entityIDs) == 0 {
+		return map[string]string{}, nil
+	}
+	
+	haConfig, err := parseHAConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := normalizeURL(haConfig.URL)
+
+	// Safer template approach: Return List of Structs instead of Dict
+	// This avoids potential Jinja2 dictionary key issues or namespace update quirks
+	idsJson, _ := json.Marshal(entityIDs)
+	// Escape % for Sprintf by using %%
+	template := fmt.Sprintf(`
+{%% set ids = %s %%}
+{%% set result = [] %%}
+{%% for id in ids %%}
+  {%% set s = states(id) %%}
+  {%% if s not in ['unknown', 'unavailable', 'none', ''] %%}
+    {%% set result = result + [{"id": id, "s": s}] %%}
+  {%% endif %%}
+{%% endfor %%}
+{{ result | to_json }}
+`, string(idsJson))
+
+	reqBody, _ := json.Marshal(HATemplateReq{Template: template})
+	req, err := http.NewRequest("POST", url+"/api/template", bytes.NewBuffer(reqBody))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Authorization", "Bearer "+haConfig.Token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	// Read body for debug if error
+	if resp.StatusCode != 200 {
+		bodyBytes, _ := io.ReadAll(resp.Body)
+		return nil, fmt.Errorf("HA error: %s, Body: %s", resp.Status, string(bodyBytes))
+	}
+
+	// Helper struct for decoding the list response
+	type StateResult struct {
+		ID    string `json:"id"`
+		State string `json:"s"`
+	}
+
+	var results []StateResult
+	if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
+		return nil, fmt.Errorf("decode error: %v", err)
+	}
+
+	// Convert list to map
+	resultMap := make(map[string]string)
+	for _, r := range results {
+		resultMap[r.ID] = r.State
+	}
+
+	return resultMap, nil
+}
+
+// Helpers
+
+func parseHAConfig(config datatypes.JSON) (HAConfig, error) {
+	var haConfig HAConfig
+	b, err := config.MarshalJSON()
+	if err != nil {
+		return haConfig, fmt.Errorf("config error: %v", err)
+	}
+	if err := json.Unmarshal(b, &haConfig); err != nil {
+		return haConfig, fmt.Errorf("invalid configuration format: %v", err)
+	}
+	if haConfig.URL == "" || haConfig.Token == "" {
+		return haConfig, fmt.Errorf("URL and Token are required")
+	}
+	return haConfig, nil
+}
+
+func normalizeURL(url string) string {
+	url = strings.TrimSuffix(url, "/")
+	url = strings.TrimSuffix(url, "/api")
+	return url
+}
+
+func executeTemplateQuery(client *http.Client, url, token, template string) ([]HAEntity, error) {
+	reqBody, _ := json.Marshal(HATemplateReq{Template: template})
+	req, err := http.NewRequest("POST", url+"/api/template", bytes.NewBuffer(reqBody))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %v", err)
+	}
+	req.Header.Set("Authorization", "Bearer "+token)
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("connection failed: %v", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("Home Assistant returned status: %s", resp.Status)
+	}
+
+	var tmplResults []HATemplateResult
+	if err := json.NewDecoder(resp.Body).Decode(&tmplResults); err != nil {
+		return nil, fmt.Errorf("failed to decode response: %v", err)
+	}
+
+	entities := make([]HAEntity, len(tmplResults))
+	for i, r := range tmplResults {
+		entities[i] = HAEntity{
+			EntityID:   r.ID,
+			State:      r.State,
+			Attributes: map[string]interface{}{"friendly_name": r.Name},
+			DeviceID:   r.DID,
+			DeviceName: r.DName,
+		}
+	}
+
+	sort.Slice(entities, func(i, j int) bool {
+		nameI, _ := entities[i].Attributes["friendly_name"].(string)
+		nameJ, _ := entities[j].Attributes["friendly_name"].(string)
+		if nameI == "" { nameI = entities[i].EntityID }
+		if nameJ == "" { nameJ = entities[j].EntityID }
+		return nameI < nameJ
+	})
+
+	return entities, nil
+}

+ 4 - 1
docker-compose.wsl.yml

@@ -26,7 +26,7 @@ services:
       retries: 5
     environment:
       - DB_DSN=host=postgres user=${POSTGRES_USER} password=${DB_PASSWORD} dbname=${POSTGRES_DB} port=${POSTGRES_PORT} sslmode=disable
-      - TD_DSN=root:taosdata@tcp(${TD_HOST}:${TD_PORT})/power_db
+      - TD_DSN=root:taosdata@http(${TD_HOST}:6041)/power_db
       - REDIS_ADDR=${REDIS_HOST}:${REDIS_PORT}
       - POSTGRES_USER=${POSTGRES_USER}
       - POSTGRES_PASSWORD=${DB_PASSWORD}
@@ -62,12 +62,15 @@ services:
   tdengine:
     image: tdengine/tdengine:3.3.0.0
     container_name: ems-tdengine
+    hostname: tdengine
+    privileged: true
     ports: ["6030:6030"]
     volumes:
       - ./data/taos/data:/var/lib/taos
       - ./data/taos/log:/var/log/taos
     environment:
       - TAOS_FIRST_EP=tdengine:6030
+      - TAOS_FQDN=tdengine
 
   # 5. Cache: Redis
   redis:

+ 1 - 1
documents/功能设计方案.md

@@ -41,7 +41,7 @@
 * 管理 HA 实例、PLC 网关、HTTP 数据源。
 * 功能:新增、编辑、连接测试、立即同步。
 
-#### 1.2 智能导入与清洗
+#### 1.2 设备管理
 
 * **预过滤 (Pre-filter):** 后端自动清洗 `automation`, `script` 等垃圾实体。
 * **交互式列表:**

+ 22 - 0
frontend/src/api/resource.ts

@@ -82,10 +82,32 @@ export const getSourceDeviceEntities = (id: string, deviceId: string) => {
 
 // --- Device APIs ---
 
+export interface DeviceHistoryParams {
+  device_ids: string;
+  metric?: string;
+  start?: string;
+  end?: string;
+  interval?: string;
+}
+
+export interface HistoryData {
+  ts: string;
+  open: number;
+  high: number;
+  low: number;
+  close: number;
+  avg: number;
+  device_id: string;
+}
+
 export const getDevices = (params?: any) => {
   return api.get<any, Device[]>('/devices', { params });
 };
 
+export const getDeviceHistory = (params: DeviceHistoryParams) => {
+  return api.get<any, HistoryData[]>('/devices/history', { params });
+};
+
 export const createDevice = (data: Partial<Device>) => {
   return api.post<any, Device>('/devices', data);
 };

+ 1 - 1
frontend/src/stores/permission.ts

@@ -48,7 +48,7 @@ export const usePermissionStore = defineStore('permission', {
             icon: 'IconApps',
             children: [
               { path: 'datasource', name: '数据源配置', component: 'resource/DataSource' },
-              { path: 'import', name: '智能导入与清洗', component: 'resource/ImportClean' },
+              { path: 'import', name: '设备管理', component: 'resource/ImportClean' },
               { path: 'topology', name: '空间拓扑管理', component: 'resource/Topology' },
             ]
           },

+ 312 - 39
frontend/src/views/resource/ImportClean.vue

@@ -1,7 +1,7 @@
 <template>
   <div class="page-container">
     <div class="header">
-      <a-typography-title :heading="4" style="margin: 0">智能导入与清洗</a-typography-title>
+      <a-typography-title :heading="4" style="margin: 0">设备管理</a-typography-title>
       <div class="filters">
          <a-select v-model="filterSource" placeholder="筛选接入源" allow-clear @change="loadData" style="width: 200px; margin-right: 10px">
             <a-option v-for="s in sources" :key="s.ID" :label="s.Name" :value="s.ID" />
@@ -28,21 +28,19 @@
           <a-table-column title="设备名称" data-index="Name" :width="200" />
           <a-table-column title="原始标识" data-index="ExternalID" :width="200" ellipsis tooltip />
           <a-table-column title="类型" data-index="DeviceType" :width="120" />
-          <a-table-column title="计量模式" data-index="MeteringMode" :width="150">
-            <template #cell="{ record }">
-                <a-tag v-if="record.MeteringMode === 'VIRTUAL'" color="orange">虚拟 ({{ record.RatedPower }}W)</a-tag>
-                <a-tag v-else-if="record.MeteringMode === 'REAL'" color="green">实测</a-tag>
-                <a-tag v-else color="gray">未知</a-tag>
-            </template>
-          </a-table-column>
           <a-table-column title="空间位置" data-index="LocationID" :width="150">
              <template #cell="{ record }">
                  {{ getLocationName(record.LocationID) }}
              </template>
           </a-table-column>
-          <a-table-column title="操作" :width="100" fixed="right">
+          <a-table-column title="操作" :width="180" fixed="right">
             <template #cell="{ record }">
-               <a-button type="text" size="small" @click="editDevice(record)">编辑</a-button>
+               <a-space>
+                 <a-button type="text" size="small" @click="openDataChart(record)">
+                    <template #icon><icon-bar-chart /></template>
+                 </a-button>
+                 <a-button type="text" size="small" @click="editDevice(record)">编辑</a-button>
+               </a-space>
             </template>
           </a-table-column>
         </template>
@@ -53,9 +51,6 @@
     <div class="bottom-toolbar" v-if="selectedDeviceKeys.length > 0">
       <span class="selected-count">已选 {{ selectedDeviceKeys.length }} 个设备</span>
       <a-divider direction="vertical" />
-      <a-button status="warning" @click="openVirtualMeterDialog">
-         <template #icon><icon-thunderbolt /></template> 批量设为虚拟计量
-      </a-button>
       <a-button type="primary" status="success" @click="openLocationDialog" style="margin-left: 10px;">
          <template #icon><icon-location /></template> 批量关联空间
       </a-button>
@@ -63,14 +58,7 @@
       <a-button type="primary" @click="batchImport">确认导入 / 激活</a-button>
     </div>
 
-    <!-- Virtual Meter Dialog -->
-    <a-modal v-model:visible="virtualDialogVisible" title="批量设为虚拟计量" @ok="applyVirtualMeter" @cancel="virtualDialogVisible = false">
-       <a-form :model="{}" layout="vertical">
-          <a-form-item label="额定功率(W)">
-             <a-input-number v-model="batchRatedPower" :min="0" :step="10" />
-          </a-form-item>
-       </a-form>
-    </a-modal>
+    <!-- Virtual Meter Dialog Removed -->
 
     <!-- Location Dialog -->
     <a-modal v-model:visible="locationDialogVisible" title="批量关联空间" @ok="applyLocation" @cancel="locationDialogVisible = false">
@@ -84,15 +72,89 @@
         />
     </a-modal>
 
+    <!-- Edit Dialog -->
+    <a-modal v-model:visible="editVisible" title="设备清洗与配置" @ok="handleEditOk" @cancel="editVisible = false" width="800px">
+        <a-form :model="editForm" layout="vertical">
+            <a-form-item label="设备名称" field="Name">
+                <a-input v-model="editForm.Name" />
+            </a-form-item>
+            <a-form-item label="原始标识" field="ExternalID">
+                <a-input v-model="editForm.ExternalID" disabled />
+            </a-form-item>
+            
+            <a-alert type="info" style="margin-bottom: 10px">配置设备属性映射与清洗规则。</a-alert>
+                 
+            <div v-for="(row, index) in attributeRows" :key="index" style="display: flex; gap: 10px; margin-bottom: 10px; align-items: center">
+               <div style="width: 100px; flex-shrink: 0; text-align: right; padding-right: 10px;">
+                   <span style="font-weight: bold; white-space: nowrap;">{{ getAttributeLabel(row.key) }}</span>
+                   <div style="font-size: 12px; color: var(--color-text-3)" v-if="getAttributeUnit(row.key)">
+                       ({{ getAttributeUnit(row.key) }})
+                   </div>
+               </div>
+               <div style="width: 350px; flex-shrink: 0">
+                   <a-select 
+                       v-model="row.source" 
+                       placeholder="选择数据源 (HA实体)" 
+                       allow-search 
+                       allow-clear
+                       style="width: 100%"
+                   >
+                       <a-option v-for="ent in candidateEntities" :key="ent.entity_id" :value="ent.entity_id">
+                           {{ ent.attributes?.friendly_name || ent.entity_id }} ({{ ent.entity_id }})
+                       </a-option>
+                   </a-select>
+               </div>
+               <div style="flex: 1">
+                   <a-input v-model="row.formula" placeholder="清洗公式 (如 x/1000)" style="width: 100%">
+                        <template #prepend>f(x)=</template>
+                   </a-input>
+               </div>
+            </div>
+            
+            <div v-if="attributeRows.length === 0" style="color: var(--color-text-3); text-align: center; padding: 20px; background: var(--color-fill-2)">
+               暂无标准属性
+            </div>
+
+            <a-form-item label="空间位置" field="LocationID" style="margin-top: 20px">
+                 <a-tree-select
+                   v-model="editForm.LocationID"
+                   :data="locationTree"
+                   :field-names="{ key: 'ID', title: 'Name', children: 'children' }"
+                   placeholder="请选择空间"
+                   allow-clear
+                />
+            </a-form-item>
+        </a-form>
+    </a-modal>
+
+    <!-- Data Chart Modal -->
+    <a-modal v-model:visible="chartVisible" title="历史数据趋势" width="90%" :footer="false" @open="initChart">
+        <div style="margin-bottom: 20px; display: flex; gap: 10px; flex-wrap: wrap; align-items: center">
+            <a-range-picker v-model="dateRange" show-time format="YYYY-MM-DD HH:mm:ss" style="width: 380px" />
+            <a-select v-model="chartInterval" placeholder="粒度" style="width: 100px">
+                <a-option value="1m">1分钟</a-option>
+                <a-option value="1h">1小时</a-option>
+                <a-option value="1d">1天</a-option>
+                <a-option value="1y">1年</a-option>
+            </a-select>
+            <a-select v-model="chartMetric" placeholder="指标" style="width: 150px">
+                 <a-option v-for="attr in standardAttributes" :key="attr.value" :value="attr.value" :label="attr.label" />
+            </a-select>
+            <a-button type="primary" @click="fetchChartData" :loading="chartLoading">查询</a-button>
+        </div>
+        <div ref="chartRef" style="width: 100%; height: 600px;"></div>
+    </a-modal>
+
   </div>
 </template>
 
 <script setup lang="ts">
 import { ref, onMounted, computed, reactive } from 'vue';
 import { Message } from '@arco-design/web-vue';
-import { IconRefresh, IconThunderbolt, IconLocation } from '@arco-design/web-vue/es/icon';
-import { getDevices, getSources, getLocations, updateDevice } from '@/api/resource';
-import type { Device, IntegrationSource, Location as LocType } from '@/api/resource';
+import { getDevices, getSources, getLocations, updateDevice, getSourceDeviceEntities, getDeviceHistory } from '@/api/resource';
+import type { Device, IntegrationSource, Location as LocType, HAEntity } from '@/api/resource';
+import { IconRefresh, IconLocation, IconBarChart } from '@arco-design/web-vue/es/icon';
+import * as echarts from 'echarts';
 
 // --- State ---
 const loading = ref(false);
@@ -103,12 +165,37 @@ const filterSource = ref('');
 const selectedDeviceKeys = ref<string[]>([]);
 const rowSelection = reactive({ type: 'checkbox', showCheckedAll: true, onlyCurrent: false });
 
-const virtualDialogVisible = ref(false);
-const batchRatedPower = ref(15);
-
 const locationDialogVisible = ref(false);
 const batchLocationID = ref('');
 
+const editVisible = ref(false);
+const editForm = reactive<Partial<Device>>({
+    Name: '',
+    AttributeMapping: {},
+    LocationID: ''
+});
+const candidateEntities = ref<HAEntity[]>([]);
+const attributeRows = ref<{ key: string; source: string; formula: string }[]>([]);
+
+// Chart State
+const chartVisible = ref(false);
+const chartRef = ref<HTMLElement | null>(null);
+const chartInstance = ref<echarts.ECharts | null>(null);
+const chartDevice = ref<Device | null>(null);
+const chartMetric = ref('power');
+const chartInterval = ref('1h');
+const dateRange = ref<any[]>([]); // Array of Date or strings
+const chartLoading = ref(false);
+
+const standardAttributes = [
+    { label: '开关状态', value: 'state', unit: '' },
+    { label: '有功功率', value: 'power', unit: 'W' },
+    { label: '电压', value: 'voltage', unit: 'V' },
+    { label: '电流', value: 'current', unit: 'A' },
+    { label: '累计用电', value: 'energy', unit: 'kWh' },
+    { label: '温度', value: 'temperature', unit: '°C' },
+];
+
 // --- Lifecycle ---
 onMounted(async () => {
   await Promise.all([loadSources(), loadLocations()]);
@@ -171,22 +258,150 @@ const getLocationName = (id?: string) => {
     return loc ? loc.Name : id;
 };
 
-// --- Interactions ---
+const getAttributeLabel = (key: string) => {
+    const attr = standardAttributes.find(a => a.value === key);
+    return attr ? attr.label : key;
+};
 
-const openVirtualMeterDialog = () => {
-    virtualDialogVisible.value = true;
+const getAttributeUnit = (key: string) => {
+    const attr = standardAttributes.find(a => a.value === key);
+    return attr ? attr.unit : '';
+};
+
+// --- Chart Methods ---
+
+const openDataChart = (record: Device) => {
+    chartDevice.value = record;
+    chartVisible.value = true;
+    
+    // Default range: last 24h
+    const end = new Date();
+    const start = new Date();
+    start.setTime(start.getTime() - 3600 * 1000 * 24);
+    dateRange.value = [start, end]; 
 };
 
-const applyVirtualMeter = async () => {
-    for (const id of selectedDeviceKeys.value) {
-        await updateDevice(id, { MeteringMode: 'VIRTUAL', RatedPower: batchRatedPower.value });
+const initChart = () => {
+    // Wait for DOM
+    setTimeout(() => {
+        if (chartRef.value) {
+            if (chartInstance.value) {
+                chartInstance.value.dispose();
+            }
+            chartInstance.value = echarts.init(chartRef.value);
+            // Default load
+            fetchChartData();
+        }
+    }, 100);
+};
+
+const fetchChartData = async () => {
+    if (!chartDevice.value) return;
+    
+    // Validate range
+    let startStr = '';
+    let endStr = '';
+    
+    if (dateRange.value && dateRange.value.length === 2) {
+        startStr = new Date(dateRange.value[0]).toISOString();
+        endStr = new Date(dateRange.value[1]).toISOString();
+    } else {
+        // Fallback default
+        const end = new Date();
+        const start = new Date(end.getTime() - 24 * 60 * 60 * 1000);
+        startStr = start.toISOString();
+        endStr = end.toISOString();
+        // Update model to reflect default
+        dateRange.value = [start, end];
+    }
+    
+    chartLoading.value = true;
+    try {
+        const res = await getDeviceHistory({
+            device_ids: chartDevice.value.ID,
+            metric: chartMetric.value,
+            start: startStr,
+            end: endStr,
+            interval: chartInterval.value
+        });
+        const data = (res as any).data || res;
+        
+        renderChart(data);
+    } catch (e) {
+        Message.error('获取数据失败');
+    } finally {
+        chartLoading.value = false;
     }
-    Message.success('批量设置成功');
-    virtualDialogVisible.value = false;
-    loadData();
-    selectedDeviceKeys.value = [];
 };
 
+const renderChart = (data: any[]) => {
+    if (!chartInstance.value) return;
+    
+    const dates = data.map(item => item.ts);
+    // For Candlestick: [open, close, low, high] (ECharts format: open, close, lowest, highest)
+    // Our data: open, high, low, close.
+    // Mapping: [open, close, low, high]
+    const values = data.map(item => [item.open, item.close, item.low, item.high]);
+    
+    const option = {
+        title: {
+            text: `${chartDevice.value?.Name} - ${chartMetric.value}`,
+            left: 'center'
+        },
+        tooltip: {
+            trigger: 'axis',
+            axisPointer: { type: 'cross' }
+        },
+        grid: {
+            left: '5%',
+            right: '5%',
+            bottom: '15%'
+        },
+        xAxis: {
+            type: 'category',
+            data: dates,
+            scale: true,
+            boundaryGap: false,
+            axisLine: { onZero: false },
+            splitLine: { show: false },
+            min: 'dataMin',
+            max: 'dataMax'
+        },
+        yAxis: {
+            scale: true,
+            splitArea: { show: true }
+        },
+        dataZoom: [
+            { type: 'inside', start: 0, end: 100 },
+            { show: true, type: 'slider', bottom: '5%', start: 0, end: 100 }
+        ],
+        series: [
+            {
+                name: chartMetric.value,
+                type: 'candlestick',
+                data: values,
+                itemStyle: {
+                    color: '#ec0000',
+                    color0: '#00da3c',
+                    borderColor: '#8A0000',
+                    borderColor0: '#008F28'
+                }
+            },
+            {
+                name: 'Average',
+                type: 'line',
+                data: data.map(item => item.avg),
+                smooth: true,
+                lineStyle: { opacity: 0.5 }
+            }
+        ]
+    };
+    
+    chartInstance.value.setOption(option);
+};
+
+// --- Interactions ---
+
 const openLocationDialog = () => {
     locationDialogVisible.value = true;
 };
@@ -210,10 +425,68 @@ const batchImport = async () => {
     selectedDeviceKeys.value = [];
 };
 
-const editDevice = (row: Device) => {
-    Message.info(`详细编辑功能待实现: ${row.Name}`);
+const editDevice = async (row: Device) => {
+    Object.assign(editForm, JSON.parse(JSON.stringify(row)));
+    // Ensure AttributeMapping exists
+    if (!editForm.AttributeMapping) {
+        editForm.AttributeMapping = {};
+    }
+    
+    // Parse AttributeMapping into rows
+    attributeRows.value = [];
+    const map = editForm.AttributeMapping;
+
+    // Load ALL standard attributes
+    standardAttributes.forEach(attr => {
+        attributeRows.value.push({
+            key: attr.value,
+            source: map[attr.value] || '',
+            formula: map[`${attr.value}_formula`] || ''
+        });
+    });
+
+    editVisible.value = true;
+    
+    // Fetch candidates if we have SourceID and ExternalID (Device ID in HA)
+    if (row.SourceID && row.ExternalID) {
+        try {
+            const res = await getSourceDeviceEntities(row.SourceID, row.ExternalID);
+            candidateEntities.value = (res as any).data || res;
+        } catch (e) {
+            candidateEntities.value = [];
+            Message.warning('无法加载关联的HA实体列表');
+        }
+    } else {
+        candidateEntities.value = [];
+    }
+};
+
+const handleEditOk = async () => {
+    if (!editForm.ID) return;
+    
+    // Reconstruct AttributeMapping
+    const newMap: Record<string, string> = {};
+    attributeRows.value.forEach(row => {
+        if (row.key && row.source) {
+            newMap[row.key] = row.source;
+            if (row.formula) {
+                newMap[`${row.key}_formula`] = row.formula;
+            }
+        }
+    });
+    editForm.AttributeMapping = newMap;
+
+    try {
+        await updateDevice(editForm.ID, editForm);
+        Message.success('设备配置已更新');
+        editVisible.value = false;
+        loadData();
+    } catch (err) {
+        Message.error('更新失败');
+    }
 };
 
+
 </script>
 
 <style scoped>