liuq 1 miesiąc temu
rodzic
commit
3c675da590
4 zmienionych plików z 303 dodań i 18 usunięć
  1. 138 0
      TDengine_dev.md
  2. 88 0
      TDengine_dev.pdf
  3. 54 1
      backend/db/tdengine.go
  4. 23 17
      backend/services/collector.go

+ 138 - 0
TDengine_dev.md

@@ -0,0 +1,138 @@
+Ubuntu + TDengine 3.3.0.0 集群设计
+
+---
+
+### 第一步:配置本地 Hosts 解析(仅添加别名)
+
+在 **3 台 Ubuntu 服务器**上,编辑 `/etc/hosts` 文件:
+
+```bash
+sudo nano /etc/hosts
+
+```
+
+在文件末尾追加以下内容。这里的 `TDengine_node` 系列将仅作为 TDengine 集群内部通信使用的域名别名,完全不影响系统原本的主机名:
+
+```text
+192.168.254.11 TDengine_node1
+192.168.254.12 TDengine_node2
+192.168.254.13 TDengine_node3
+
+```
+
+*(如果您之前开启了 ufw 防火墙,请确保放行 6030-6042 端口,或者在内网测试时执行 `sudo ufw disable` 关闭它。)*
+
+---
+
+### 第二步:下载并安装 TDengine 3.3.0.0
+
+在 **3 台服务器**上分别执行下载和安装命令:
+
+```bash
+# 下载 3.3.0.0 版本的 deb 包
+wget https://www.taosdata.com/assets-download/3.0/TDengine-server-3.3.0.0-Linux-x64.deb
+
+# 执行安装(遇到粉色弹窗提示输入 FQDN 和 Email 时,直接按回车跳过)
+sudo dpkg -i TDengine-server-3.3.0.0-Linux-x64.deb
+
+```
+
+---
+
+### 第三步:通过配置文件指定 TDengine 专属 FQDN
+
+这是最关键的一步。通过修改 `taos.cfg`,明确告诉 TDengine 程序它叫什么名字,从而绕开系统原本的主机名。
+
+**1. 在 192.168.254.11 上配置:**
+
+```bash
+sudo nano /etc/taos/taos.cfg
+
+```
+
+写入以下内容(注意第一行定义了集群的起始节点,第二行定义了本机别名):
+
+```ini
+firstEp               TDengine_node1:6030
+fqdn                  TDengine_node1
+
+```
+
+**2. 在 192.168.254.12 上配置:**
+
+```bash
+sudo nano /etc/taos/taos.cfg
+
+```
+
+写入以下内容:
+
+```ini
+firstEp               TDengine_node1:6030
+fqdn                  TDengine_node2
+
+```
+
+**3. 在 192.168.254.13 上配置:**
+
+```bash
+sudo nano /etc/taos/taos.cfg
+
+```
+
+写入以下内容:
+
+```ini
+firstEp               TDengine_node1:6030
+fqdn                  TDengine_node3
+
+```
+
+---
+
+### 第四步:启动服务并组建集群
+
+**1. 启动所有节点的后台服务**
+依次在 3 台机器上执行:
+
+```bash
+sudo systemctl start taosd
+sudo systemctl enable taosd
+
+```
+
+**2. 将节点加入集群 (仅在 192.168.254.11 上操作)**
+在 `11` 这台机器的终端上,输入 `taos` 进入数据库控制台:
+
+```bash
+taos
+
+```
+
+在 `taos>` 提示符下,手动添加另外两个节点:
+
+```sql
+taos> CREATE DNODE "TDengine_node2:6030";
+taos> CREATE DNODE "TDengine_node3:6030";
+
+```
+
+**3. 验证集群状态**
+
+```sql
+taos> SHOW DNODES;
+
+```
+
+如果输出结果显示 3 个节点且 `status` 均为 `ready`,说明集群已经完美建立!
+
+---
+
+### 第五步:创建 3 副本的高可用数据库
+
+最后,在 `taos>` 控制台中创建您的项目数据库并设置 3 副本:
+
+```sql
+taos> CREATE DATABASE my_db REPLICA 3;
+
+```

