scheduler.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  2. from apscheduler.triggers.cron import CronTrigger
  3. from sqlalchemy.orm import Session
  4. from datetime import datetime
  5. import os
  6. import uuid
  7. import asyncio
  8. import logging
  9. import json
  10. from typing import List, Dict, Any
  11. from backend.app.core.database import SessionLocal
  12. from backend.app.models import sql_models
  13. from backend.app.services.video_core import video_manager
  14. from backend.app.services.llm_agent import analyze_image
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Single shared asyncio scheduler for this module: update_job() registers
  # and removes jobs on it, and init_scheduler() starts it.
  scheduler = AsyncIOScheduler()
  def _coerce_is_alarm(value: Any) -> bool:
      """Normalize an ``is_alarm`` value taken from the LLM output to a bool.

      A string counts as an alarm only when it spells "true" (case-insensitive,
      surrounding whitespace ignored); any other value uses plain truthiness.
      """
      if isinstance(value, str):
          return value.strip().lower() == "true"
      return bool(value)


  def _parse_llm_result(text_result: Any) -> List[Dict[str, Any]]:
      """Turn one raw LLM reply into a list of result dicts.

      A JSON object becomes a one-element list; a JSON array keeps only its
      object items. A reply that is not valid JSON yields a single
      "Parsing Error" entry, flagged as an alarm only when the raw text
      contains ``[ALARM]``. Any other JSON value yields an empty list.
      """
      # A non-string reply (e.g. None) must not crash the fallback path below,
      # which does a substring test on the raw text.
      raw_text = text_result if isinstance(text_result, str) else str(text_result)
      try:
          # Strip the Markdown code fences the model may wrap around its JSON.
          clean_text = raw_text.replace("```json", "").replace("```", "").strip()
          parsed = json.loads(clean_text)
      except (ValueError, RecursionError) as parse_err:
          logger.warning(
              "Failed to parse JSON result: %s. Raw text: %s", parse_err, raw_text
          )
          return [{
              "alarm_name": "Parsing Error",
              "alarm_content": raw_text,
              "is_alarm": "[ALARM]" in raw_text,
              "area": "Unknown",
              "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
          }]

      if isinstance(parsed, dict):
          return [parsed]
      if isinstance(parsed, list):
          # Only objects can carry alarm fields; anything else would break the
          # per-item field access done by the caller.
          items = [item for item in parsed if isinstance(item, dict)]
          if len(items) != len(parsed):
              logger.warning(
                  "Dropped %d non-object item(s) from LLM result",
                  len(parsed) - len(items),
              )
          return items
      return []


  async def run_analysis_pipeline(
      task_id: int,
      camera_ids: List[int],
      model_config: sql_models.ModelConfig,
      rules: List[Any],
      save_to_db: bool = True
  ) -> List[Dict[str, Any]]:
      """Snapshot each camera, analyze the images with the LLM, report results.

      Args:
          task_id: Id of the task the run belongs to (used for logs and DB rows).
          camera_ids: Cameras to snapshot; cameras without a snapshot are skipped.
          model_config: Supplies ``base_url``, ``api_key`` and ``model_name``
              for the LLM call.
          rules: Rules forwarded unchanged to ``analyze_image``.
          save_to_db: When true, every result item flagged as an alarm is
              stored as a ``TaskLog`` row; non-alarm items are never stored.

      Returns:
          One dict per analyzed camera with the keys ``camera_id``,
          ``snapshot_path``, ``ai_result`` (JSON string of all result items)
          and ``is_alarm`` (true if any item is an alarm). Empty when no
          camera produced a snapshot.

      Raises:
          Exception: Errors from writing snapshot files or from any
              ``analyze_image`` call propagate to the caller. Errors while
              saving to the database are logged and rolled back instead.
      """
      # Step A: grab the current snapshot of every camera.
      snapshots = {}
      for cam_id in camera_ids or []:
          img_bytes = video_manager.get_snapshot(cam_id)
          if img_bytes:
              snapshots[cam_id] = img_bytes

      if not snapshots:
          logger.warning(
              "Task %s: No snapshots available for cameras %s", task_id, camera_ids
          )
          return []

      # Step B: persist the snapshots to disk.
      date_str = datetime.now().strftime("%Y-%m-%d")
      save_dir = os.path.join("backend/app/static/snapshots", date_str)
      os.makedirs(save_dir, exist_ok=True)

      metadata_list = []
      for cam_id, img_bytes in snapshots.items():
          filename = f"{uuid.uuid4()}.jpg"
          with open(os.path.join(save_dir, filename), "wb") as f:
              f.write(img_bytes)
          metadata_list.append({
              "camera_id": cam_id,
              # Relative path for DB/Frontend
              "snapshot_path": f"/static/snapshots/{date_str}/{filename}",
          })

      # Step C: parallel LLM analysis. The coroutines are created only after
      # every file write succeeded, so a failed write cannot leave
      # never-awaited coroutines behind. Results keep the snapshot order.
      results_text = await asyncio.gather(*(
          analyze_image(
              base_url=model_config.base_url,
              api_key=model_config.api_key,
              model_name=model_config.model_name,
              rules=rules,
              image_bytes=img_bytes,
          )
          for img_bytes in snapshots.values()
      ))

      # Step D: build one return entry per camera and collect alarm rows.
      final_results = []
      pending_logs = []
      for meta, text_result in zip(metadata_list, results_text):
          results_data = _parse_llm_result(text_result)

          # Inject the current time so the timestamp never depends on the model.
          current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
          for item in results_data:
              item["time"] = current_time_str

          # The same alarm test feeds both the returned flag and the DB rows,
          # so the two can no longer disagree (e.g. on the string "true").
          alarm_items = [
              item for item in results_data
              if _coerce_is_alarm(item.get("is_alarm", False))
          ]

          # One entry per camera with the full JSON, used by the frontend
          # test functionality to display the result table.
          final_results.append({
              "camera_id": meta["camera_id"],
              "snapshot_path": meta["snapshot_path"],
              "ai_result": json.dumps(results_data, ensure_ascii=False),
              "is_alarm": bool(alarm_items),
          })

          if not save_to_db:
              continue
          for item in alarm_items:
              alarm_name = item.get("alarm_name", "Unknown")
              logger.info(
                  "Task[%s] | Cam[%s] | Name: %s | Alarm: %s",
                  task_id, meta["camera_id"], alarm_name, True,
              )
              pending_logs.append({
                  "task_id": task_id,
                  "camera_id": meta["camera_id"],
                  "snapshot_path": meta["snapshot_path"],
                  "alarm_name": alarm_name,
                  "alarm_content": item.get("alarm_content", ""),
                  "area": item.get("area", ""),
                  "is_alarm": True,
                  "check_time": datetime.now(),
              })

      # Step E: store the alarm rows (one per alarm item) in one transaction.
      if save_to_db and pending_logs:
          db = SessionLocal()
          try:
              for fields in pending_logs:
                  db.add(sql_models.TaskLog(**fields))
              db.commit()
          except Exception as e:
              # Best effort: a DB failure must not discard the analysis results.
              logger.exception("Error saving logs: %s", e)
              db.rollback()
          finally:
              db.close()

      return final_results
  async def execute_task(task_id: int):
      """Run one analysis pass for the task with the given id.

      Does nothing when the task does not exist or is not marked as running.
      A task without a model config is logged as an error and skipped. Any
      exception raised along the way is logged rather than propagated, and the
      database session is always closed.
      """
      session = SessionLocal()
      try:
          task_row = (
              session.query(sql_models.Task)
              .filter(sql_models.Task.id == task_id)
              .first()
          )
          # Missing or stopped task: nothing to do.
          if not (task_row and task_row.is_running):
              return

          config = task_row.model_config
          if config:
              await run_analysis_pipeline(
                  task_id=task_row.id,
                  camera_ids=task_row.camera_ids,
                  model_config=config,
                  rules=task_row.rules,
                  save_to_db=True,
              )
          else:
              logger.error(f"Task {task_id} has no model config")
      except Exception as err:
          logger.error(f"Task Execution Error: {err}")
      finally:
          session.close()
  def _build_cron_trigger(cron_expression):
      """Build a ``CronTrigger`` from a 5- or 6-field cron expression.

      Five fields are parsed as a standard crontab line. Six fields are read
      as ``second minute hour day month day_of_week``.

      Raises:
          ValueError: If the expression is not a string or does not have
              exactly 5 or 6 whitespace-separated fields. Errors raised by
              ``CronTrigger`` for invalid field values propagate unchanged.
      """
      if not isinstance(cron_expression, str):
          # e.g. a task stored without a cron expression (None)
          raise ValueError(f"Invalid cron format: {cron_expression!r}")

      parts = cron_expression.split()
      if len(parts) == 5:
          return CronTrigger.from_crontab(cron_expression)
      if len(parts) == 6:
          second, minute, hour, day, month, day_of_week = parts
          return CronTrigger(
              second=second,
              minute=minute,
              hour=hour,
              day=day,
              month=month,
              day_of_week=day_of_week,
          )
      raise ValueError(f"Invalid cron format: {cron_expression}")


  def update_job(task_id: int, cron_expression: str, is_running: bool):
      """Synchronize the scheduler job of a task with its current settings.

      Any existing job for the task is removed first. When ``is_running`` is
      true, a new job calling ``execute_task(task_id)`` is registered on the
      given cron schedule. An invalid cron expression is logged and leaves the
      task unscheduled instead of raising.

      Args:
          task_id: Id of the task; the job id is ``task_<task_id>``.
          cron_expression: 5-field crontab expression, or 6 fields with
              seconds first.
          is_running: Whether the task should be scheduled at all.
      """
      job_id = f"task_{task_id}"

      # Remove existing
      if scheduler.get_job(job_id):
          scheduler.remove_job(job_id)

      if not is_running:
          return

      try:
          trigger = _build_cron_trigger(cron_expression)
      except Exception as e:
          # Scheduling boundary: a bad expression must not crash the caller.
          logger.error(
              "Failed to schedule task %s: Invalid cron expression '%s'. Error: %s",
              task_id, cron_expression, e,
          )
          return

      scheduler.add_job(
          execute_task,
          trigger,
          id=job_id,
          args=[task_id],
          replace_existing=True,
      )
  def init_scheduler():
      """Start the scheduler and re-register a job for every running task.

      Tasks are read from the database with ``is_running == True``. A task
      whose job cannot be restored is logged and skipped so the remaining
      tasks are still scheduled. The database session is always closed.
      """
      scheduler.start()

      # Restore jobs from DB
      db = SessionLocal()
      try:
          tasks = (
              db.query(sql_models.Task)
              # "== True" is intentional: it builds the SQL comparison.
              .filter(sql_models.Task.is_running == True)  # noqa: E712
              .all()
          )
          for task in tasks:
              try:
                  update_job(task.id, task.cron_expression, True)
              except Exception:
                  logger.exception("Failed to restore job for task %s", task.id)
      finally:
          db.close()