| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package services
- import (
- "ems-backend/db"
- "log"
- "sync"
- "time"
- )
// Tuning parameters for the asynchronous TDengine writer.
const (
	// writeBatchSize is the number of buffered records at which the writer
	// goroutine inserts a chunk immediately instead of waiting for the ticker.
	writeBatchSize = 500

	// writeFlushInterval is the period of the ticker that flushes whatever
	// records are still buffered.
	writeFlushInterval = 100 * time.Millisecond

	// writeChanSize is not referenced by the code visible in this file.
	// NOTE(review): presumably an intended queue capacity — confirm whether
	// it is used elsewhere in the package.
	writeChanSize = 10000
)
- var GlobalTDengineWriter *TDengineWriter
- // TDengineWriter asynchronously batches and writes readings to TDengine via channel + goroutine.
- // Decouples collection from write, reducing blocking risk during collection cycles.
- type TDengineWriter struct {
- ch chan []db.ReadingRecord
- wg sync.WaitGroup
- }
- // NewTDengineWriter creates and returns a new TDengineWriter.
- func NewTDengineWriter() *TDengineWriter {
- return &TDengineWriter{
- ch: make(chan []db.ReadingRecord, 64),
- }
- }
- // Start starts the background writer goroutine.
- func (w *TDengineWriter) Start() {
- w.wg.Add(1)
- go w.run()
- log.Println("TDengineWriter started (async batch write)")
- }
- // Stop gracefully stops the writer: closes the channel, flushes remaining data, then returns.
- func (w *TDengineWriter) Stop() {
- close(w.ch)
- w.wg.Wait()
- log.Println("TDengineWriter stopped")
- }
- // Write sends a batch of records to the writer. Non-blocking; drops with log if channel is full.
- func (w *TDengineWriter) Write(records []db.ReadingRecord) {
- if len(records) == 0 {
- return
- }
- select {
- case w.ch <- records:
- default:
- log.Printf("TDengineWriter: channel full, dropping %d records", len(records))
- }
- }
- func (w *TDengineWriter) run() {
- defer w.wg.Done()
- ticker := time.NewTicker(writeFlushInterval)
- defer ticker.Stop()
- pending := make([]db.ReadingRecord, 0, writeBatchSize*4)
- flush := func() {
- if len(pending) == 0 {
- return
- }
- if err := db.BatchInsertReadings(pending); err != nil {
- log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(pending), err)
- }
- pending = pending[:0]
- }
- for {
- select {
- case batch, ok := <-w.ch:
- if !ok {
- flush()
- return
- }
- pending = append(pending, batch...)
- for len(pending) >= writeBatchSize {
- chunk := pending[:writeBatchSize]
- pending = pending[writeBatchSize:]
- if err := db.BatchInsertReadings(chunk); err != nil {
- log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(chunk), err)
- }
- }
- case <-ticker.C:
- flush()
- }
- }
- }
|