| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package services
- import (
- "ems-backend/models"
- "fmt"
- "log"
- "strings"
- "sync"
- "time"
- "github.com/google/uuid"
- )
- var GlobalAlarmService *AlarmService
// AlarmTask is one queued request to evaluate alarm rules against a single
// metric reading reported for a device.
type AlarmTask struct {
	// DeviceID identifies the device the reading belongs to.
	DeviceID string
	// Metric is the metric name used to select matching rules.
	Metric string
	// Value is the reading compared against each rule's threshold.
	Value float64
}
// RuleState is the in-memory evaluation state kept for one rule/device pair.
type RuleState struct {
	// FirstTriggerTime is when the rule's condition started matching in the
	// current run of matches; it is nil while the condition is not matching.
	FirstTriggerTime *time.Time
	// LastAlarmTime is when an alarm was last raised for this pair; it is nil
	// if none has been raised yet.
	LastAlarmTime *time.Time
}
- type AlarmService struct {
- mu sync.RWMutex
- states map[string]*RuleState // Key: ruleID_deviceID
- taskChan chan AlarmTask // Buffered channel for alarm tasks
-
- // Rate limiting for auto-check
- lastAutoCheckTime time.Time
- checkMu sync.Mutex
- }
- func NewAlarmService() *AlarmService {
- GlobalAlarmService = &AlarmService{
- states: make(map[string]*RuleState),
- taskChan: make(chan AlarmTask, 5000), // Buffer size 5000
- lastAutoCheckTime: time.Now().Add(-60 * time.Second), // Allow immediate first check
- }
- // Start worker pool (e.g., 5 workers)
- for i := 0; i < 5; i++ {
- go GlobalAlarmService.worker()
- }
- return GlobalAlarmService
- }
- func (s *AlarmService) worker() {
- for task := range s.taskChan {
- s.processTask(task)
- }
- }
- func (s *AlarmService) CheckRules(deviceID string, metric string, value float64) {
- select {
- case s.taskChan <- AlarmTask{DeviceID: deviceID, Metric: metric, Value: value}:
- // Task enqueued successfully
- default:
- // Queue is full, drop task to prevent blocking or OOM
- // In a real system, you might want to log this occasionally or use a larger buffer
- // log.Printf("Alarm queue full! Dropping check for %s (%s)", deviceID, metric)
- }
- }
- func (s *AlarmService) processTask(task AlarmTask) {
- deviceID := task.DeviceID
- metric := task.Metric
- value := task.Value
- // 1. Fetch relevant rules via bindings
- // We need to find rules that bind to:
- // a) This DeviceID (TargetType='DEVICE')
- // b) This Device's LocationID (TargetType='SPACE') [TODO: And parent locations]
-
- var rules []models.AlarmRule
-
- // Get Device Location
- // Note: Repeated DB calls here can still be heavy if not cached.
- // Ideally, device info should be cached or passed in.
- var device models.Device
- var locationIDs []string
- if err := models.DB.Select("location_id").First(&device, "id = ?", deviceID).Error; err == nil && device.LocationID != nil {
- locationIDs = append(locationIDs, device.LocationID.String())
- }
- // Build query conditions
- targetIDs := []string{deviceID}
- targetIDs = append(targetIDs, locationIDs...)
-
- err := models.DB.Joins("JOIN alarm_rule_bindings ON alarm_rules.id = alarm_rule_bindings.rule_id").
- Where("alarm_rules.metric = ? AND alarm_rules.enabled = ?", metric, true).
- Where("alarm_rule_bindings.target_id IN ?", targetIDs).
- Find(&rules).Error
- if err != nil {
- return
- }
- // De-duplicate rules
- uniqueRules := make(map[uuid.UUID]models.AlarmRule)
- for _, r := range rules {
- uniqueRules[r.ID] = r
- }
- for _, rule := range uniqueRules {
- s.evaluateRule(rule, deviceID, value)
- }
- }
- func (s *AlarmService) evaluateRule(rule models.AlarmRule, deviceID string, value float64) {
- stateKey := fmt.Sprintf("%s_%s", rule.ID.String(), deviceID)
-
- s.mu.Lock()
- if s.states[stateKey] == nil {
- s.states[stateKey] = &RuleState{}
- }
- state := s.states[stateKey]
- s.mu.Unlock()
- // Check Condition
- matched := false
- switch rule.Operator {
- case ">":
- matched = value > rule.Threshold
- case "<":
- matched = value < rule.Threshold
- case ">=":
- matched = value >= rule.Threshold
- case "<=":
- matched = value <= rule.Threshold
- case "=":
- matched = value == rule.Threshold
- }
- if !matched {
- // Reset trigger time if condition no longer met
- if state.FirstTriggerTime != nil {
- s.mu.Lock()
- state.FirstTriggerTime = nil
- s.mu.Unlock()
- }
- return
- }
- now := time.Now()
-
- s.mu.Lock()
- defer s.mu.Unlock()
- // Handle Duration
- if state.FirstTriggerTime == nil {
- state.FirstTriggerTime = &now
- }
-
- // Check if duration requirement is met
- if now.Sub(*state.FirstTriggerTime).Seconds() < float64(rule.Duration) {
- return // Not long enough
- }
- // Handle Silence Period
- if state.LastAlarmTime != nil {
- if now.Sub(*state.LastAlarmTime).Seconds() < float64(rule.SilencePeriod) {
- return // In silence period
- }
- }
- // Trigger Alarm
- s.triggerAlarm(rule, deviceID, value)
- state.LastAlarmTime = &now
- }
- func (s *AlarmService) triggerAlarm(rule models.AlarmRule, deviceID string, value float64) {
- content := rule.Message
- if content == "" {
- content = fmt.Sprintf("%s 异常: 当前值 %.2f (阈值 %.2f)", rule.Metric, value, rule.Threshold)
- }
-
- // Get device info for name replacement
- var device models.Device
- deviceName := "未知设备"
- if err := models.DB.Select("name").First(&device, "id = ?", deviceID).Error; err == nil {
- deviceName = device.Name
- }
- // Template replacement
- content = strings.ReplaceAll(content, "{val}", fmt.Sprintf("%.2f", value))
- content = strings.ReplaceAll(content, "{dev}", deviceName)
- // Ensure DeviceID is valid UUID
- dUUID, err := uuid.Parse(deviceID)
- if err != nil {
- log.Printf("Invalid DeviceID UUID: %s", deviceID)
- return
- }
- alarm := models.AlarmLog{
- DeviceID: dUUID,
- Type: rule.Name, // Use rule name as Type
- Content: content,
- Status: "ACTIVE",
- StartTime: time.Now(),
- }
- if err := models.DB.Create(&alarm).Error; err != nil {
- log.Printf("Failed to create alarm log: %v", err)
- } else {
- log.Printf("!!! ALARM TRIGGERED !!! Rule: %s, Device: %s, Value: %.2f", rule.Name, deviceID, value)
-
- // 触发自动处理检查 (带频率限制)
- s.triggerAutoCheck()
- }
- }
- // triggerAutoCheck 触发自动处理检查,增加频率限制
- func (s *AlarmService) triggerAutoCheck() {
- s.checkMu.Lock()
- defer s.checkMu.Unlock()
- // 如果距离上次检查不足 60 秒,则跳过
- if time.Since(s.lastAutoCheckTime) < 60*time.Second {
- return
- }
- s.lastAutoCheckTime = time.Now()
- // 异步执行检查
- go CheckAndAutoHandleAlarms()
- }
|