| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
- from sqlalchemy.orm import Session
- from datetime import datetime
- import os
- import uuid
- import asyncio
- import logging
- import json
- from typing import List, Dict, Any
- from backend.app.core.database import SessionLocal
- from backend.app.models import sql_models
- from backend.app.services.video_core import video_manager
- from backend.app.services.llm_agent import analyze_image
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Single process-wide async scheduler; jobs are (re)registered via update_job()
# and started in init_scheduler().
scheduler = AsyncIOScheduler()
async def run_analysis_pipeline(
    task_id: int,
    camera_ids: List[int],
    model_config: sql_models.ModelConfig,
    rules: List[Any],
    save_to_db: bool = True
) -> List[Dict[str, Any]]:
    """
    Reusable core analysis pipeline shared by scheduled runs and the
    frontend "Test" action.

    Steps:
      A. Grab an in-memory snapshot from each requested camera.
      B. Persist each snapshot to disk and queue one LLM analysis call per image.
      C. Run all LLM calls concurrently, parse the replies, and optionally
         persist alarm rows to the database.

    Args:
        task_id: ID of the owning task (used for logging and TaskLog rows).
        camera_ids: Cameras to snapshot and analyze.
        model_config: LLM endpoint configuration (base_url / api_key / model_name).
        rules: Detection rules forwarded verbatim to the LLM prompt.
        save_to_db: When True, alarming items are written as TaskLog rows.

    Returns:
        One dict per camera that produced a snapshot, each containing
        camera_id, snapshot_path, ai_result (full JSON string) and is_alarm.
    """
    # Step A: collect snapshots in memory; cameras without a frame are skipped.
    snapshots = {}
    for cam_id in camera_ids:
        img_bytes = video_manager.get_snapshot(cam_id)
        if img_bytes:
            snapshots[cam_id] = img_bytes

    if not snapshots:
        logger.warning(f"Task {task_id}: No snapshots available for cameras {camera_ids}")
        return []

    # Step B: save snapshots to disk and prepare the parallel LLM calls.
    llm_tasks = []
    metadata_list = []

    date_str = datetime.now().strftime("%Y-%m-%d")
    save_dir = os.path.join("backend/app/static/snapshots", date_str)
    os.makedirs(save_dir, exist_ok=True)

    for cam_id, img_bytes in snapshots.items():
        filename = f"{uuid.uuid4()}.jpg"
        filepath = os.path.join(save_dir, filename)

        with open(filepath, "wb") as f:
            f.write(img_bytes)

        # BUG FIX: the relative path must interpolate the generated file name;
        # it previously contained a literal placeholder, so every DB/frontend
        # snapshot link pointed at a non-existent file.
        relative_path = f"/static/snapshots/{date_str}/{filename}"

        llm_tasks.append(
            analyze_image(
                base_url=model_config.base_url,
                api_key=model_config.api_key,
                model_name=model_config.model_name,
                rules=rules,
                image_bytes=img_bytes
            )
        )
        metadata_list.append({
            "camera_id": cam_id,
            "snapshot_path": relative_path
        })

    # Step C: run all LLM calls concurrently.
    results_text = await asyncio.gather(*llm_tasks)

    final_results = []
    db = SessionLocal() if save_to_db else None

    try:
        for meta, text_result in zip(metadata_list, results_text):
            results_data = _parse_llm_result(text_result)

            # Stamp every item with the server-side time so a stale or
            # hallucinated LLM timestamp can never leak through.
            current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for item in results_data:
                item["time"] = current_time_str

            # One entry per camera with the full JSON payload; consumed by the
            # frontend Test view to render the result table.
            final_results.append({
                "camera_id": meta["camera_id"],
                "snapshot_path": meta["snapshot_path"],
                "ai_result": json.dumps(results_data, ensure_ascii=False),
                "is_alarm": any(item.get("is_alarm") == True for item in results_data)
            })

            if save_to_db and db:
                _save_alarm_logs(db, task_id, meta, results_data)

        if save_to_db and db:
            db.commit()

    except Exception as e:
        logger.error(f"Error saving logs: {e}")
        if db:
            db.rollback()
    finally:
        if db:
            db.close()

    return final_results


