alarm_service.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package services
  2. import (
  3. "ems-backend/models"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/google/uuid"
  10. )
  11. var GlobalAlarmService *AlarmService
  12. type AlarmTask struct {
  13. DeviceID string
  14. Metric string
  15. Value float64
  16. }
  17. type RuleState struct {
  18. FirstTriggerTime *time.Time
  19. LastAlarmTime *time.Time
  20. }
  21. type AlarmService struct {
  22. mu sync.RWMutex
  23. states map[string]*RuleState // Key: ruleID_deviceID
  24. taskChan chan AlarmTask // Buffered channel for alarm tasks
  25. // Rate limiting for auto-check
  26. lastAutoCheckTime time.Time
  27. checkMu sync.Mutex
  28. }
  29. func NewAlarmService() *AlarmService {
  30. GlobalAlarmService = &AlarmService{
  31. states: make(map[string]*RuleState),
  32. taskChan: make(chan AlarmTask, 5000), // Buffer size 5000
  33. lastAutoCheckTime: time.Now().Add(-60 * time.Second), // Allow immediate first check
  34. }
  35. // Start worker pool (e.g., 5 workers)
  36. for i := 0; i < 5; i++ {
  37. go GlobalAlarmService.worker()
  38. }
  39. return GlobalAlarmService
  40. }
  41. func (s *AlarmService) worker() {
  42. for task := range s.taskChan {
  43. s.processTask(task)
  44. }
  45. }
  46. func (s *AlarmService) CheckRules(deviceID string, metric string, value float64) {
  47. select {
  48. case s.taskChan <- AlarmTask{DeviceID: deviceID, Metric: metric, Value: value}:
  49. // Task enqueued successfully
  50. default:
  51. // Queue is full, drop task to prevent blocking or OOM
  52. // In a real system, you might want to log this occasionally or use a larger buffer
  53. // log.Printf("Alarm queue full! Dropping check for %s (%s)", deviceID, metric)
  54. }
  55. }
  56. func (s *AlarmService) processTask(task AlarmTask) {
  57. deviceID := task.DeviceID
  58. metric := task.Metric
  59. value := task.Value
  60. // 1. Fetch relevant rules via bindings
  61. // We need to find rules that bind to:
  62. // a) This DeviceID (TargetType='DEVICE')
  63. // b) This Device's LocationID (TargetType='SPACE') [TODO: And parent locations]
  64. var rules []models.AlarmRule
  65. // Get Device Location
  66. // Note: Repeated DB calls here can still be heavy if not cached.
  67. // Ideally, device info should be cached or passed in.
  68. var device models.Device
  69. var locationIDs []string
  70. if err := models.DB.Select("location_id").First(&device, "id = ?", deviceID).Error; err == nil && device.LocationID != nil {
  71. locationIDs = append(locationIDs, device.LocationID.String())
  72. }
  73. // Build query conditions
  74. targetIDs := []string{deviceID}
  75. targetIDs = append(targetIDs, locationIDs...)
  76. err := models.DB.Joins("JOIN alarm_rule_bindings ON alarm_rules.id = alarm_rule_bindings.rule_id").
  77. Where("alarm_rules.metric = ? AND alarm_rules.enabled = ?", metric, true).
  78. Where("alarm_rule_bindings.target_id IN ?", targetIDs).
  79. Find(&rules).Error
  80. if err != nil {
  81. return
  82. }
  83. // De-duplicate rules
  84. uniqueRules := make(map[uuid.UUID]models.AlarmRule)
  85. for _, r := range rules {
  86. uniqueRules[r.ID] = r
  87. }
  88. for _, rule := range uniqueRules {
  89. s.evaluateRule(rule, deviceID, value)
  90. }
  91. }
  92. func (s *AlarmService) evaluateRule(rule models.AlarmRule, deviceID string, value float64) {
  93. stateKey := fmt.Sprintf("%s_%s", rule.ID.String(), deviceID)
  94. s.mu.Lock()
  95. if s.states[stateKey] == nil {
  96. s.states[stateKey] = &RuleState{}
  97. }
  98. state := s.states[stateKey]
  99. s.mu.Unlock()
  100. // Check Condition
  101. matched := false
  102. switch rule.Operator {
  103. case ">":
  104. matched = value > rule.Threshold
  105. case "<":
  106. matched = value < rule.Threshold
  107. case ">=":
  108. matched = value >= rule.Threshold
  109. case "<=":
  110. matched = value <= rule.Threshold
  111. case "=":
  112. matched = value == rule.Threshold
  113. }
  114. if !matched {
  115. // Reset trigger time if condition no longer met
  116. if state.FirstTriggerTime != nil {
  117. s.mu.Lock()
  118. state.FirstTriggerTime = nil
  119. s.mu.Unlock()
  120. }
  121. return
  122. }
  123. now := time.Now()
  124. s.mu.Lock()
  125. defer s.mu.Unlock()
  126. // Handle Duration
  127. if state.FirstTriggerTime == nil {
  128. state.FirstTriggerTime = &now
  129. }
  130. // Check if duration requirement is met
  131. if now.Sub(*state.FirstTriggerTime).Seconds() < float64(rule.Duration) {
  132. return // Not long enough
  133. }
  134. // Handle Silence Period
  135. if state.LastAlarmTime != nil {
  136. if now.Sub(*state.LastAlarmTime).Seconds() < float64(rule.SilencePeriod) {
  137. return // In silence period
  138. }
  139. }
  140. // Trigger Alarm
  141. s.triggerAlarm(rule, deviceID, value)
  142. state.LastAlarmTime = &now
  143. }
  144. func (s *AlarmService) triggerAlarm(rule models.AlarmRule, deviceID string, value float64) {
  145. content := rule.Message
  146. if content == "" {
  147. content = fmt.Sprintf("%s 异常: 当前值 %.2f (阈值 %.2f)", rule.Metric, value, rule.Threshold)
  148. }
  149. // Get device info for name replacement
  150. var device models.Device
  151. deviceName := "未知设备"
  152. if err := models.DB.Select("name").First(&device, "id = ?", deviceID).Error; err == nil {
  153. deviceName = device.Name
  154. }
  155. // Template replacement
  156. content = strings.ReplaceAll(content, "{val}", fmt.Sprintf("%.2f", value))
  157. content = strings.ReplaceAll(content, "{dev}", deviceName)
  158. // Ensure DeviceID is valid UUID
  159. dUUID, err := uuid.Parse(deviceID)
  160. if err != nil {
  161. log.Printf("Invalid DeviceID UUID: %s", deviceID)
  162. return
  163. }
  164. alarm := models.AlarmLog{
  165. DeviceID: dUUID,
  166. Type: rule.Name, // Use rule name as Type
  167. Content: content,
  168. Status: "ACTIVE",
  169. StartTime: time.Now(),
  170. }
  171. if err := models.DB.Create(&alarm).Error; err != nil {
  172. log.Printf("Failed to create alarm log: %v", err)
  173. } else {
  174. log.Printf("!!! ALARM TRIGGERED !!! Rule: %s, Device: %s, Value: %.2f", rule.Name, deviceID, value)
  175. // 触发自动处理检查 (带频率限制)
  176. s.triggerAutoCheck()
  177. }
  178. }
  179. // triggerAutoCheck 触发自动处理检查,增加频率限制
  180. func (s *AlarmService) triggerAutoCheck() {
  181. s.checkMu.Lock()
  182. defer s.checkMu.Unlock()
  183. // 如果距离上次检查不足 60 秒,则跳过
  184. if time.Since(s.lastAutoCheckTime) < 60*time.Second {
  185. return
  186. }
  187. s.lastAutoCheckTime = time.Now()
  188. // 异步执行检查
  189. go CheckAndAutoHandleAlarms()
  190. }