tdengine_writer.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package services
  2. import (
  3. "ems-backend/db"
  4. "log"
  5. "sync"
  6. "time"
  7. )
  // Tuning parameters for the asynchronous TDengine writer.
  const (
  	// writeChanSize is not referenced by the code visible in this file;
  	// NewTDengineWriter sizes its channel with a literal 64 instead.
  	// NOTE(review): confirm whether this constant is still needed.
  	writeChanSize = 10000

  	// writeBatchSize is the number of records run hands to
  	// db.BatchInsertReadings at a time once that many have accumulated.
  	writeBatchSize = 500

  	// writeFlushInterval is the ticker period at which run writes out
  	// whatever records are pending, even if fewer than writeBatchSize.
  	writeFlushInterval = 100 * time.Millisecond
  )
  13. var GlobalTDengineWriter *TDengineWriter
  14. // TDengineWriter asynchronously batches and writes readings to TDengine via channel + goroutine.
  15. // Decouples collection from write, reducing blocking risk during collection cycles.
  16. type TDengineWriter struct {
  17. ch chan []db.ReadingRecord
  18. wg sync.WaitGroup
  19. }
  20. // NewTDengineWriter creates and returns a new TDengineWriter.
  21. func NewTDengineWriter() *TDengineWriter {
  22. return &TDengineWriter{
  23. ch: make(chan []db.ReadingRecord, 64),
  24. }
  25. }
  26. // Start starts the background writer goroutine.
  27. func (w *TDengineWriter) Start() {
  28. w.wg.Add(1)
  29. go w.run()
  30. log.Println("TDengineWriter started (async batch write)")
  31. }
  32. // Stop gracefully stops the writer: closes the channel, flushes remaining data, then returns.
  33. func (w *TDengineWriter) Stop() {
  34. close(w.ch)
  35. w.wg.Wait()
  36. log.Println("TDengineWriter stopped")
  37. }
  38. // Write sends a batch of records to the writer. Non-blocking; drops with log if channel is full.
  39. func (w *TDengineWriter) Write(records []db.ReadingRecord) {
  40. if len(records) == 0 {
  41. return
  42. }
  43. select {
  44. case w.ch <- records:
  45. default:
  46. log.Printf("TDengineWriter: channel full, dropping %d records", len(records))
  47. }
  48. }
  49. func (w *TDengineWriter) run() {
  50. defer w.wg.Done()
  51. ticker := time.NewTicker(writeFlushInterval)
  52. defer ticker.Stop()
  53. pending := make([]db.ReadingRecord, 0, writeBatchSize*4)
  54. flush := func() {
  55. if len(pending) == 0 {
  56. return
  57. }
  58. if err := db.BatchInsertReadings(pending); err != nil {
  59. log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(pending), err)
  60. }
  61. pending = pending[:0]
  62. }
  63. for {
  64. select {
  65. case batch, ok := <-w.ch:
  66. if !ok {
  67. flush()
  68. return
  69. }
  70. pending = append(pending, batch...)
  71. for len(pending) >= writeBatchSize {
  72. chunk := pending[:writeBatchSize]
  73. pending = pending[writeBatchSize:]
  74. if err := db.BatchInsertReadings(chunk); err != nil {
  75. log.Printf("TDengineWriter: batch insert failed (%d records): %v", len(chunk), err)
  76. }
  77. }
  78. case <-ticker.C:
  79. flush()
  80. }
  81. }
  82. }