// Package db provides access to the TDengine time-series store that holds
// device readings and switch logs.
package db

import (
	"database/sql"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	_ "github.com/taosdata/driver-go/v3/taosRestful"
)

// TD is the shared TDengine handle. It is nil until InitTDengine has
// successfully opened a connection; the exported helpers in this file return
// an error while it is nil.
var TD *sql.DB

// tdTimeLayout is the layout used for timestamps embedded in query text and
// for ReadingHistory.Ts.
const tdTimeLayout = "2006-01-02 15:04:05"

var (
	// tdLiteralEscaper backslash-escapes the characters that could terminate
	// or alter a single-quoted SQL string literal.
	tdLiteralEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

	// tdDeviceNameReplacer and tdMetricNameReplacer turn a device ID / metric
	// into a table-name fragment. Backticks are replaced as well because the
	// table name is emitted inside a backtick-quoted identifier.
	tdDeviceNameReplacer = strings.NewReplacer("-", "_", "`", "_")
	tdMetricNameReplacer = strings.NewReplacer(".", "_", "-", "_", "`", "_")
)

// tdQuote returns s as a single-quoted SQL string literal with embedded
// quotes and backslashes escaped.
func tdQuote(s string) string {
	return "'" + tdLiteralEscaper.Replace(s) + "'"
}

// tdTableName builds the child-table name "<prefix>_<device>_<metric>" from
// sanitized fragments.
func tdTableName(prefix, deviceID, metric string) string {
	return prefix + "_" + tdDeviceNameReplacer.Replace(deviceID) + "_" + tdMetricNameReplacer.Replace(metric)
}

// tdValidInterval reports whether s is one or more decimal digits followed by
// a single duration-unit letter (for example "15m", "1h", "365d"). It guards
// the one query fragment that cannot be quoted.
func tdValidInterval(s string) bool {
	n := len(s)
	if n < 2 {
		return false
	}
	for i := 0; i < n-1; i++ {
		if s[i] < '0' || s[i] > '9' {
			return false
		}
	}
	return strings.IndexByte("buasmhdwny", s[n-1]) >= 0
}

// InitTDengine opens the TDengine connection described by the TD_DSN
// environment variable and creates the schema.
//
// It never returns an error: every failure is logged and initialization stops.
// When TD_DSN is empty nothing is opened and TD stays nil. When the ping
// fails, TD is left set to the opened handle but the schema is not created.
func InitTDengine() {
	dsn := os.Getenv("TD_DSN")
	if dsn == "" {
		log.Println("TD_DSN not set, skipping TDengine initialization")
		return
	}

	// The RESTful driver is used so the binary can be built with CGO_ENABLED=0.
	var err error
	TD, err = sql.Open("taosRestful", dsn)
	if err != nil {
		log.Printf("Failed to open TDengine connection: %v\n", err)
		return
	}

	if err := TD.Ping(); err != nil {
		log.Printf("Failed to ping TDengine: %v\n", err)
		return
	}
	log.Println("Connected to TDengine")

	initSchema()
}

// initSchema creates the power_db database and the readings and switch_logs
// super tables if they do not exist yet. Failures are logged; a failure to
// create or select the database aborts the remaining steps.
func initSchema() {
	if _, err := TD.Exec("CREATE DATABASE IF NOT EXISTS power_db KEEP 365 DURATION 10 BUFFER 16"); err != nil {
		log.Printf("Failed to create database: %v\n", err)
		return
	}

	if _, err := TD.Exec("USE power_db"); err != nil {
		log.Printf("Failed to use database: %v\n", err)
		return
	}

	// Both super tables use a single-value model: one timestamped value per
	// row, with the device, the metric name (e.g. 'power', 'voltage') and the
	// location carried as tags.
	const readingsDDL = `CREATE STABLE IF NOT EXISTS readings (
		ts TIMESTAMP,
		val DOUBLE
	) TAGS (
		device_id BINARY(64),
		metric BINARY(256),
		location_id BINARY(64)
	)`
	const switchLogsDDL = `CREATE STABLE IF NOT EXISTS switch_logs (
		ts TIMESTAMP,
		val BOOL
	) TAGS (
		device_id BINARY(64),
		metric BINARY(256),
		location_id BINARY(64)
	)`

	ok := true
	if _, err := TD.Exec(readingsDDL); err != nil {
		log.Printf("Failed to create stable readings: %v\n", err)
		ok = false
	}
	if _, err := TD.Exec(switchLogsDDL); err != nil {
		log.Printf("Failed to create stable switch_logs: %v\n", err)
		ok = false
	}

	// Upgrade the metric tag on a readings table that already existed. The
	// error is deliberately ignored: the statement is redundant when the tag
	// already has this width.
	_, _ = TD.Exec("ALTER STABLE readings MODIFY TAG metric BINARY(256)")

	// Report success only when every super table was created.
	if ok {
		log.Println("TDengine schema initialized")
	}
}

// tdInsert writes one (ts, val) row into the child table for deviceID/metric,
// creating that table from the given super table on first use via
// INSERT ... USING.
//
// The table name and tag values are rendered into the statement text instead
// of being bound as parameters, so they are sanitized and escaped here; only
// the row values are bound.
func tdInsert(prefix, stable, deviceID, metric, locationID string, ts time.Time, val interface{}) error {
	if TD == nil {
		return fmt.Errorf("TDengine not initialized")
	}

	query := fmt.Sprintf("INSERT INTO `%s` USING %s TAGS (%s, %s, %s) VALUES (?, ?)",
		tdTableName(prefix, deviceID, metric), stable,
		tdQuote(deviceID), tdQuote(metric), tdQuote(locationID))

	_, err := TD.Exec(query, ts, val)
	return err
}

