|
|
@@ -21,6 +21,7 @@ type CollectorService struct {
|
|
|
DebugMode bool
|
|
|
mu sync.RWMutex
|
|
|
IntervalSeconds float64
|
|
|
+ runningSources sync.Map
|
|
|
}
|
|
|
|
|
|
func NewCollectorService() *CollectorService {
|
|
|
@@ -151,11 +152,22 @@ func (s *CollectorService) collectJob() {
|
|
|
|
|
|
// 3. Process each group
|
|
|
for sourceID, devs := range deviceGroups {
|
|
|
+ // 并发控制:如果该源的上一次采集还在运行,则跳过本次
|
|
|
+ if _, loaded := s.runningSources.LoadOrStore(sourceID, true); loaded {
|
|
|
+ log.Printf("Warning: Collection for source %s is still running, skipping this cycle.", sourceID)
|
|
|
+ continue
|
|
|
+ }
|
|
|
go s.processSourceGroup(sourceID, devs)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (s *CollectorService) processSourceGroup(sourceID string, devices []models.Device) {
|
|
|
+ // 任务结束时移除标记
|
|
|
+ defer s.runningSources.Delete(sourceID)
|
|
|
+
|
|
|
+ // 记录开始时间
|
|
|
+ start := time.Now()
|
|
|
+
|
|
|
// Fetch Source Config
|
|
|
var source models.IntegrationSource
|
|
|
if err := models.DB.First(&source, "id = ?", sourceID).Error; err != nil {
|
|
|
@@ -494,5 +506,10 @@ func (s *CollectorService) processSourceGroup(sourceID string, devices []models.
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- log.Printf("Source %s: Collected %d data points for %d devices", source.Name, count, len(devices))
|
|
|
+ duration := time.Since(start)
|
|
|
+ log.Printf("Source %s: Collected %d data points for %d devices in %v", source.Name, count, len(devices), duration)
|
|
|
+
|
|
|
+ if duration.Seconds() > s.IntervalSeconds {
|
|
|
+ log.Printf("PERFORMANCE WARNING: Collection for source %s took %v, which is longer than interval %.1fs", source.Name, duration, s.IntervalSeconds)
|
|
|
+ }
|
|
|
}
|