liuq 1 tháng trước cách đây
mục cha
commit
916ffa09bd
3 tập tin đã thay đổi với 110 bổ sung4 xóa
  1. 7 0
      backend/main.go
  2. 9 4
      backend/services/collector.go
  3. 94 0
      backend/services/tdengine_writer.go

+ 7 - 0
backend/main.go

@@ -85,6 +85,13 @@ func main() {
 	
 	// Initialize Time Series Database (TDengine)
 	db.InitTDengine()
+
+	// Start async TDengine writer when TDengine is configured (decouples collection from write)
+	if db.TD != nil {
+		services.GlobalTDengineWriter = services.NewTDengineWriter()
+		services.GlobalTDengineWriter.Start()
+		defer services.GlobalTDengineWriter.Stop()
+	}
 	
 	// Initialize Backup Service
 	services.InitBackupService()

+ 9 - 4
backend/services/collector.go

@@ -502,13 +502,18 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
 		}
 	}
 
-	// Batch insert to TDengine (one REST call per ~500 records)
+	// Async write to TDengine (non-blocking when GlobalTDengineWriter is available)
 	count := 0
 	if len(toInsert) > 0 {
-		if err := db.BatchInsertReadings(toInsert); err != nil {
-			log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err)
-		} else {
+		if GlobalTDengineWriter != nil {
+			GlobalTDengineWriter.Write(toInsert)
 			count = len(toInsert)
+		} else {
+			if err := db.BatchInsertReadings(toInsert); err != nil {
+				log.Printf("数据库错误: 批量写入失败 %d 条: %v", len(toInsert), err)
+			} else {
+				count = len(toInsert)
+			}
 		}
 	}
 

+ 94 - 0
backend/services/tdengine_writer.go

@@ -0,0 +1,94 @@
+package services
+
+import (
+	"ems-backend/db"
+	"log"
+	"sync"
+	"time"
+)
+
+const (
+	writeChanSize       = 10000
+	writeBatchSize      = 500
+	writeFlushInterval  = 100 * time.Millisecond
+)
+
+var GlobalTDengineWriter *TDengineWriter
+
+// TDengineWriter asynchronously batches and writes readings to TDengine via channel + goroutine.
+// Decouples collection from write, reducing blocking risk during collection cycles.
+type TDengineWriter struct {
+	ch chan []db.ReadingRecord
+	wg sync.WaitGroup
+}
+
+// NewTDengineWriter creates and returns a new TDengineWriter.
+func NewTDengineWriter() *TDengineWriter {
+	return &TDengineWriter{
+		ch: make(chan []db.ReadingRecord, 64),
+	}
+}
+
+// Start starts the background writer goroutine.
+func (w *TDengineWriter) Start() {
+	w.wg.Add(1)
+	go w.run()
+	log.Println("TDengineWriter started (async batch write)")
+}
+
+// Stop gracefully stops the writer: closes the channel, flushes remaining data, then returns.
+func (w *TDengineWriter) Stop() {
+	close(w.ch)
+	w.wg.Wait()
+	log.Println("TDengineWriter stopped")
+}
+
+// Write sends a batch of records to the writer. Non-blocking; drops with log if channel is full.
+func (w *TDengineWriter) Write(records []db.ReadingRecord) {
+	if len(records) == 0 {
+		return
+	}
+	select {
+	case w.ch <- records:
+	default:
+		log.Printf("TDengineWriter: channel full, dropping %d records", len(records))
+	}
+}
+
+func (w *TDengineWriter) run() {
+	defer w.wg.Done()
+	ticker := time.NewTicker(writeFlushInterval)
+	defer ticker.Stop()
+
+	pending := make([]db.ReadingRecord, 0, writeBatchSize*4)
+
+	flush := func() {
+		if len(pending) == 0 {
+			return
+		}
+		if err := db.BatchInsertReadings(pending); err != nil {
+			log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(pending), err)
+		}
+		pending = pending[:0]
+	}
+
+	for {
+		select {
+		case batch, ok := <-w.ch:
+			if !ok {
+				flush()
+				return
+			}
+			pending = append(pending, batch...)
+			for len(pending) >= writeBatchSize {
+				chunk := pending[:writeBatchSize]
+				pending = pending[writeBatchSize:]
+				if err := db.BatchInsertReadings(chunk); err != nil {
+					log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(chunk), err)
+				}
+			}
+		case <-ticker.C:
+			flush()
+		}
+	}
+}