// InsertReading stores a single numeric reading for deviceID/metric at ts.
//
// The row goes into child table "d_<device>_<metric>" of the readings super
// table, tagged with deviceID, metric and locationID. It returns an error
// when TDengine has not been initialized or the insert fails.
func InsertReading(deviceID string, metric string, val float64, locationID string, ts time.Time) error {
	return tdInsert("d", "readings", deviceID, metric, locationID, ts, val)
}

// InsertSwitchLog stores a single switch state for deviceID/metric at ts.
//
// The row goes into child table "s_<device>_<metric>" of the switch_logs
// super table, tagged with deviceID, metric and locationID. It returns an
// error when TDengine has not been initialized or the insert fails.
func InsertSwitchLog(deviceID string, metric string, val bool, locationID string, ts time.Time) error {
	return tdInsert("s", "switch_logs", deviceID, metric, locationID, ts, val)
}

// ReadingHistory is one point of historical data: the first, highest, lowest,
// last and average value of a time window. For raw (non-aggregated) rows all
// five values are the row's own value.
type ReadingHistory struct {
	Ts       string  `json:"ts"`
	Open     float64 `json:"open"`
	High     float64 `json:"high"`
	Low      float64 `json:"low"`
	Close    float64 `json:"close"`
	Avg      float64 `json:"avg"`
	DeviceID string  `json:"device_id"`
}

// GetReadings returns the history of the given devices between start and end
// (both inclusive), ordered by time.
//
// An empty metric matches every metric. interval selects the window width
// (e.g. "15m", "1h"): "" defaults to "1h" and "1y" is rewritten to "365d".
// The special value "raw" skips aggregation and returns at most 2000
// individual rows.
//
// DeviceID is taken from the row in raw mode. Aggregated rows carry no device
// column, so it is only filled in when exactly one device was requested.
//
// The result is never nil on success. An error is returned when TDengine is
// not initialized, when interval is malformed, or when the query or row
// iteration fails. Rows that fail to scan are logged and skipped.
func GetReadings(deviceIDs []string, metric string, start, end time.Time, interval string) ([]ReadingHistory, error) {
	if TD == nil {
		return nil, fmt.Errorf("TDengine not initialized")
	}
	if len(deviceIDs) == 0 {
		return []ReadingHistory{}, nil
	}

	// device_id is a BINARY tag, so every ID is rendered as a quoted literal.
	quoted := make([]string, len(deviceIDs))
	for i, id := range deviceIDs {
		quoted[i] = tdQuote(id)
	}

	var where strings.Builder
	fmt.Fprintf(&where, "device_id IN (%s)", strings.Join(quoted, ", "))
	if metric != "" {
		fmt.Fprintf(&where, " AND metric = %s", tdQuote(metric))
	}
	fmt.Fprintf(&where, " AND ts >= '%s' AND ts <= '%s'",
		start.Format(tdTimeLayout), end.Format(tdTimeLayout))

	raw := interval == "raw"
	var query string
	if raw {
		// Raw mode returns unaggregated rows, which is useful for sparse data
		// and for debugging. val is selected five times so the row has the
		// same leading shape as an aggregated row; LIMIT caps the result size.
		query = "SELECT ts, val, val, val, val, val, device_id FROM readings WHERE " +
			where.String() + " ORDER BY ts ASC LIMIT 2000"
	} else {
		switch interval {
		case "":
			interval = "1h"
		case "1y":
			interval = "365d"
		}
		// interval cannot be quoted or bound, so it must be validated.
		if !tdValidInterval(interval) {
			return nil, fmt.Errorf("invalid interval %q", interval)
		}
		// No GROUP BY here: the query is kept simple to avoid a
		// GROUP BY + INTERVAL conflict.
		query = fmt.Sprintf("SELECT _wstart, FIRST(val), MAX(val), MIN(val), LAST(val), AVG(val) FROM readings WHERE %s INTERVAL(%s) ORDER BY _wstart ASC",
			where.String(), interval)
	}

	rows, err := TD.Query(query)
	if err != nil {
		log.Printf("TDengine Query Error: %v\nQuery: %s", err, query)
		return nil, err
	}
	defer rows.Close()

	results := []ReadingHistory{}
	for rows.Next() {
		var (
			r   ReadingHistory
			ts  time.Time
			did sql.NullString
		)
		// Scan needs exactly one destination per selected column; raw mode
		// selects device_id as a seventh column.
		dest := []interface{}{&ts, &r.Open, &r.High, &r.Low, &r.Close, &r.Avg}
		if raw {
			dest = append(dest, &did)
		}
		if err := rows.Scan(dest...); err != nil {
			log.Printf("Scan error: %v", err)
			continue
		}

		r.Ts = ts.Format(tdTimeLayout)
		switch {
		case did.Valid:
			r.DeviceID = did.String
		case len(deviceIDs) == 1:
			r.DeviceID = deviceIDs[0]
		}
		results = append(results, r)
	}
	// A failure while streaming rows must not look like a short result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return results, nil
}