+ 88 - 0
TDengine_dev.pdf

@@ -0,0 +1,88 @@
+集群设计 Ubuntu + TDengine 3.3.0.0
+
+第一步:配置本地 Hosts 解析(仅添加别名)
+
+在 3 台 Ubuntu 服务器上,编辑 /etc/hosts 文件:
+
+  sudo nano /etc/hosts
+
+在文件末尾追加以下内容。这里的 TDengine_node 系列将仅作为 TDengine 集群内部通信使用的域名别
+名,完全不影响系统原本的主机名:
+
+  192.168.254.11 TDengine_node1
+  192.168.254.12 TDengine_node2
+  192.168.254.13 TDengine_node3
+
+(如果您之前开启了 ufw 防火墙,请确保放行 6030-6042 端口,或者在内网测试时执行
+关闭它。 sudo ufw disable
+                     )
+
+第二步:下载并安装 TDengine 3.3.0.0
+
+在 3 台服务器上分别执行下载和安装命令:
+
+  # 下载 3.3.0.0 版本的 deb 包
+  wget https://www.taosdata.com/assets-download/3.0/TDengine-server-3.3.0.0-Linux-x64.deb
+
+  # 执行安装(遇到粉色弹窗提示输入 FQDN 和 Email 时,直接按回车跳过)
+  sudo dpkg -i TDengine-server-3.3.0.0-Linux-x64.deb
+第三步:通过配置文件指定 TDengine 专属 FQDN
+
+这是最关键的一步。通过修改 taos.cfg ,明确告诉 TDengine 程序它叫什么名字,从而绕开系统原本
+的主机名。
+在 上配置: 1. 192.168.254.11
+
+  sudo nano /etc/taos/taos.cfg
+
+写入以下内容(注意第一行定义了集群的起始节点,第二行定义了本机别名):
+
+firstEp    TDengine_node1:6030
+fqdn       TDengine_node1
+
+在 上配置: 2. 192.168.254.12
+
+  sudo nano /etc/taos/taos.cfg
+
+写入以下内容:    TDengine_node1:6030
+           TDengine_node2
+  firstEp
+  fqdn
+
+在 上配置: 3. 192.168.254.13
+
+  sudo nano /etc/taos/taos.cfg
+
+写入以下内容:    TDengine_node1:6030
+           TDengine_node3
+  firstEp
+  fqdn
+第四步:启动服务并组建集群
+
+1. 启动所有节点的后台服务
+依次在 3 台机器上执行:
+
+  sudo systemctl start taosd
+  sudo systemctl enable taosd
+
+2. 将节点加入集群 (仅在 192.168.254.11 上操作)
+在 11 这台机器的终端上,输入 taos 进入数据库控制台:
+
+  taos
+
+在 taos> 提示符下,手动添加另外两个节点:
+
+  taos> CREATE DNODE "TDengine_node2:6030";
+  taos> CREATE DNODE "TDengine_node3:6030";
+
+3. 验证集群状态
+
+  taos> SHOW DNODES;
+
+如果输出结果显示 3 个节点且 status 均为 ready ,说明集群已经完美建立!
+
+第五步:创建 3 副本的高可用数据库
+
+最后,在 taos> 控制台中创建您的项目数据库并设置 3 副本:
+
+  taos> CREATE DATABASE my_db REPLICA 3;
+

+ 54 - 1
backend/db/tdengine.go

@@ -95,7 +95,60 @@ func initSchema() {
 	_, _ = TD.Exec("ALTER STABLE readings MODIFY TAG metric BINARY(256)")
 }
 