def _parse_llm_result(text_result: str) -> List[Dict[str, Any]]:
    """Parse one raw LLM reply into a list of result dicts.

    Strips markdown code fences, accepts either a JSON list or a single JSON
    object, and on invalid JSON falls back to a synthetic "Parsing Error"
    entry, flagged as an alarm when the raw text contains "[ALARM]".
    """
    try:
        clean_text = text_result.replace("```json", "").replace("```", "").strip()
        parsed = json.loads(clean_text)
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            return [parsed]
        return []
    except Exception as parse_err:
        logger.warning(f"Failed to parse JSON result: {parse_err}. Raw text: {text_result}")
        # NOTE(review): the "[ALARM]" tag check applies to the fallback entry
        # only — a successfully parsed reply keeps its own is_alarm flags.
        return [{
            "alarm_name": "Parsing Error",
            "alarm_content": text_result,
            "is_alarm": "[ALARM]" in text_result,
            "area": "Unknown",
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }]


def _save_alarm_logs(
    db: Session,
    task_id: int,
    meta: Dict[str, Any],
    results_data: List[Dict[str, Any]]
) -> None:
    """Add one TaskLog row per alarming item; non-alarms are skipped (does not commit)."""
    for item in results_data:
        is_alarm = item.get("is_alarm", False)
        if isinstance(is_alarm, str):
            # LLMs sometimes emit booleans as strings ("true"/"false").
            is_alarm = is_alarm.lower() == 'true'

        # USER REQUEST: if not an alarm, do not save to DB.
        if not is_alarm:
            continue

        alarm_name = item.get("alarm_name", "Unknown")
        alarm_content = item.get("alarm_content", "")
        area = item.get("area", "")

        logger.info(f"Task[{task_id}] | Cam[{meta['camera_id']}] | Name: {alarm_name} | Alarm: {is_alarm}")

        db.add(sql_models.TaskLog(
            task_id=task_id,
            camera_id=meta["camera_id"],
            snapshot_path=meta["snapshot_path"],
            alarm_name=alarm_name,
            alarm_content=alarm_content,
            area=area,
            is_alarm=is_alarm,
            check_time=datetime.now()
        ))
async def execute_task(task_id: int):
    """Scheduler entry point: run one analysis pass for the given task."""
    session = SessionLocal()
    try:
        task = (
            session.query(sql_models.Task)
            .filter(sql_models.Task.id == task_id)
            .first()
        )
        # Bail out quietly if the task vanished or was paused after scheduling.
        if not task or not task.is_running:
            return

        model_config = task.model_config
        if not model_config:
            logger.error(f"Task {task_id} has no model config")
            return

        await run_analysis_pipeline(
            task_id=task.id,
            camera_ids=task.camera_ids,
            model_config=model_config,
            rules=task.rules,
            save_to_db=True,
        )
    except Exception as e:
        logger.error(f"Task Execution Error: {e}")
    finally:
        session.close()
def update_job(task_id: int, cron_expression: str, is_running: bool):
    """(Re)register the scheduler job for a task, or leave it removed when stopped."""
    job_id = f"task_{task_id}"

    # Always clear any previous incarnation first; a fresh job is added below.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    if not is_running:
        return

    try:
        # Standard 5-field crontab: minute hour day month day_of_week.
        trigger = CronTrigger.from_crontab(cron_expression)
    except ValueError:
        # Fall back to a 6-field variant with a leading seconds column.
        try:
            fields = cron_expression.split()
            if len(fields) != 6:
                raise ValueError(f"Invalid cron format: {cron_expression}")
            trigger = CronTrigger(
                second=fields[0],
                minute=fields[1],
                hour=fields[2],
                day=fields[3],
                month=fields[4],
                day_of_week=fields[5],
            )
        except Exception as e:
            logger.error(f"Failed to schedule task {task_id}: Invalid cron expression '{cron_expression}'. Error: {e}")
            return

    scheduler.add_job(
        execute_task,
        trigger,
        id=job_id,
        args=[task_id],
        replace_existing=True,
    )
def init_scheduler():
    """Start the scheduler and restore jobs for every task marked running in the DB."""
    scheduler.start()
    db = SessionLocal()
    try:
        tasks = db.query(sql_models.Task).filter(sql_models.Task.is_running == True).all()
        for task in tasks:
            update_job(task.id, task.cron_expression, True)
    finally:
        # FIX: close the session even if the query or job registration raises;
        # previously an exception here leaked the session.
        db.close()
|