소스 검색

解决了因 Goroutine 暴涨导致的内存溢出(OOM)问题

liuq 1 개월 전
부모
커밋
ee926f5f02
2개의 변경된 파일에 42개의 추가작업 그리고 10개의 삭제
  1. 41 9
      backend/services/alarm_service.go
  2. 1 1
      backend/services/collector.go

+ 41 - 9
backend/services/alarm_service.go

@@ -13,24 +13,59 @@ import (
 
 var GlobalAlarmService *AlarmService
 
// AlarmTask is one unit of work for the alarm worker pool: a single
// metric reading from a single device, queued for asynchronous rule checking.
type AlarmTask struct {
	DeviceID string  // device the reading came from
	Metric   string  // name of the metric being checked
	Value    float64 // observed metric value
}
+
// RuleState holds per-(rule, device) alarm timing state.
// NOTE(review): field semantics are inferred from names — confirm against the
// rule-evaluation code (not visible in this chunk).
type RuleState struct {
	FirstTriggerTime *time.Time // presumably when the rule condition first held; nil when idle
	LastAlarmTime    *time.Time // presumably when an alarm was last emitted; nil if never
}
 
// AlarmService checks incoming metric values against alarm rules.
// Checks are enqueued on taskChan and drained by a fixed worker pool,
// bounding concurrency (per the commit message, an unbounded goroutine
// per check previously caused OOM).
type AlarmService struct {
	mu       sync.RWMutex
	states   map[string]*RuleState // Key: ruleID_deviceID
	taskChan chan AlarmTask        // Buffered channel for alarm tasks
}
 
 func NewAlarmService() *AlarmService {
 	GlobalAlarmService = &AlarmService{
-		states: make(map[string]*RuleState),
+		states:   make(map[string]*RuleState),
+		taskChan: make(chan AlarmTask, 5000), // Buffer size 5000
+	}
+
+	// Start worker pool (e.g., 5 workers)
+	for i := 0; i < 5; i++ {
+		go GlobalAlarmService.worker()
 	}
+
 	return GlobalAlarmService
 }
 
+func (s *AlarmService) worker() {
+	for task := range s.taskChan {
+		s.processTask(task)
+	}
+}
+
 func (s *AlarmService) CheckRules(deviceID string, metric string, value float64) {
+	select {
+	case s.taskChan <- AlarmTask{DeviceID: deviceID, Metric: metric, Value: value}:
+		// Task enqueued successfully
+	default:
+		// Queue is full, drop task to prevent blocking or OOM
+		// In a real system, you might want to log this occasionally or use a larger buffer
+		// log.Printf("Alarm queue full! Dropping check for %s (%s)", deviceID, metric)
+	}
+}
+
+func (s *AlarmService) processTask(task AlarmTask) {
+	deviceID := task.DeviceID
+	metric := task.Metric
+	value := task.Value
+
 	// 1. Fetch relevant rules via bindings
 	// We need to find rules that bind to:
 	// a) This DeviceID (TargetType='DEVICE')
@@ -39,17 +74,15 @@ func (s *AlarmService) CheckRules(deviceID string, metric string, value float64)
 	var rules []models.AlarmRule
 	
 	// Get Device Location
+	// Note: Repeated DB calls here can still be heavy if not cached. 
+	// Ideally, device info should be cached or passed in.
 	var device models.Device
 	var locationIDs []string
 	if err := models.DB.Select("location_id").First(&device, "id = ?", deviceID).Error; err == nil && device.LocationID != nil {
-		// Found location, now find parents (Optional recursive, but for now just direct location)
 		locationIDs = append(locationIDs, device.LocationID.String())
-		// TODO: Recursive parent lookup if needed.
 	}
 
 	// Build query conditions
-	// Bindings.TargetID = deviceID OR (Bindings.TargetID IN locationIDs AND Bindings.TargetType='SPACE')
-	
 	targetIDs := []string{deviceID}
 	targetIDs = append(targetIDs, locationIDs...)
 	
@@ -62,8 +95,7 @@ func (s *AlarmService) CheckRules(deviceID string, metric string, value float64)
 		return
 	}
 
-	// De-duplicate rules if multiple bindings point to same rule (gorm might handle, but good to ensure)
-	// (Not strictly necessary if logic is idempotent, but efficient)
+	// De-duplicate rules
 	uniqueRules := make(map[uuid.UUID]models.AlarmRule)
 	for _, r := range rules {
 		uniqueRules[r.ID] = r

+ 1 - 1
backend/services/collector.go

@@ -372,7 +372,7 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
 
 			// Check Alarms Async
 			if GlobalAlarmService != nil {
-				go GlobalAlarmService.CheckRules(deviceID, metric, val)
+				GlobalAlarmService.CheckRules(deviceID, metric, val)
 			}
 		} else {
 			log.Printf("数据库错误: 保存失败 设备=%s 指标=%s 值=%.2f: %v", deviceID, metric, val, err)