-// InsertReading inserts a single reading
+// ReadingRecord represents a single reading to be batch-inserted
+type ReadingRecord struct {
+	DeviceID   string
+	Metric     string
+	Value      float64
+	LocationID string
+	Ts         time.Time
+}
+
+// escapeSQL escapes single quotes for SQL string literals to prevent injection
+func escapeSQL(s string) string {
+	return strings.ReplaceAll(strings.ReplaceAll(s, "\\", "\\\\"), "'", "''")
+}
+
+// BatchInsertReadings batch inserts multiple readings in one REST call, reducing I/O and network round-trips.
+// Splits into chunks of batchSize (default 500) if records exceed limit to avoid oversized requests.
+func BatchInsertReadings(records []ReadingRecord) error {
+	if TD == nil {
+		return fmt.Errorf("TDengine not initialized")
+	}
+	if len(records) == 0 {
+		return nil
+	}
+
+	const batchSize = 500
+	tsFmt := "2006-01-02 15:04:05.000"
+
+	for start := 0; start < len(records); start += batchSize {
+		end := start + batchSize
+		if end > len(records) {
+			end = len(records)
+		}
+		chunk := records[start:end]
+
+		var sb strings.Builder
+		for i, r := range chunk {
+			safeMetric := strings.ReplaceAll(strings.ReplaceAll(r.Metric, ".", "_"), "-", "_")
+			safeDID := fmt.Sprintf("d_%s_%s", strings.ReplaceAll(r.DeviceID, "-", "_"), safeMetric)
+			sb.WriteString(fmt.Sprintf("INSERT INTO `%s` USING readings TAGS ('%s', '%s', '%s') VALUES ('%s', %f)",
+				safeDID, escapeSQL(r.DeviceID), escapeSQL(r.Metric), escapeSQL(r.LocationID),
+				r.Ts.Format(tsFmt), r.Value))
+			if i < len(chunk)-1 {
+				sb.WriteString("; ")
+			}
+		}
+
+		if _, err := TD.Exec(sb.String()); err != nil {
+			return fmt.Errorf("BatchInsertReadings chunk [%d:%d]: %w", start, end, err)
+		}
+	}
+	return nil
+}
+
+// InsertReading inserts a single reading (kept for backward compatibility)
 func InsertReading(deviceID string, metric string, val float64, locationID string, ts time.Time) error {
 	if TD == nil {
 		return fmt.Errorf("TDengine not initialized")

+ 23 - 17
backend/services/collector.go

@@ -366,28 +366,24 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
 	}
 
 	now := time.Now()
-	count := 0
-	
-	// Helper to Save
+	var toInsert []db.ReadingRecord
+
+	// Helper to collect (batch insert at end of cycle)
 	saveMetric := func(deviceID, metric string, val float64, locID string) {
 		// Update Cache
 		if deviceMetricValues[deviceID] == nil {
 			deviceMetricValues[deviceID] = make(map[string]float64)
 		}
 		deviceMetricValues[deviceID][metric] = val
-		// DB Insert
-		if err := db.InsertReading(deviceID, metric, val, locID, now); err == nil {
-			count++
-			if s.IsDebug() {
-				log.Printf("调试: 已保存 设备=%s 指标=%s 值=%.2f", deviceID, metric, val)
-			}
-
-			// Check Alarms Async
-			if GlobalAlarmService != nil {
-				GlobalAlarmService.CheckRules(deviceID, metric, val)
-			}
-		} else {
-			log.Printf("数据库错误: 保存失败 设备=%s 指标=%s 值=%.2f: %v", deviceID, metric, val, err)
+		toInsert = append(toInsert, db.ReadingRecord{
+			DeviceID: deviceID, Metric: metric, Value: val, LocationID: locID, Ts: now,
+		})
+		if s.IsDebug() {
+			log.Printf("调试: 已收集 设备=%s 指标=%s 值=%.2f", deviceID, metric, val)
+		}
+		// Check Alarms Async (before batch write)
+		if GlobalAlarmService != nil {
+			GlobalAlarmService.CheckRules(deviceID, metric, val)
 		}
 	}
 
@@ -505,7 +501,17 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
 			}
 		}
 	}
-	
+
+	// Batch insert to TDengine (one REST call per ~500 records)
+	count := 0
+	if len(toInsert) > 0 {
+		if err := db.BatchInsertReadings(toInsert); err != nil {
+			log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err)
+		} else {
+			count = len(toInsert)
+		}
+	}
+
 	duration := time.Since(start)
 	log.Printf("Source %s: Collected %d data points for %d devices in %v", source.Name, count, len(devices), duration)